Skip to content

Commit

Permalink
fixes issue with sending messages
Browse files Browse the repository at this point in the history
  • Loading branch information
rajadilipkolli committed Jun 15, 2023
1 parent f7e8ef8 commit cbac995
Show file tree
Hide file tree
Showing 13 changed files with 103 additions and 60 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ HELP.md
/analytics-spring-cloud-streams-kafka-producer/target/
kafka-java-sample/target/
spring-kafka-sample/target/
/logs/
logs/
5 changes: 4 additions & 1 deletion kafka-reactor/boot-kafka-reactor-producer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,7 @@ $ ./mvnw spring-boot:run -Dspring-boot.run.profiles=local

### Useful Links
* Swagger UI: http://localhost:8080/swagger-ui.html
* Actuator Endpoint: http://localhost:8080/actuator
* Actuator Endpoint: http://localhost:8080/actuator
* Kafdrop UI: http://localhost:9000
* PGAdmin UI: http://localhost:5050 ([email protected] / admin)

Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,34 @@ services:
KAFKA_CONTROLLER_QUORUM_MODE: 'kraft'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'

kafdrop:
image: obsidiandynamics/kafdrop
restart: "no"
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "broker:29092"
depends_on:
- broker

postgresqldb:
image: postgres:15.3-alpine
environment:
- POSTGRES_USER=appuser
- POSTGRES_PASSWORD=secret
- POSTGRES_DB=appdb
ports:
- "5432:5432"

pgadmin4:
container_name: pgadmin4
image: dpage/pgadmin4
extra_hosts: [ 'host.docker.internal:host-gateway' ]
environment:
- [email protected]
- PGADMIN_DEFAULT_PASSWORD=admin
ports:
- "5050:80"
depends_on:
- postgresqldb
restart: unless-stopped
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package com.example.boot.kafka.reactor.config;

