diff --git a/kafka-sample/spring-boot-multiple-producers-consumers/src/main/java/com/example/springbootkafka/multi/config/KafkaConfig.java b/kafka-sample/spring-boot-multiple-producers-consumers/src/main/java/com/example/springbootkafka/multi/config/KafkaConfig.java index fa64ffa1..f6f4fc4b 100644 --- a/kafka-sample/spring-boot-multiple-producers-consumers/src/main/java/com/example/springbootkafka/multi/config/KafkaConfig.java +++ b/kafka-sample/spring-boot-multiple-producers-consumers/src/main/java/com/example/springbootkafka/multi/config/KafkaConfig.java @@ -40,7 +40,7 @@ public class KafkaConfig implements KafkaListenerConfigurer { private final LocalValidatorFactoryBean validator; @Bean - public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context, ProducerFactory pf) { + RoutingKafkaTemplate routingTemplate(GenericApplicationContext context, ProducerFactory pf) { // Clone the PF with a different Serializer, register with Spring for shutdown Map configs = new HashMap<>(pf.getConfigurationProperties()); @@ -56,7 +56,7 @@ public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context, P } @Bean - public ConsumerFactory simpleKafkaConsumerFactory() { + ConsumerFactory simpleKafkaConsumerFactory() { Map consumerProperties = this.properties.buildConsumerProperties(); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); @@ -64,17 +64,17 @@ public ConsumerFactory simpleKafkaConsumerFactory() { } @Bean("simpleKafkaListenerContainerFactory") - ConcurrentKafkaListenerContainerFactory simpleKafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory simpleKafkaListenerContainerFactory( + ConsumerFactory simpleKafkaConsumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(simpleKafkaConsumerFactory()); + factory.setConsumerFactory(simpleKafkaConsumerFactory); return factory; } // Second consumer config - @Bean - public ConsumerFactory jsonKafkaConsumerFactory() { + ConsumerFactory jsonKafkaConsumerFactory() { Map consumerProperties = this.properties.buildConsumerProperties(); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName()); @@ -83,10 +83,11 @@ public ConsumerFactory jsonKafkaConsumerFactory() { } @Bean("jsonKafkaListenerContainerFactory") - ConcurrentKafkaListenerContainerFactory jsonKafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory jsonKafkaListenerContainerFactory( + ConsumerFactory jsonKafkaConsumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(jsonKafkaConsumerFactory()); + factory.setConsumerFactory(jsonKafkaConsumerFactory); return factory; } diff --git a/kafka-sample/spring-boot-multiple-producers-consumers/src/test/java/com/example/springbootkafka/multi/SpringBootKafkaMultiApplicationIntegrationTest.java b/kafka-sample/spring-boot-multiple-producers-consumers/src/test/java/com/example/springbootkafka/multi/SpringBootKafkaMultiApplicationIntegrationTest.java new file mode 100644 index 00000000..26919cb8 --- /dev/null +++ b/kafka-sample/spring-boot-multiple-producers-consumers/src/test/java/com/example/springbootkafka/multi/SpringBootKafkaMultiApplicationIntegrationTest.java @@ -0,0 +1,43 @@ +package com.example.springbootkafka.multi; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; + +import com.example.springbootkafka.multi.domain.SimpleMessage; +import com.example.springbootkafka.multi.receiver.JsonReceiver; +import com.example.springbootkafka.multi.receiver.SimpleReceiver; +import com.example.springbootkafka.multi.sender.Sender; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest(classes = TestSpringBootKafkaMultiApplication.class) +class SpringBootKafkaMultiApplicationIntegrationTest { + + @Autowired + private Sender sender; + + @Autowired + private SimpleReceiver simpleReceiver; + + @Autowired + private JsonReceiver jsonReceiver; + + @Test + void sendAndReceiveData() throws Exception { + sender.send(10, "foo"); + await().pollDelay(1, TimeUnit.SECONDS).atMost(15, TimeUnit.SECONDS).untilAsserted(() -> assertThat( + simpleReceiver.getLatch().getCount()) + .isZero()); + } + + @Test + void sendAndReceiveJsonData() throws Exception { + SimpleMessage simpleMessage = new SimpleMessage(110, "My Json Message"); + sender.send(simpleMessage); + await().pollDelay(1, TimeUnit.SECONDS).atMost(15, TimeUnit.SECONDS).untilAsserted(() -> assertThat( + jsonReceiver.getLatch().getCount()) + .isZero()); + } +} diff --git a/kafka-sample/spring-boot-multiple-producers-consumers/src/test/java/com/example/springbootkafka/multi/TestSpringBootKafkaMultiApplication.java b/kafka-sample/spring-boot-multiple-producers-consumers/src/test/java/com/example/springbootkafka/multi/TestSpringBootKafkaMultiApplication.java index 9ee4628a..1f08c7d9 100644 --- a/kafka-sample/spring-boot-multiple-producers-consumers/src/test/java/com/example/springbootkafka/multi/TestSpringBootKafkaMultiApplication.java +++ b/kafka-sample/spring-boot-multiple-producers-consumers/src/test/java/com/example/springbootkafka/multi/TestSpringBootKafkaMultiApplication.java @@ -1,21 +1,19 @@ package com.example.springbootkafka.multi; import org.springframework.boot.SpringApplication; -import org.springframework.boot.devtools.restart.RestartScope; import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.boot.testcontainers.service.connection.ServiceConnection; -import org.springframework.context.annotation.Bean; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.utility.DockerImageName; @TestConfiguration(proxyBeanMethods = false) public class TestSpringBootKafkaMultiApplication { - @Bean - @ServiceConnection - @RestartScope - public KafkaContainer kafkaContainer() { - return new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag("7.4.1")).withKraft(); + private static final KafkaContainer kafkaContainer = + new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag("7.4.1")).withKraft(); + + static { + kafkaContainer.start(); + System.setProperty("spring.kafka.bootstrap-servers", kafkaContainer.getBootstrapServers()); } public static void main(String[] args) {