I am trying to write a unit test for a Kafka listener that I am developing using Spring Boot 2.x. Being a unit test, I don't want to start up a full Kafka server an instance of Zookeeper. So, I decided to use Spring Embedded Kafka.
The definition of my listener is very basic.
@Component
public class Listener {
    private final CountDownLatch latch;
    @Autowired
    public Listener(CountDownLatch latch) {
        this.latch = latch;
    }
    @KafkaListener(topics = "sample-topic")
    public void listen(String message) {
        latch.countDown();
    }
}
Also the test, that verifies the latch counter to be equal to zero after receiving a message, is very easy.
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {
    @Autowired
    private KafkaEmbedded embeddedKafka;
    @Autowired
    private CountDownLatch latch;
    private KafkaTemplate<Integer, String> producer;
    @Before
    public void setUp() {
        this.producer = buildKafkaTemplate();
        this.producer.setDefaultTopic("sample-topic");
    }
    private KafkaTemplate<Integer, String> buildKafkaTemplate() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }
    @Test
    public void listenerShouldConsumeMessages() throws InterruptedException {
        // Given
        producer.sendDefault(1, "Hello world");
        // Then
        assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
    }
}
Unfortunately, the test fails and I cannot understand why. Is it possible to use an instance of KafkaEmbedded to test a method marked with the annotation @KafkaListener?
All the code is shared in my GitHub repository kafka-listener.
Thanks to all.
 
     
    