Spring Integration: основы интеграционного тестирования MQTT

Я пытаюсь протестировать простой прослушиватель MQTT, созданный с помощью Spring Boot и Spring Integration, но он не работает. Я пробовал много подходов. Наиболее перспективным было:

@RunWith(SpringRunner.class)
@SpringBootTest
public class BasicMqttTest {

@Value("${mqtt.client.id}")
private String mqttClientId;

@Value("${mqtt.state.topic}")
private String mqttTopic;

@Autowired
MqttPahoClientFactory mqttPahoClientFactory;

protected IMqttClient client;

@Before
public void setUp() throws Exception {
    client = mqttPahoClientFactory.getClientInstance(mqttPahoClientFactory.getConnectionOptions().getServerURIs()[0], mqttClientId);
    client.connect();
}

@After
public void tearDown() throws Exception {
    client.disconnect();
    client.close();
}

@Test
public void contextLoads() throws Exception {
    MqttMessage mqttMessage = new MqttMessage();
    mqttMessage.setPayload("MQTT!".getBytes());
    client.publish(mqttTopic, mqttMessage);
}
}

Однако тест выполняется с 2018-07-12 16:53:50.937 ERROR 21160 --- [T Rec: consumer] .m.i.MqttPahoMessageDrivenChannelAdapter : Lost connection: Verbindung wurde getrennt; retrying... , и я не вижу ничего распечатанного. Код в основном взят из: https://github.com/spring-projects/spring-integration-samples/tree/master/basic/mqtt . Пример работает хорошо, но мне нужно уметь писать правильные интеграционные тесты.

Конфигурация:

@Value("${mqtt.server.uri}")
private String mqttServerUri;

@Value("${mqtt.username}")
private String mqttUsername;

@Value("${mqtt.password}")
private String mqttPassword;

@Value("${mqtt.client.id}")
private String mqttClientId;

@Value("${mqtt.state.topic}")
private String mqttTopic;

@Value("${mqtt.completion.timeout}")
private Integer mqttCompletionTimeout;

@Value("${mqtt.quality.of.service}")
private Integer mqttQualityOfService;

@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[]{mqttServerUri});
    options.setUserName(mqttUsername);
    options.setPassword(mqttPassword.toCharArray());
    factory.setConnectionOptions(options);
    return factory;
}

@Bean
@Qualifier("MqttInboundChannel")
public MessageProducerSupport mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
            mqttClientId,
            mqttClientFactory(),
            mqttTopic
    );
    adapter.setCompletionTimeout(mqttCompletionTimeout);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(mqttQualityOfService);
    return adapter;
}

@Bean
public IntegrationFlow myMqttInFlow() {
    return IntegrationFlows.from(mqttInbound)
            .handle(message -> {
                System.out.println(message);
            }).get();
}

ОБНОВЛЕНИЕ:

Тоже не сработало:

@Autowired
protected MessageHandler mqttOutbound;

@Test
public void test0() throws Exception {
    mqttOutbound.handleMessage(MessageBuilder.withPayload("test").build());
}


@SpringBootApplication
static class MqttSourceApplication {

    @Autowired
    private MqttPahoClientFactory mqttClientFactory;

    @Bean
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("test", mqttClientFactory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("test");
        messageHandler.setConverter(pahoMessageConverter());
        return messageHandler;
    }

    @Bean
    public DefaultPahoMessageConverter pahoMessageConverter() {
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(1, false, "UTF-8");
        return converter;
    }

}

ОБНОВЛЕНИЕ

Еще проще... та же ошибка:

@Autowired
MqttPahoClientFactory mqttPahoClientFactory;

private MessageHandler mqttOutbound;

@Before
public void setUp() throws Exception {
    MqttPahoMessageHandler messageHandler  = new MqttPahoMessageHandler(mqttClientId, mqttPahoClientFactory);
    messageHandler.setAsync(false);
    messageHandler.setDefaultTopic(mqttTopic);
    messageHandler.setConverter(new DefaultPahoMessageConverter());
    mqttOutbound = messageHandler;

}

@Test
public void test0() throws Exception {
    mqttOutbound.handleMessage(MessageBuilder.withPayload("test").build());
    Thread.sleep(10000L);
}



Ответы (1)


Хорошо, решено: Пахо, по-видимому, закрыл мое соединение, поскольку и тестовый, и основной использовали один и тот же идентификатор клиента. Решение состояло в том, чтобы заменить clientId на MqttAsyncClient.generateClientId(), как предлагается здесь: https://stackoverflow.com/a/48232793/2739681

person PeMa    schedule 12.07.2018
comment
Я добавил к образцу тест: github.com/spring-projects/ spring-integration-samples/pull/229 - person Gary Russell; 12.07.2018
comment
Да, круто. Спасибо, выглядит немного более сложным, чем мой подход, и, очевидно, вы знаете, как добавить брокера в памяти, поэтому я могу пропустить новый вопрос об этом. - person PeMa; 13.07.2018