Skip to content

Commit

Permalink
Producer doesn't need a db, hence removing it
Browse files Browse the repository at this point in the history
  • Loading branch information
rajadilipkolli committed Jul 15, 2023
1 parent f7b2d73 commit 99842dd
Show file tree
Hide file tree
Showing 14 changed files with 30 additions and 231 deletions.
3 changes: 3 additions & 0 deletions .gitpod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ vscode:
- vscjava.vscode-java-pack
- Pivotal.vscode-boot-dev-pack
- GabrielBB.vscode-lombok
- ms-azuretools.vscode-docker

ports:
- port: 5050
visibility: public
- port: 9091-9093
visibility: public
- port: 8080
Expand Down
105 changes: 0 additions & 105 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,6 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "java",
"name": "AnalyticsConsumerApplication",
"request": "launch",
"mainClass": "com.example.analytics.AnalyticsConsumerApplication",
"projectName": "analytics-spring-cloud-streams-kafka-consumer"
},
{
"type": "java",
"name": "TestAnalyticsConsumerApplication",
"request": "launch",
"mainClass": "com.example.analytics.TestAnalyticsConsumerApplication",
"projectName": "analytics-spring-cloud-streams-kafka-consumer"
},
{
"type": "java",
"name": "AnalyticsProducerApplication",
"request": "launch",
"mainClass": "com.example.analytics.AnalyticsProducerApplication",
"projectName": "analytics-spring-cloud-streams-kafka-producer"
},
{
"type": "java",
"name": "Spring Boot-BootKafkaReactorProducerApplication<boot-kafka-reactor-producer>",
Expand All @@ -34,90 +13,6 @@
"mainClass": "com.example.boot.kafka.reactor.BootKafkaReactorProducerApplication",
"projectName": "boot-kafka-reactor-producer",
"args": "--spring.profiles.active=local"
},
{
"type": "java",
"name": "TestBootKafkaReactorProducerApplication",
"request": "launch",
"mainClass": "com.example.boot.kafka.reactor.TestBootKafkaReactorProducerApplication",
"projectName": "boot-kafka-reactor-producer"
},
{
"type": "java",
"name": "KafkaDeadLetterPublishingApplication",
"request": "launch",
"mainClass": "com.github.timtebeek.KafkaDeadLetterPublishingApplication",
"projectName": "kafka-dead-letter-publishing"
},
{
"type": "java",
"name": "MessageConsumer",
"request": "launch",
"mainClass": "com.sivalabs.kafkasample.MessageConsumer",
"projectName": "kafka-java-sample"
},
{
"type": "java",
"name": "MessageProducer",
"request": "launch",
"mainClass": "com.sivalabs.kafkasample.MessageProducer",
"projectName": "kafka-java-sample"
},
{
"type": "java",
"name": "MessageStreamListener",
"request": "launch",
"mainClass": "com.sivalabs.kafkasample.MessageStreamListener",
"projectName": "kafka-java-sample"
},
{
"type": "java",
"name": "SpringBootKafkaAvroConsumerApplication",
"request": "launch",
"mainClass": "com.example.springbootkafkaavro.SpringBootKafkaAvroConsumerApplication",
"projectName": "spring-boot-kafka-avro-consumer"
},
{
"type": "java",
"name": "SpringBootKafkaAvroProducerApplication",
"request": "launch",
"mainClass": "com.example.springbootkafkaavro.SpringBootKafkaAvroProducerApplication",
"projectName": "spring-boot-kafka-avro-producer"
},
{
"type": "java",
"name": "SpringBootKafkaSampleApplication",
"request": "launch",
"mainClass": "com.example.springbootkafkasample.SpringBootKafkaSampleApplication",
"projectName": "spring-boot-kafka-sample"
},
{
"type": "java",
"name": "TestSpringBootKafkaSampleApplication",
"request": "launch",
"mainClass": "com.example.springbootkafkasample.TestSpringBootKafkaSampleApplication",
"projectName": "spring-boot-kafka-sample"
},
{
"type": "java",
"name": "SpringBootKafkaMultiApplication",
"request": "launch",
"mainClass": "com.example.springbootkafka.multi.SpringBootKafkaMultiApplication",
"projectName": "spring-boot-multiple-producers-consumers"
},
{
"type": "java",
"name": "TestSpringBootKafkaMultiApplication",
"request": "launch",
"mainClass": "com.example.springbootkafka.multi.TestSpringBootKafkaMultiApplication",
"projectName": "spring-boot-multiple-producers-consumers"
},
{
"type": "java",
"name": "SpringKafkaDemo",
"request": "launch",
"mainClass": "com.sivalabs.sample.SpringKafkaDemo",
"projectName": "spring-kafka-sample"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import jakarta.validation.constraints.NotBlank;
import java.time.LocalDateTime;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;

@Table(name = "messages")
public record MessageDTO(Long id, @NotBlank(message = "Text Value Cant be Blank") String text, LocalDateTime sentAt) {}
public record MessageDTO(
@Id Long id, @NotBlank(message = "Text Value Cant be Blank") String text, LocalDateTime sentAt) {}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
application.name=boot-kafka-reactor-consumer
server.port=8081
################ Logging #####################
logging.file.name=logs/boot-kafka-reactor-consumer.log
logging.file.name=logs/${application.name}.log
logging.level.web=INFO
logging.level.sql=INFO
## To enable transaction details logging
Expand Down
39 changes: 0 additions & 39 deletions kafka-reactor/boot-kafka-reactor-producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.liquibase</groupId>
<artifactId>liquibase-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
Expand All @@ -57,17 +45,6 @@
<version>${org.springdoc.version}</version>
</dependency>


<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Expand Down Expand Up @@ -95,11 +72,6 @@
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
Expand All @@ -109,17 +81,6 @@
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
<version>1.18.3</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>r2dbc</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/messages")
Expand All @@ -21,8 +19,8 @@ class MessageController {
private final MessageService messageService;

@PostMapping
public Mono<ResponseEntity<Object>> sendMessage(@RequestBody @Valid MessageDTO messageDTO) {
public void sendMessage(@RequestBody @Valid MessageDTO messageDTO) {
log.debug("sending messageDTO: {}", messageDTO);
return messageService.sendMessage(messageDTO);
messageService.sendMessage(messageDTO);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.example.boot.kafka.reactor.entity;

import com.fasterxml.jackson.annotation.JsonIgnore;
import jakarta.validation.constraints.NotBlank;
import java.time.LocalDateTime;
import org.springframework.data.relational.core.mapping.Table;

@Table(name = "messages")
public record MessageDTO(Long id, @NotBlank(message = "Text Value Cant be Blank") String text, LocalDateTime sentAt) {}
public record MessageDTO(
@JsonIgnore Long id, @NotBlank(message = "Text Value Cant be Blank") String text, LocalDateTime sentAt) {}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
package com.example.boot.kafka.reactor.service;

import com.example.boot.kafka.reactor.entity.MessageDTO;
import com.example.boot.kafka.reactor.repository.MessageRepository;
import com.example.boot.kafka.reactor.util.AppConstants;
import java.net.URI;
import java.security.SecureRandom;
import java.time.LocalDateTime;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;

Expand All @@ -23,30 +19,22 @@
public class MessageService {

private final KafkaSender<Integer, MessageDTO> sender;
private final MessageRepository messageRepository;

public Mono<ResponseEntity<Object>> sendMessage(MessageDTO messageDTO) {
public void sendMessage(MessageDTO messageDTO) {
Integer key = new SecureRandom().nextInt(Integer.MAX_VALUE);
return saveMessage(messageDTO)
.doOnSuccess(it -> this.sender
.send(Flux.just(
SenderRecord.create(new ProducerRecord<>(AppConstants.HELLO_TOPIC, key, it), key)))
.doOnError(e -> log.error("Send failed", e))
.subscribe(r -> {
RecordMetadata metadata = r.recordMetadata();
log.info(
"Message {} sent successfully, topic-partition={}-{} offset={} timestamp={}",
r.correlationMetadata(),
metadata.topic(),
metadata.partition(),
metadata.offset(),
LocalDateTime.now());
}))
.map(it -> ResponseEntity.created(URI.create("/messages/" + it.id()))
.build());
}

private Mono<MessageDTO> saveMessage(MessageDTO messageDTO) {
return messageRepository.save(messageDTO);
this.sender
.send(Flux.just(
SenderRecord.create(new ProducerRecord<>(AppConstants.HELLO_TOPIC, key, messageDTO), key)))
.doOnError(e -> log.error("Send failed", e))
.subscribe(r -> {
RecordMetadata metadata = r.recordMetadata();
log.info(
"Message {} sent successfully, topic-partition={}-{} offset={} timestamp={}",
r.correlationMetadata(),
metadata.topic(),
metadata.partition(),
metadata.offset(),
LocalDateTime.now());
});
}
}

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void loadDataAndConsume() throws InterruptedException {
.body(Mono.just(requestBody), String.class)
.exchange()
.expectStatus()
.isCreated();
.isOk();

TimeUnit.SECONDS.sleep(5);
Flux<MessageDTO> flux = receiver.receive().map(record -> {
Expand Down
Loading

0 comments on commit 99842dd

Please sign in to comment.