import com.example.boot.kafka.reactor.dto.MessageDTO;
import com.example.boot.kafka.reactor.util.AppConstants;
import com.example.boot.kafka.reactor.entity.MessageDTO;
import java.util.Map;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -15,11 +13,6 @@
@Configuration(proxyBeanMethods = false)
public class KafkaConfiguration {

@Bean
public NewTopic myTopic() {
return new NewTopic(AppConstants.HELLO_TOPIC, 1, (short) 1);
}

@Bean
public KafkaSender<Integer, MessageDTO> reactiveKafkaSender(KafkaProperties properties) {
Map<String, Object> props = properties.buildProducerProperties();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,56 +1,28 @@
package com.example.boot.kafka.reactor.controller;

import com.example.boot.kafka.reactor.dto.MessageDTO;
import com.example.boot.kafka.reactor.repository.MessageRepository;
import com.example.boot.kafka.reactor.util.AppConstants;
import java.net.URI;
import java.security.SecureRandom;
import java.time.LocalDateTime;
import com.example.boot.kafka.reactor.entity.MessageDTO;
import com.example.boot.kafka.reactor.service.MessageService;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;

@RestController
@RequestMapping("/messages")
@RequiredArgsConstructor
@Slf4j
class MessageController {
private final KafkaSender<Integer, MessageDTO> sender;
private final MessageRepository messageRepository;

private final MessageService messageService;

@PostMapping
public Mono<ResponseEntity<Object>> sendMessage(@RequestBody MessageDTO messageDTO) {
public Mono<ResponseEntity<Object>> sendMessage(@RequestBody @Valid MessageDTO messageDTO) {
log.debug("sending messageDTO: {}", messageDTO);
Integer key = new SecureRandom().nextInt(Integer.MAX_VALUE);
return messageRepository
.save(messageDTO)
.doOnSuccess(it -> {
this.sender
.send(Flux.just(
SenderRecord.create(new ProducerRecord<>(AppConstants.HELLO_TOPIC, key, it), key)))
.doOnError(e -> log.error("Send failed", e))
.subscribe(r -> {
RecordMetadata metadata = r.recordMetadata();
log.debug(
"Message {} sent successfully, topic-partition={}-{} offset={} timestamp={}",
r.correlationMetadata(),
metadata.topic(),
metadata.partition(),
metadata.offset(),
LocalDateTime.now());
});
})
.map(it -> ResponseEntity.created(URI.create("/messages/" + it.id()))
.build());
return messageService.sendMessage(messageDTO);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.example.boot.kafka.reactor.entity;

import jakarta.validation.constraints.NotBlank;
import java.time.LocalDateTime;
import org.springframework.data.relational.core.mapping.Table;

/**
 * Message row mapped to the {@code messages} table via Spring Data R2DBC.
 * Also serves as the payload published to Kafka.
 *
 * <p>{@code id} is the database-generated key (null before first save);
 * {@code text} must be non-blank (bean validation); {@code sentAt} records
 * when the message was sent.
 */
@Table(name = "messages")
public record MessageDTO(Long id, @NotBlank(message = "Text Value Cant be Blank") String text, LocalDateTime sentAt) {}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.example.boot.kafka.reactor.repository;

import com.example.boot.kafka.reactor.dto.MessageDTO;
import com.example.boot.kafka.reactor.entity.MessageDTO;
import org.springframework.data.r2dbc.repository.R2dbcRepository;

/** Reactive CRUD repository for {@link MessageDTO} rows, keyed by {@code id}. */
public interface MessageRepository extends R2dbcRepository<MessageDTO, Long> {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.example.boot.kafka.reactor.service;

import com.example.boot.kafka.reactor.entity.MessageDTO;
import com.example.boot.kafka.reactor.repository.MessageRepository;
import com.example.boot.kafka.reactor.util.AppConstants;
import java.net.URI;
import java.security.SecureRandom;
import java.time.LocalDateTime;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;

@Service
@RequiredArgsConstructor
@Slf4j
public class MessageService {

    // SecureRandom is thread-safe and relatively expensive to construct;
    // share a single instance instead of allocating one per request.
    private static final SecureRandom RANDOM = new SecureRandom();

    private final KafkaSender<Integer, MessageDTO> sender;
    private final MessageRepository messageRepository;

    /**
     * Persists the message and, once the save succeeds, publishes it to
     * {@link AppConstants#HELLO_TOPIC} with a random non-negative integer key
     * (the key is also used as the sender correlation metadata).
     *
     * <p>NOTE(review): the Kafka send is fire-and-forget — it is subscribed
     * inside {@code doOnSuccess}, so a publish failure is only logged and does
     * not affect the HTTP response; confirm this is the intended contract.
     *
     * @param messageDTO payload to store and publish
     * @return {@code 201 Created} with Location {@code /messages/{id}} of the saved row
     */
    public Mono<ResponseEntity<Object>> sendMessage(MessageDTO messageDTO) {
        Integer key = RANDOM.nextInt(Integer.MAX_VALUE);
        return saveMessage(messageDTO)
                .doOnSuccess(it -> this.sender
                        .send(Flux.just(
                                SenderRecord.create(new ProducerRecord<>(AppConstants.HELLO_TOPIC, key, it), key)))
                        .doOnError(e -> log.error("Send failed", e))
                        .subscribe(r -> {
                            RecordMetadata metadata = r.recordMetadata();
                            log.debug(
                                    "Message {} sent successfully, topic-partition={}-{} offset={} timestamp={}",
                                    r.correlationMetadata(),
                                    metadata.topic(),
                                    metadata.partition(),
                                    metadata.offset(),
                                    LocalDateTime.now());
                        }))
                .map(it -> ResponseEntity.created(URI.create("/messages/" + it.id()))
                        .build());
    }

    /** Saves the message through the reactive repository. */
    private Mono<MessageDTO> saveMessage(MessageDTO messageDTO) {
        return messageRepository.save(messageDTO);
    }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
spring.kafka.bootstrap-servers=localhost:29200
#spring.kafka.bootstrap-servers=localhost:9092

spring.r2dbc.url=r2dbc:postgresql://localhost:5432/appdb
spring.r2dbc.username=appuser
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ management.endpoints.web.exposure.include=configprops,env,health,info,logfile,lo
management.endpoint.health.show-details=always

################ Kafka ########################
spring.kafka.bootstrap-servers=localhost:9200
# producer
#spring.kafka.producer.acks=all
spring.kafka.producer.acks=all
spring.kafka.producer.clientId=sample-producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
Expand All @@ -26,4 +25,4 @@ spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.seria
# json deserializer config
#spring.kafka.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.spring.json.use.type.headers=false
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.boot.kafka.reactor.dto.MessageDTO
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.boot.kafka.reactor.entity.MessageDTO
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<changeSet author="app" id="createTable-message">
<createSequence
sequenceName="messages_seq"
incrementBy="50"
incrementBy="1"
startValue="1"
/>
<createTable tableName="messages">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import static com.example.boot.kafka.reactor.util.AppConstants.HELLO_TOPIC;

import com.example.boot.kafka.reactor.dto.MessageDTO;
import com.example.boot.kafka.reactor.entity.MessageDTO;
import java.util.Collections;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
Expand Down

0 comments on commit cbac995

Please sign in to comment.