kafka UT

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK)
public class KafkaTest {

    @Autowired
    private WebApplicationContext wac;

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;


    @Autowired
    private KafkaListenerEndpointRegistry registry;


    String topicName = "unit";
    String key = "12345-54321";

    @Before
    public void before() throws Exception {
//        this.mvc = MockMvcBuilders.webAppContextSetup(this.wac).build();
    }

    @After
    public void after() throws Exception {
    }

//    @Test
//    public void sendToKafka(){
//        kafkaTemplate.send(topicName, key, "");
//    }

    @Test
    public void receiveFrmKafka() throws InterruptedException {
        ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry
                .getListenerContainer("foo");
        container.stop();
        @SuppressWarnings("unchecked")
        BatchMessagingMessageListenerAdapter<String, String> messageListener =
                (BatchMessagingMessageListenerAdapter<String, String>) container.getContainerProperties().getMessageListener();
        CountDownLatch latch = new CountDownLatch(1);
        container.getContainerProperties()
                .setMessageListener(new BatchAcknowledgingMessageListener<String, String>() {

                    @Override
                    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
                        messageListener.onMessage(records, acknowledgment);
                        for(ConsumerRecord<String, String> record : records){
                            LoggerUtil.info("get key {} value {}", record.key(), record.value());
                        }
                        latch.countDown();
                    }

                });
        container.start();
        kafkaTemplate.send("asr_topic", "foo", "");
        Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
    }
}
comments powered by Disqus