I created a Spring Boot application that sends messages to a Kafka topic. I am using spring-integration-kafka: a KafkaProducerMessageHandler<String,String> is subscribed to a channel (SubscribableChannel) and pushes every message it receives to one topic.
The application works fine. I can see the messages arriving in Kafka via the console consumer (local Kafka).
I also created an integration test that uses KafkaEmbedded. I check the expected messages by subscribing to the channel within the test, and that all works.
But I also want the test to check the messages that actually end up in Kafka. Sadly, Kafka's JavaDoc is not the best. What I have tried so far is:
@ClassRule
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "myTopic");
//...
@Before
public void init() throws Exception {
    mockConsumer = new MockConsumer<>( OffsetResetStrategy.EARLIEST );
    kafkaEmbedded.consumeFromAnEmbeddedTopic( mockConsumer,"sikom" );
}
//...
@Test
public void endToEnd() throws Exception {
    // ...
    ConsumerRecords<String, String> records = mockConsumer.poll( 10000 );
    StreamSupport.stream(records.spliterator(), false).forEach( record -> log.debug( "record: " + record.value() ) );
}
The problem is that I don't see any records. I am not sure whether my KafkaEmbedded setup is correct. The messages are, however, received by the channel.
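One possible explanation, offered as a guess rather than a diagnosis: Kafka's MockConsumer is an in-memory test double that does not connect to a broker, so it would only ever return records the test itself hands to it. The plain-Java sketch below illustrates that behaviour with hypothetical stand-in classes (FakeBrokerTopic and InMemoryConsumer are invented for illustration and are not part of Kafka or Spring).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-ins for illustration only; NOT Kafka's or Spring's API.
public class InMemoryConsumerSketch {

    // Plays the role of the topic on the (embedded) broker: the producer writes here.
    public static class FakeBrokerTopic {
        private final List<String> log = new ArrayList<>();

        public void send(String value) {
            log.add(value);
        }

        public int size() {
            return log.size();
        }
    }

    // Plays the role of a mock consumer: it has no link to the broker topic
    // and returns only what was queued through addRecord().
    public static class InMemoryConsumer {
        private final Queue<String> queued = new ArrayDeque<>();

        public void addRecord(String value) {
            queued.add(value);
        }

        // Drains and returns everything queued so far.
        public List<String> poll() {
            List<String> out = new ArrayList<>(queued);
            queued.clear();
            return out;
        }
    }

    public static void main(String[] args) {
        FakeBrokerTopic topic = new FakeBrokerTopic();
        InMemoryConsumer consumer = new InMemoryConsumer();

        topic.send("hello");                          // the message reaches the "broker"
        System.out.println(topic.size());             // prints 1
        System.out.println(consumer.poll().size());   // prints 0: the double never saw it

        consumer.addRecord("hello");                  // only hand-fed records come back
        System.out.println(consumer.poll().size());   // prints 1
    }
}
```

If this is indeed what is happening in the test, a real consumer configured with the embedded broker's address would presumably be needed in place of the mock, though I have not verified that against this setup.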