Skip to content

Commit

Permalink
fixes issue with testcontainers in dev mode
Browse files Browse the repository at this point in the history
  • Loading branch information
rajadilipkolli committed Jul 16, 2023
1 parent d96fb2f commit fd7dde2
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class KafkaConfig implements KafkaListenerConfigurer {
private final LocalValidatorFactoryBean validator;

@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context, ProducerFactory<Object, Object> pf) {
RoutingKafkaTemplate routingTemplate(GenericApplicationContext context, ProducerFactory<Object, Object> pf) {

// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
Expand All @@ -56,25 +56,25 @@ public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context, P
}

@Bean
public ConsumerFactory<Integer, String> simpleKafkaConsumerFactory() {
ConsumerFactory<Integer, String> simpleKafkaConsumerFactory() {
Map<String, Object> consumerProperties = this.properties.buildConsumerProperties();
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}

@Bean("simpleKafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<Integer, String> simpleKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> simpleKafkaListenerContainerFactory(
ConsumerFactory<Integer, String> simpleKafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(simpleKafkaConsumerFactory());
factory.setConsumerFactory(simpleKafkaConsumerFactory);
return factory;
}

// Second consumer config

@Bean
public ConsumerFactory<String, SimpleMessage> jsonKafkaConsumerFactory() {
ConsumerFactory<String, SimpleMessage> jsonKafkaConsumerFactory() {
Map<String, Object> consumerProperties = this.properties.buildConsumerProperties();
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
Expand All @@ -83,10 +83,11 @@ public ConsumerFactory<String, SimpleMessage> jsonKafkaConsumerFactory() {
}

@Bean("jsonKafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<String, SimpleMessage> jsonKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, SimpleMessage> jsonKafkaListenerContainerFactory(
ConsumerFactory<String, SimpleMessage> jsonKafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, SimpleMessage> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(jsonKafkaConsumerFactory());
factory.setConsumerFactory(jsonKafkaConsumerFactory);
return factory;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.example.springbootkafka.multi;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

import com.example.springbootkafka.multi.domain.SimpleMessage;
import com.example.springbootkafka.multi.receiver.JsonReceiver;
import com.example.springbootkafka.multi.receiver.SimpleReceiver;
import com.example.springbootkafka.multi.sender.Sender;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(classes = TestSpringBootKafkaMultiApplication.class)
class SpringBootKafkaMultiApplicationIntegrationTest {

@Autowired
private Sender sender;

@Autowired
private SimpleReceiver simpleReceiver;

@Autowired
private JsonReceiver jsonReceiver;

@Test
void sendAndReceiveData() throws Exception {
sender.send(10, "foo");
await().pollDelay(1, TimeUnit.SECONDS).atMost(15, TimeUnit.SECONDS).untilAsserted(() -> assertThat(
simpleReceiver.getLatch().getCount())
.isZero());
}

@Test
void sendAndReceiveJsonData() throws Exception {
SimpleMessage simpleMessage = new SimpleMessage(110, "My Json Message");
sender.send(simpleMessage);
await().pollDelay(1, TimeUnit.SECONDS).atMost(15, TimeUnit.SECONDS).untilAsserted(() -> assertThat(
jsonReceiver.getLatch().getCount())
.isZero());
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
package com.example.springbootkafka.multi;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.devtools.restart.RestartScope;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@TestConfiguration(proxyBeanMethods = false)
public class TestSpringBootKafkaMultiApplication {

@Bean
@ServiceConnection
@RestartScope
public KafkaContainer kafkaContainer() {
return new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag("7.4.1")).withKraft();
private static final KafkaContainer kafkaContainer =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag("7.4.1")).withKraft();

static {
kafkaContainer.start();
System.setProperty("spring.kafka.bootstrap-servers", kafkaContainer.getBootstrapServers());
}

public static void main(String[] args) {
Expand Down

0 comments on commit fd7dde2

Please sign in to comment.