Skip to content

Commit

Permalink
fixes issue with assigning consumer values
Browse files Browse the repository at this point in the history
  • Loading branch information
rajadilipkolli committed Jul 15, 2023
1 parent 8bf1763 commit d5ea2d4
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.example.boot.kafka.reactor.entity.MessageDTO;
import com.example.boot.kafka.reactor.util.AppConstants;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
Expand All @@ -13,15 +14,18 @@

@EnableKafka
@Configuration(proxyBeanMethods = false)
@Slf4j
public class KafkaConfiguration {

@Bean
NewTopic helloTopic() {
log.info("Creating helloTopic");
return new NewTopic(AppConstants.HELLO_TOPIC, 1, (short) 1);
}

@Bean
KafkaSender<Integer, MessageDTO> reactiveKafkaSender(KafkaProperties properties) {
log.info("Creating Sender");
Map<String, Object> props = properties.buildProducerProperties();
return KafkaSender.create(SenderOptions.create(props));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static org.assertj.core.api.Assertions.assertThat;

import com.example.boot.kafka.reactor.config.MyTestContainers;
import com.example.boot.kafka.reactor.entity.MessageDTO;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
Expand All @@ -12,16 +11,18 @@
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.test.StepVerifier;

@Slf4j
@SpringBootTest(classes = {MyTestContainers.class})
@SpringBootTest(classes = TestBootKafkaReactorProducerApplication.class)
@ActiveProfiles("test")
@AutoConfigureWebTestClient
@Slf4j
class BootKafkaReactorProducerApplicationTests {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,55 @@
package com.example.boot.kafka.reactor;

import com.example.boot.kafka.reactor.config.MyTestContainers;
import static com.example.boot.kafka.reactor.util.AppConstants.HELLO_TOPIC;

import com.example.boot.kafka.reactor.entity.MessageDTO;
import java.util.Collections;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.DockerImageName;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

@TestConfiguration(proxyBeanMethods = false)
@Import(MyTestContainers.class)
@Slf4j
public class TestBootKafkaReactorProducerApplication {

@Bean
@ServiceConnection
PostgreSQLContainer<?> postgresContainer() {
return new PostgreSQLContainer<>(DockerImageName.parse("postgres:15.3-alpine"));
}

@Bean
@ServiceConnection
KafkaContainer kafkaContainer(DynamicPropertyRegistry propertyRegistry) {
KafkaContainer kafkaContainer = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka").withTag("7.4.1"))
.withKraft();
propertyRegistry.add("spring.kafka.producer.bootstrapServers", kafkaContainer::getBootstrapServers);
propertyRegistry.add("spring.kafka.consumer.bootstrapServers", kafkaContainer::getBootstrapServers);
return kafkaContainer;
}

@Bean
KafkaReceiver<Integer, MessageDTO> receiver(KafkaProperties properties) {
log.info("Creating receiver");
ReceiverOptions<Integer, MessageDTO> receiverOptions = ReceiverOptions.<Integer, MessageDTO>create(
properties.buildConsumerProperties())
.subscription(Collections.singleton(HELLO_TOPIC))
.addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
.addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));

return KafkaReceiver.create(receiverOptions);
}

public static void main(String[] args) {
SpringApplication.from(BootKafkaReactorProducerApplication::main)
.with(TestBootKafkaReactorProducerApplication.class)
Expand Down

This file was deleted.

0 comments on commit d5ea2d4

Please sign in to comment.