diff --git a/.github/workflows/spring-boot-kafka-avro-consumer.yml b/.github/workflows/spring-boot-kafka-avro-consumer.yml new file mode 100644 index 00000000..a5746128 --- /dev/null +++ b/.github/workflows/spring-boot-kafka-avro-consumer.yml @@ -0,0 +1,33 @@ +name: spring-boot-kafka-avro-consumer + +on: + push: + paths: + - "kafka-avro/spring-boot-kafka-avro-consumer/**" + branches: [ master ] + pull_request: + paths: + - "kafka-avro/spring-boot-kafka-avro-consumer/**" + types: + - opened + - synchronize + - reopened + +jobs: + build: + runs-on: ubuntu-latest + defaults: + run: + working-directory: "kafka-avro/spring-boot-kafka-avro-consumer" + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 + - name: Set up JDK + uses: actions/setup-java@v3.11.0 + with: + java-version: '17' + distribution: 'microsoft' + cache: 'maven' + - name: Build with Maven + run: ./mvnw -B clean verify --file pom.xml diff --git a/.github/workflows/spring-boot-kafka-avro.yml b/.github/workflows/spring-boot-kafka-avro-producer.yml similarity index 100% rename from .github/workflows/spring-boot-kafka-avro.yml rename to .github/workflows/spring-boot-kafka-avro-producer.yml diff --git a/.vscode/launch.json b/.vscode/launch.json index 70ec3f34..dcd79fd7 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -22,11 +22,21 @@ }, { "type": "java", - "name": "Spring Boot-SpringBootKafkaAvroApplication", + "name": "Spring Boot-SpringBootKafkaAvroConsumerApplication", "request": "launch", "cwd": "${workspaceFolder}", - "mainClass": "com.example.springbootkafkaavro.SpringBootKafkaAvroApplication", - "projectName": "spring-boot-kafka-avro", + "mainClass": "com.example.springbootkafkaavro.SpringBootKafkaAvroConsumerApplication", + "projectName": "spring-boot-kafka-avro-consumer", + "args": "", + "envFile": "${workspaceFolder}/.env" + }, + { + "type": "java", + "name": "Spring Boot-SpringBootKafkaAvroProducerApplication", + "request": "launch", + "cwd": "${workspaceFolder}", + 
"mainClass": "com.example.springbootkafkaavro.SpringBootKafkaAvroProducerApplication", + "projectName": "spring-boot-kafka-avro-producer", "args": "", "envFile": "${workspaceFolder}/.env" } diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/.gitignore b/kafka-avro/spring-boot-kafka-avro-consumer/.gitignore new file mode 100644 index 00000000..a2a3040a --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-consumer/.gitignore @@ -0,0 +1,31 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/** +!**/src/test/** + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ + +### VS Code ### +.vscode/ diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/.mvn/wrapper/maven-wrapper.jar b/kafka-avro/spring-boot-kafka-avro-consumer/.mvn/wrapper/maven-wrapper.jar new file mode 100644 index 00000000..cb28b0e3 Binary files /dev/null and b/kafka-avro/spring-boot-kafka-avro-consumer/.mvn/wrapper/maven-wrapper.jar differ diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/.mvn/wrapper/maven-wrapper.properties b/kafka-avro/spring-boot-kafka-avro-consumer/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 00000000..3c6fda8c --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-consumer/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.2/apache-maven-3.9.2-bin.zip +wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/README.md b/kafka-avro/spring-boot-kafka-avro-consumer/README.md new file mode 100644 index 00000000..094a0c61 --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-consumer/README.md @@ -0,0 +1,17 @@ +# kafka avro + + +Apache Kafka is an open-source stream-processing platform that is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, and fast. + +Apache Avro is a data serialization system that provides a compact and efficient binary format for data serialization and is commonly used with Apache Kafka. It is a compact and efficient binary format that allows for the serialization of data with a schema. This means that the structure of the data can be defined, and the data can be self-describing, which can be useful when working with complex data structures. + +Together, Kafka and Avro can be used to create a powerful platform for building real-time data pipelines and streaming applications. Avro is often used with Kafka because it supports schema evolution, which allows for the evolution of data over time without the need to update all the systems that are consuming that data. This makes it an ideal choice for use with Kafka, where data is constantly being generated and consumed by a variety of systems. 
+ + +### Run locally +`$ ./mvnw spring-boot:run` + +### Useful Links +* Swagger UI: http://localhost:8080/swagger-ui.html +* Actuator Endpoint: http://localhost:8080/actuator +* Schema Registry : http://localhost:8081/subjects/persons-value/versions?normalize=false diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/docker/docker-compose.yml b/kafka-avro/spring-boot-kafka-avro-consumer/docker/docker-compose.yml new file mode 100644 index 00000000..f8a4d757 --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-consumer/docker/docker-compose.yml @@ -0,0 +1,42 @@ +version: '3.9' + +services: + kafka: + image: confluentinc/cp-kafka:7.4.0 + hostname: broker + container_name: broker + ports: + - "9092:9092" + - "9101:9101" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_JMX_PORT: 9101 + KAFKA_JMX_HOSTNAME: localhost + KAFKA_PROCESS_ROLES: 'broker,controller' + KAFKA_NODE_ID: 1 + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093' + KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + KAFKA_CONTROLLER_QUORUM_MODE: 'kraft' + CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' + + schema-registry: + image: confluentinc/cp-schema-registry:7.4.0 + hostname: schema-registry + container_name: schema-registry + depends_on: + - kafka + ports: + - "8081:8081" + environment: + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092' + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 diff --git 
a/kafka-avro/spring-boot-kafka-avro-consumer/generated-avro/com/example/springbootkafkaavro/model/Person.java b/kafka-avro/spring-boot-kafka-avro-consumer/generated-avro/com/example/springbootkafkaavro/model/Person.java new file mode 100644 index 00000000..f417ccc6 --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-consumer/generated-avro/com/example/springbootkafkaavro/model/Person.java @@ -0,0 +1,477 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package com.example.springbootkafkaavro.model; + +import org.apache.avro.generic.GenericArray; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.Utf8; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.SchemaStore; + +@org.apache.avro.specific.AvroGenerated +public class Person extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = 5166273048254553824L; + + + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Person\",\"namespace\":\"com.example.springbootkafkaavro.model\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\",\"avro.java.string\":\"String\"},{\"name\":\"age\",\"type\":\"int\"}],\"version\":\"1\"}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + + private static final SpecificData MODEL$ = new SpecificData(); + + private static final BinaryMessageEncoder ENCODER = + new BinaryMessageEncoder<>(MODEL$, SCHEMA$); + + private static final BinaryMessageDecoder DECODER = + new BinaryMessageDecoder<>(MODEL$, SCHEMA$); + + /** + * Return the BinaryMessageEncoder instance used by this class. 
+ * @return the message encoder used by this class + */ + public static BinaryMessageEncoder getEncoder() { + return ENCODER; + } + + /** + * Return the BinaryMessageDecoder instance used by this class. + * @return the message decoder used by this class + */ + public static BinaryMessageDecoder getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore + */ + public static BinaryMessageDecoder createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder<>(MODEL$, SCHEMA$, resolver); + } + + /** + * Serializes this Person to a ByteBuffer. + * @return a buffer holding the serialized data for this instance + * @throws java.io.IOException if this instance could not be serialized + */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + /** + * Deserializes a Person from a ByteBuffer. + * @param b a byte buffer holding serialized data for an instance of this class + * @return a Person instance decoded from the given buffer + * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class + */ + public static Person fromByteBuffer( + java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + private long id; + private java.lang.CharSequence name; + private int age; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use newBuilder(). + */ + public Person() {} + + /** + * All-args constructor. 
+ * @param id The new value for id + * @param name The new value for name + * @param age The new value for age + */ + public Person(java.lang.Long id, java.lang.CharSequence name, java.lang.Integer age) { + this.id = id; + this.name = name; + this.age = age; + } + + @Override + public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; } + + @Override + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + + // Used by DatumWriter. Applications should not call. + @Override + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return id; + case 1: return name; + case 2: return age; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + // Used by DatumReader. Applications should not call. + @Override + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: id = (java.lang.Long)value$; break; + case 1: name = (java.lang.CharSequence)value$; break; + case 2: age = (java.lang.Integer)value$; break; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + /** + * Gets the value of the 'id' field. + * @return The value of the 'id' field. + */ + public long getId() { + return id; + } + + + /** + * Sets the value of the 'id' field. + * @param value the value to set. + */ + public void setId(long value) { + this.id = value; + } + + /** + * Gets the value of the 'name' field. + * @return The value of the 'name' field. + */ + public java.lang.CharSequence getName() { + return name; + } + + + /** + * Sets the value of the 'name' field. + * @param value the value to set. + */ + public void setName(java.lang.CharSequence value) { + this.name = value; + } + + /** + * Gets the value of the 'age' field. + * @return The value of the 'age' field. + */ + public int getAge() { + return age; + } + + + /** + * Sets the value of the 'age' field. + * @param value the value to set. 
+ */ + public void setAge(int value) { + this.age = value; + } + + /** + * Creates a new Person RecordBuilder. + * @return A new Person RecordBuilder + */ + public static com.example.springbootkafkaavro.model.Person.Builder newBuilder() { + return new com.example.springbootkafkaavro.model.Person.Builder(); + } + + /** + * Creates a new Person RecordBuilder by copying an existing Builder. + * @param other The existing builder to copy. + * @return A new Person RecordBuilder + */ + public static com.example.springbootkafkaavro.model.Person.Builder newBuilder(com.example.springbootkafkaavro.model.Person.Builder other) { + if (other == null) { + return new com.example.springbootkafkaavro.model.Person.Builder(); + } else { + return new com.example.springbootkafkaavro.model.Person.Builder(other); + } + } + + /** + * Creates a new Person RecordBuilder by copying an existing Person instance. + * @param other The existing instance to copy. + * @return A new Person RecordBuilder + */ + public static com.example.springbootkafkaavro.model.Person.Builder newBuilder(com.example.springbootkafkaavro.model.Person other) { + if (other == null) { + return new com.example.springbootkafkaavro.model.Person.Builder(); + } else { + return new com.example.springbootkafkaavro.model.Person.Builder(other); + } + } + + /** + * RecordBuilder for Person instances. + */ + @org.apache.avro.specific.AvroGenerated + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private long id; + private java.lang.CharSequence name; + private int age; + + /** Creates a new Builder */ + private Builder() { + super(SCHEMA$, MODEL$); + } + + /** + * Creates a Builder by copying an existing Builder. + * @param other The existing Builder to copy. 
+ */ + private Builder(com.example.springbootkafkaavro.model.Person.Builder other) { + super(other); + if (isValidValue(fields()[0], other.id)) { + this.id = data().deepCopy(fields()[0].schema(), other.id); + fieldSetFlags()[0] = other.fieldSetFlags()[0]; + } + if (isValidValue(fields()[1], other.name)) { + this.name = data().deepCopy(fields()[1].schema(), other.name); + fieldSetFlags()[1] = other.fieldSetFlags()[1]; + } + if (isValidValue(fields()[2], other.age)) { + this.age = data().deepCopy(fields()[2].schema(), other.age); + fieldSetFlags()[2] = other.fieldSetFlags()[2]; + } + } + + /** + * Creates a Builder by copying an existing Person instance + * @param other The existing instance to copy. + */ + private Builder(com.example.springbootkafkaavro.model.Person other) { + super(SCHEMA$, MODEL$); + if (isValidValue(fields()[0], other.id)) { + this.id = data().deepCopy(fields()[0].schema(), other.id); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.name)) { + this.name = data().deepCopy(fields()[1].schema(), other.name); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.age)) { + this.age = data().deepCopy(fields()[2].schema(), other.age); + fieldSetFlags()[2] = true; + } + } + + /** + * Gets the value of the 'id' field. + * @return The value. + */ + public long getId() { + return id; + } + + + /** + * Sets the value of the 'id' field. + * @param value The value of 'id'. + * @return This builder. + */ + public com.example.springbootkafkaavro.model.Person.Builder setId(long value) { + validate(fields()[0], value); + this.id = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'id' field has been set. + * @return True if the 'id' field has been set, false otherwise. + */ + public boolean hasId() { + return fieldSetFlags()[0]; + } + + + /** + * Clears the value of the 'id' field. + * @return This builder. 
+ */ + public com.example.springbootkafkaavro.model.Person.Builder clearId() { + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'name' field. + * @return The value. + */ + public java.lang.CharSequence getName() { + return name; + } + + + /** + * Sets the value of the 'name' field. + * @param value The value of 'name'. + * @return This builder. + */ + public com.example.springbootkafkaavro.model.Person.Builder setName(java.lang.CharSequence value) { + validate(fields()[1], value); + this.name = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'name' field has been set. + * @return True if the 'name' field has been set, false otherwise. + */ + public boolean hasName() { + return fieldSetFlags()[1]; + } + + + /** + * Clears the value of the 'name' field. + * @return This builder. + */ + public com.example.springbootkafkaavro.model.Person.Builder clearName() { + name = null; + fieldSetFlags()[1] = false; + return this; + } + + /** + * Gets the value of the 'age' field. + * @return The value. + */ + public int getAge() { + return age; + } + + + /** + * Sets the value of the 'age' field. + * @param value The value of 'age'. + * @return This builder. + */ + public com.example.springbootkafkaavro.model.Person.Builder setAge(int value) { + validate(fields()[2], value); + this.age = value; + fieldSetFlags()[2] = true; + return this; + } + + /** + * Checks whether the 'age' field has been set. + * @return True if the 'age' field has been set, false otherwise. + */ + public boolean hasAge() { + return fieldSetFlags()[2]; + } + + + /** + * Clears the value of the 'age' field. + * @return This builder. + */ + public com.example.springbootkafkaavro.model.Person.Builder clearAge() { + fieldSetFlags()[2] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public Person build() { + try { + Person record = new Person(); + record.id = fieldSetFlags()[0] ? 
this.id : (java.lang.Long) defaultValue(fields()[0]); + record.name = fieldSetFlags()[1] ? this.name : (java.lang.CharSequence) defaultValue(fields()[1]); + record.age = fieldSetFlags()[2] ? this.age : (java.lang.Integer) defaultValue(fields()[2]); + return record; + } catch (org.apache.avro.AvroMissingFieldException e) { + throw e; + } catch (java.lang.Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumWriter + WRITER$ = (org.apache.avro.io.DatumWriter)MODEL$.createDatumWriter(SCHEMA$); + + @Override public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumReader + READER$ = (org.apache.avro.io.DatumReader)MODEL$.createDatumReader(SCHEMA$); + + @Override public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + READER$.read(this, SpecificData.getDecoder(in)); + } + + @Override protected boolean hasCustomCoders() { return true; } + + @Override public void customEncode(org.apache.avro.io.Encoder out) + throws java.io.IOException + { + out.writeLong(this.id); + + out.writeString(this.name); + + out.writeInt(this.age); + + } + + @Override public void customDecode(org.apache.avro.io.ResolvingDecoder in) + throws java.io.IOException + { + org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff(); + if (fieldOrder == null) { + this.id = in.readLong(); + + this.name = in.readString(this.name instanceof Utf8 ? (Utf8)this.name : null); + + this.age = in.readInt(); + + } else { + for (int i = 0; i < 3; i++) { + switch (fieldOrder[i].pos()) { + case 0: + this.id = in.readLong(); + break; + + case 1: + this.name = in.readString(this.name instanceof Utf8 ? 
(Utf8)this.name : null); + break; + + case 2: + this.age = in.readInt(); + break; + + default: + throw new java.io.IOException("Corrupt ResolvingDecoder."); + } + } + } + } +} + + + + + + + + + + diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/mvnw b/kafka-avro/spring-boot-kafka-avro-consumer/mvnw new file mode 100755 index 00000000..8d937f4c --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-consumer/mvnw @@ -0,0 +1,308 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Apache Maven Wrapper startup batch script, version 3.2.0 +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. 
to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /usr/local/etc/mavenrc ] ; then + . /usr/local/etc/mavenrc + fi + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "$(uname)" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + JAVA_HOME="$(/usr/libexec/java_home)"; export JAVA_HOME + else + JAVA_HOME="/Library/Java/Home"; export JAVA_HOME + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=$(java-config --jre-home) + fi +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$JAVA_HOME" ] && + JAVA_HOME=$(cygpath --unix "$JAVA_HOME") + [ -n "$CLASSPATH" ] && + CLASSPATH=$(cygpath --path --unix "$CLASSPATH") +fi + +# For Mingw, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$JAVA_HOME" ] && [ -d "$JAVA_HOME" ] && + JAVA_HOME="$(cd "$JAVA_HOME" || (echo "cannot cd into $JAVA_HOME."; exit 1); pwd)" +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="$(which javac)" + if [ -n "$javaExecutable" ] && ! [ "$(expr "\"$javaExecutable\"" : '\([^ ]*\)')" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=$(which readlink) + if [ ! 
"$(expr "$readLink" : '\([^ ]*\)')" = "no" ]; then + if $darwin ; then + javaHome="$(dirname "\"$javaExecutable\"")" + javaExecutable="$(cd "\"$javaHome\"" && pwd -P)/javac" + else + javaExecutable="$(readlink -f "\"$javaExecutable\"")" + fi + javaHome="$(dirname "\"$javaExecutable\"")" + javaHome=$(expr "$javaHome" : '\(.*\)/bin') + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="$(\unset -f command 2>/dev/null; \command -v java)" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + if [ -z "$1" ] + then + echo "Path not specified to find_maven_basedir" + return 1 + fi + + basedir="$1" + wdir="$1" + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + # workaround for JBEAP-8937 (on Solaris 10/Sparc) + if [ -d "${wdir}" ]; then + wdir=$(cd "$wdir/.." || exit 1; pwd) + fi + # end of workaround + done + printf '%s' "$(cd "$basedir" || exit 1; pwd)" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + # Remove \r in case we run on Windows within Git Bash + # and check out the repository with auto CRLF management + # enabled. Otherwise, we may read lines that are delimited with + # \r\n and produce $'-Xarg\r' rather than -Xarg due to word + # splitting rules. 
+ tr -s '\r\n' ' ' < "$1" + fi +} + +log() { + if [ "$MVNW_VERBOSE" = true ]; then + printf '%s\n' "$1" + fi +} + +BASE_DIR=$(find_maven_basedir "$(dirname "$0")") +if [ -z "$BASE_DIR" ]; then + exit 1; +fi + +MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}; export MAVEN_PROJECTBASEDIR +log "$MAVEN_PROJECTBASEDIR" + +########################################################################################## +# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +# This allows using the maven wrapper in projects that prohibit checking in binary data. +########################################################################################## +wrapperJarPath="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" +if [ -r "$wrapperJarPath" ]; then + log "Found $wrapperJarPath" +else + log "Couldn't find $wrapperJarPath, downloading it ..." + + if [ -n "$MVNW_REPOURL" ]; then + wrapperUrl="$MVNW_REPOURL/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar" + else + wrapperUrl="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar" + fi + while IFS="=" read -r key value; do + # Remove '\r' from value to allow usage on windows as IFS does not consider '\r' as a separator ( considers space, tab, new line ('\n'), and custom '=' ) + safeValue=$(echo "$value" | tr -d '\r') + case "$key" in (wrapperUrl) wrapperUrl="$safeValue"; break ;; + esac + done < "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.properties" + log "Downloading from: $wrapperUrl" + + if $cygwin; then + wrapperJarPath=$(cygpath --path --windows "$wrapperJarPath") + fi + + if command -v wget > /dev/null; then + log "Found wget ... 
using wget" + [ "$MVNW_VERBOSE" = true ] && QUIET="" || QUIET="--quiet" + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + wget $QUIET "$wrapperUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath" + else + wget $QUIET --http-user="$MVNW_USERNAME" --http-password="$MVNW_PASSWORD" "$wrapperUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath" + fi + elif command -v curl > /dev/null; then + log "Found curl ... using curl" + [ "$MVNW_VERBOSE" = true ] && QUIET="" || QUIET="--silent" + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + curl $QUIET -o "$wrapperJarPath" "$wrapperUrl" -f -L || rm -f "$wrapperJarPath" + else + curl $QUIET --user "$MVNW_USERNAME:$MVNW_PASSWORD" -o "$wrapperJarPath" "$wrapperUrl" -f -L || rm -f "$wrapperJarPath" + fi + else + log "Falling back to using Java to download" + javaSource="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/MavenWrapperDownloader.java" + javaClass="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/MavenWrapperDownloader.class" + # For Cygwin, switch paths to Windows format before running javac + if $cygwin; then + javaSource=$(cygpath --path --windows "$javaSource") + javaClass=$(cygpath --path --windows "$javaClass") + fi + if [ -e "$javaSource" ]; then + if [ ! -e "$javaClass" ]; then + log " - Compiling MavenWrapperDownloader.java ..." + ("$JAVA_HOME/bin/javac" "$javaSource") + fi + if [ -e "$javaClass" ]; then + log " - Running MavenWrapperDownloader.java ..." 
+ ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$wrapperUrl" "$wrapperJarPath") || rm -f "$wrapperJarPath" + fi + fi + fi +fi +########################################################################################## +# End of extension +########################################################################################## + +# If specified, validate the SHA-256 sum of the Maven wrapper jar file +wrapperSha256Sum="" +while IFS="=" read -r key value; do + case "$key" in (wrapperSha256Sum) wrapperSha256Sum=$value; break ;; + esac +done < "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.properties" +if [ -n "$wrapperSha256Sum" ]; then + wrapperSha256Result=false + if command -v sha256sum > /dev/null; then + if echo "$wrapperSha256Sum $wrapperJarPath" | sha256sum -c > /dev/null 2>&1; then + wrapperSha256Result=true + fi + elif command -v shasum > /dev/null; then + if echo "$wrapperSha256Sum $wrapperJarPath" | shasum -a 256 -c > /dev/null 2>&1; then + wrapperSha256Result=true + fi + else + echo "Checksum validation was requested but neither 'sha256sum' or 'shasum' are available." + echo "Please install either command, or disable validation by removing 'wrapperSha256Sum' from your maven-wrapper.properties." + exit 1 + fi + if [ $wrapperSha256Result = false ]; then + echo "Error: Failed to validate Maven wrapper SHA-256, your Maven wrapper might be compromised." >&2 + echo "Investigate or delete $wrapperJarPath to attempt a clean download." >&2 + echo "If you updated your Maven version, you need to update the specified wrapperSha256Sum property." 
>&2 + exit 1 + fi +fi + +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$JAVA_HOME" ] && + JAVA_HOME=$(cygpath --path --windows "$JAVA_HOME") + [ -n "$CLASSPATH" ] && + CLASSPATH=$(cygpath --path --windows "$CLASSPATH") + [ -n "$MAVEN_PROJECTBASEDIR" ] && + MAVEN_PROJECTBASEDIR=$(cygpath --path --windows "$MAVEN_PROJECTBASEDIR") +fi + +# Provide a "standardized" way to retrieve the CLI args that will +# work with both Windows and non-Windows executions. +MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $*" +export MAVEN_CMD_LINE_ARGS + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +# shellcheck disable=SC2086 # safe args +exec "$JAVACMD" \ + $MAVEN_OPTS \ + $MAVEN_DEBUG_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/mvnw.cmd b/kafka-avro/spring-boot-kafka-avro-consumer/mvnw.cmd new file mode 100644 index 00000000..f80fbad3 --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-consumer/mvnw.cmd @@ -0,0 +1,205 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. 
You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Apache Maven Wrapper startup batch script, version 3.2.0 +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. 
to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM set title of command window +title %0 +@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%USERPROFILE%\mavenrc_pre.bat" call "%USERPROFILE%\mavenrc_pre.bat" %* +if exist "%USERPROFILE%\mavenrc_pre.cmd" call "%USERPROFILE%\mavenrc_pre.cmd" %* +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. 
+ +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" +set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +set WRAPPER_URL="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar" + +FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( + IF "%%A"=="wrapperUrl" SET WRAPPER_URL=%%B +) + +@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +@REM This allows using the maven wrapper in projects that prohibit checking in binary data. +if exist %WRAPPER_JAR% ( + if "%MVNW_VERBOSE%" == "true" ( + echo Found %WRAPPER_JAR% + ) +) else ( + if not "%MVNW_REPOURL%" == "" ( + SET WRAPPER_URL="%MVNW_REPOURL%/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar" + ) + if "%MVNW_VERBOSE%" == "true" ( + echo Couldn't find %WRAPPER_JAR%, downloading it ... 
+ echo Downloading from: %WRAPPER_URL% + ) + + powershell -Command "&{"^ + "$webclient = new-object System.Net.WebClient;"^ + "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^ + "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^ + "}"^ + "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%WRAPPER_URL%', '%WRAPPER_JAR%')"^ + "}" + if "%MVNW_VERBOSE%" == "true" ( + echo Finished downloading %WRAPPER_JAR% + ) +) +@REM End of extension + +@REM If specified, validate the SHA-256 sum of the Maven wrapper jar file +SET WRAPPER_SHA_256_SUM="" +FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( + IF "%%A"=="wrapperSha256Sum" SET WRAPPER_SHA_256_SUM=%%B +) +IF NOT %WRAPPER_SHA_256_SUM%=="" ( + powershell -Command "&{"^ + "$hash = (Get-FileHash \"%WRAPPER_JAR%\" -Algorithm SHA256).Hash.ToLower();"^ + "If('%WRAPPER_SHA_256_SUM%' -ne $hash){"^ + " Write-Output 'Error: Failed to validate Maven wrapper SHA-256, your Maven wrapper might be compromised.';"^ + " Write-Output 'Investigate or delete %WRAPPER_JAR% to attempt a clean download.';"^ + " Write-Output 'If you updated your Maven version, you need to update the specified wrapperSha256Sum property.';"^ + " exit 1;"^ + "}"^ + "}" + if ERRORLEVEL 1 goto error +) + +@REM Provide a "standardized" way to retrieve the CLI args that will +@REM work with both Windows and non-Windows executions. 
+set MAVEN_CMD_LINE_ARGS=%* + +%MAVEN_JAVA_EXE% ^ + %JVM_CONFIG_MAVEN_PROPS% ^ + %MAVEN_OPTS% ^ + %MAVEN_DEBUG_OPTS% ^ + -classpath %WRAPPER_JAR% ^ + "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" ^ + %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %* +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%"=="" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%USERPROFILE%\mavenrc_post.bat" call "%USERPROFILE%\mavenrc_post.bat" +if exist "%USERPROFILE%\mavenrc_post.cmd" call "%USERPROFILE%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%"=="on" pause + +if "%MAVEN_TERMINATE_CMD%"=="on" exit %ERROR_CODE% + +cmd /C exit /B %ERROR_CODE% diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/pom.xml b/kafka-avro/spring-boot-kafka-avro-consumer/pom.xml new file mode 100644 index 00000000..44a6d2d2 --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-consumer/pom.xml @@ -0,0 +1,204 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 3.1.0-RC2 + + + com.example + spring-boot-kafka-avro-consumer + 0.0.1-SNAPSHOT + spring-boot-kafka-avro-consumer + Demo project for Spring Boot + + + 17 + 1.11.1 + 7.4.0 + 2.1.0 + + 1.18.1 + 2.36.0 + + + + + + org.springframework.boot + spring-boot-starter-actuator + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-validation + + + org.springframework.boot + spring-boot-starter-data-jpa + + + org.springframework.kafka + spring-kafka + + + + org.springdoc + springdoc-openapi-starter-webmvc-ui + ${springdoc-openapi.version} + + + + + + com.h2database + h2 + runtime + + + org.projectlombok + lombok + true + + + + org.apache.avro + avro + ${avro.version} + + + io.confluent + kafka-avro-serializer + ${confluent.version} + + + + 
org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.kafka + spring-kafka-test + test + + + org.testcontainers + junit-jupiter + test + + + org.testcontainers + kafka + test + + + org.awaitility + awaitility + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + ${project.basedir}/src/main/resources/avro/ + ${project.basedir}/generated-avro + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.4.0 + + + generate-sources + + add-source + + + + generated-avro + + + + + + + com.diffplug.spotless + spotless-maven-plugin + ${spotless.version} + + + + 1.17.0 + + + + + + + compile + + check + + + + + + + + + + confluent + https://packages.confluent.io/maven/ + + + spring-milestones + Spring Milestones + https://repo.spring.io/milestone + + false + + + + + + spring-milestones + Spring Milestones + https://repo.spring.io/milestone + + false + + + + + diff --git a/kafka-avro/spring-boot-kafka-avro-producer/src/main/java/com/example/springbootkafkaavro/SpringBootKafkaAvroApplication.java b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/SpringBootKafkaAvroConsumerApplication.java similarity index 63% rename from kafka-avro/spring-boot-kafka-avro-producer/src/main/java/com/example/springbootkafkaavro/SpringBootKafkaAvroApplication.java rename to kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/SpringBootKafkaAvroConsumerApplication.java index 8111ec73..3ef4a304 100644 --- a/kafka-avro/spring-boot-kafka-avro-producer/src/main/java/com/example/springbootkafkaavro/SpringBootKafkaAvroApplication.java +++ b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/SpringBootKafkaAvroConsumerApplication.java @@ -4,9 +4,9 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication 
-public class SpringBootKafkaAvroApplication { +public class SpringBootKafkaAvroConsumerApplication { public static void main(String[] args) { - SpringApplication.run(SpringBootKafkaAvroApplication.class, args); + SpringApplication.run(SpringBootKafkaAvroConsumerApplication.class, args); } } diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/config/SwaggerConfig.java b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/config/SwaggerConfig.java new file mode 100644 index 00000000..d7841045 --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/config/SwaggerConfig.java @@ -0,0 +1,13 @@ +package com.example.springbootkafkaavro.config; + +import io.swagger.v3.oas.annotations.OpenAPIDefinition; +import io.swagger.v3.oas.annotations.info.Info; +import io.swagger.v3.oas.annotations.servers.Server; + +import org.springframework.context.annotation.Configuration; + +@Configuration +@OpenAPIDefinition( + info = @Info(title = "spring-boot-kafka-avro", version = "v1"), + servers = @Server(url = "/")) +public class SwaggerConfig {} diff --git a/kafka-avro/spring-boot-kafka-avro-producer/src/main/java/com/example/springbootkafkaavro/entity/PersonEntity.java b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/entity/PersonEntity.java similarity index 100% rename from kafka-avro/spring-boot-kafka-avro-producer/src/main/java/com/example/springbootkafkaavro/entity/PersonEntity.java rename to kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/entity/PersonEntity.java diff --git a/kafka-avro/spring-boot-kafka-avro-producer/src/main/java/com/example/springbootkafkaavro/listener/AvroKafkaListener.java b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/listener/AvroKafkaListener.java similarity index 100% rename from 
kafka-avro/spring-boot-kafka-avro-producer/src/main/java/com/example/springbootkafkaavro/listener/AvroKafkaListener.java rename to kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/listener/AvroKafkaListener.java diff --git a/kafka-avro/spring-boot-kafka-avro-producer/src/main/java/com/example/springbootkafkaavro/repository/PersonRepository.java b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/repository/PersonRepository.java similarity index 100% rename from kafka-avro/spring-boot-kafka-avro-producer/src/main/java/com/example/springbootkafkaavro/repository/PersonRepository.java rename to kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/repository/PersonRepository.java diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/util/ApplicationConstants.java b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/util/ApplicationConstants.java new file mode 100644 index 00000000..85ca4bc7 --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/util/ApplicationConstants.java @@ -0,0 +1,9 @@ +package com.example.springbootkafkaavro.util; + +import lombok.experimental.UtilityClass; + +@UtilityClass +public class ApplicationConstants { + + public static final String PERSONS_TOPIC = "persons"; +} diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/src/main/resources/application.properties b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/resources/application.properties new file mode 100644 index 00000000..2f1558e8 --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/resources/application.properties @@ -0,0 +1,12 @@ +server.port=8085 + +spring.kafka.consumer.bootstrap-servers=http://localhost:9092 +spring.kafka.consumer.group-id=group-1 
+spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer + +spring.kafka.properties.schema.registry.url=http://localhost:8081 +#Use Specific Record or else you get Avro GenericRecord. +spring.kafka.properties.specific.avro.reader=true + +spring.jpa.open-in-view=false \ No newline at end of file diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/src/main/resources/avro/person.avsc b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/resources/avro/person.avsc new file mode 100644 index 00000000..029c4c21 --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/resources/avro/person.avsc @@ -0,0 +1,11 @@ +{ + "namespace": "com.example.springbootkafkaavro.model", + "type": "record", + "name": "Person", + "version": "1", + "fields": [ + { "name": "id", "type": "long" }, + { "name": "name", "type": "string", "avro.java.string": "String" }, + { "name": "age", "type": "int" } + ] +} \ No newline at end of file diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/KafkaProducer.java b/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/KafkaProducer.java new file mode 100644 index 00000000..9f27e937 --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/KafkaProducer.java @@ -0,0 +1,21 @@ +package com.example.springbootkafkaavro; + +import com.example.springbootkafkaavro.model.Person; +import com.example.springbootkafkaavro.util.ApplicationConstants; + +import lombok.RequiredArgsConstructor; + +import org.springframework.boot.test.context.TestComponent; +import org.springframework.kafka.core.KafkaTemplate; + +@TestComponent +@RequiredArgsConstructor +public class KafkaProducer { + + private final KafkaTemplate kafkaTemplate; + + public void sendMessage(Person person) { + 
this.kafkaTemplate.send( + ApplicationConstants.PERSONS_TOPIC, person.getName().toString(), person); + } +} diff --git a/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/SpringBootKafkaAvroApplicationTests.java b/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/SpringBootKafkaAvroConsumerApplicationTests.java similarity index 87% rename from kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/SpringBootKafkaAvroApplicationTests.java rename to kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/SpringBootKafkaAvroConsumerApplicationTests.java index 57d6dea0..f0bbee57 100644 --- a/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/SpringBootKafkaAvroApplicationTests.java +++ b/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/SpringBootKafkaAvroConsumerApplicationTests.java @@ -2,17 +2,17 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import static java.util.concurrent.TimeUnit.SECONDS; +import com.example.springbootkafkaavro.model.Person; import com.example.springbootkafkaavro.repository.PersonRepository; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Import; import org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertySource; import org.springframework.test.web.servlet.MockMvc; @@ -24,12 +24,18 @@ import 
java.time.Duration; -@SpringBootTest +@SpringBootTest( + properties = { + "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer", + "spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer" + }) @AutoConfigureMockMvc -class SpringBootKafkaAvroApplicationTests { +@Import(KafkaProducer.class) +class SpringBootKafkaAvroConsumerApplicationTests { @Autowired MockMvc mockMvc; @Autowired PersonRepository personRepository; + @Autowired KafkaProducer kafkaProducer; private static final Network KAFKA_NETWORK = Network.newNetwork(); private static final String CONFLUENT_PLATFORM_VERSION = "7.4.0"; @@ -99,9 +105,10 @@ public String getSchemaUrl() { @Test void contextLoads() throws Exception { - this.mockMvc - .perform(post("/person/publish").param("name", "junit").param("age", "33")) - .andExpect(status().isOk()); + Person person = new Person(); + person.setAge(33); + person.setName("junit"); + this.kafkaProducer.sendMessage(person); await().atMost(10, SECONDS) .untilAsserted(() -> assertThat(personRepository.count()).isEqualTo(1)); } diff --git a/kafka-avro/spring-boot-kafka-avro-producer/pom.xml b/kafka-avro/spring-boot-kafka-avro-producer/pom.xml index 0e394f54..ccf98c50 100644 --- a/kafka-avro/spring-boot-kafka-avro-producer/pom.xml +++ b/kafka-avro/spring-boot-kafka-avro-producer/pom.xml @@ -40,10 +40,6 @@ org.springframework.boot spring-boot-starter-validation - - org.springframework.boot - spring-boot-starter-data-jpa - org.springframework.kafka spring-kafka @@ -62,11 +58,6 @@ runtime true --> - - com.h2database - h2 - runtime - org.projectlombok lombok @@ -89,6 +80,11 @@ spring-boot-starter-test test + + org.springframework.boot + spring-boot-testcontainers + test + org.springframework.kafka spring-kafka-test diff --git a/kafka-avro/spring-boot-kafka-avro-producer/src/main/java/com/example/springbootkafkaavro/SpringBootKafkaAvroProducerApplication.java 
b/kafka-avro/spring-boot-kafka-avro-producer/src/main/java/com/example/springbootkafkaavro/SpringBootKafkaAvroProducerApplication.java new file mode 100644 index 00000000..79c65f24 --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-producer/src/main/java/com/example/springbootkafkaavro/SpringBootKafkaAvroProducerApplication.java @@ -0,0 +1,12 @@ +package com.example.springbootkafkaavro; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class SpringBootKafkaAvroProducerApplication { + + public static void main(String[] args) { + SpringApplication.run(SpringBootKafkaAvroProducerApplication.class, args); + } +} diff --git a/kafka-avro/spring-boot-kafka-avro-producer/src/main/resources/application.properties b/kafka-avro/spring-boot-kafka-avro-producer/src/main/resources/application.properties index d97f92ca..db67e619 100644 --- a/kafka-avro/spring-boot-kafka-avro-producer/src/main/resources/application.properties +++ b/kafka-avro/spring-boot-kafka-avro-producer/src/main/resources/application.properties @@ -3,11 +3,6 @@ spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.Strin spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer spring.kafka.producer.properties.schema.registry.url=http://localhost:8081 -spring.kafka.consumer.bootstrap-servers=http://localhost:9092 -spring.kafka.consumer.group-id=group-1 -spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer -spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer - spring.kafka.properties.schema.registry.url=http://localhost:8081 #Use Specific Record or else you get Avro GenericRecord. 
spring.kafka.properties.specific.avro.reader=true diff --git a/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/AvroKafkaListener.java b/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/AvroKafkaListener.java new file mode 100644 index 00000000..e2e01e72 --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/AvroKafkaListener.java @@ -0,0 +1,21 @@ +package com.example.springbootkafkaavro; + +import com.example.springbootkafkaavro.model.Person; +import com.example.springbootkafkaavro.util.ApplicationConstants; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.boot.test.context.TestComponent; +import org.springframework.kafka.annotation.KafkaListener; + +@TestComponent +@Slf4j +public class AvroKafkaListener { + + @KafkaListener(topics = ApplicationConstants.PERSONS_TOPIC, groupId = "group_id") + public void handler(ConsumerRecord personConsumerRecord) { + Person person = personConsumerRecord.value(); + log.info("Person received : {} : {} ", person.getName(), person.getAge()); + } +} diff --git a/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/SpringBootKafkaAvroProducerApplicationTests.java b/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/SpringBootKafkaAvroProducerApplicationTests.java new file mode 100644 index 00000000..199759b3 --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/SpringBootKafkaAvroProducerApplicationTests.java @@ -0,0 +1,117 @@ +package com.example.springbootkafkaavro; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static 
org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +import static java.util.concurrent.TimeUnit.SECONDS; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.system.CapturedOutput; +import org.springframework.boot.test.system.OutputCaptureExtension; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.springframework.test.web.servlet.MockMvc; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; + +@SpringBootTest( + properties = { + "spring.kafka.consumer.group-id=group-1", + "spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer", + "spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer" + }) +@AutoConfigureMockMvc +@Import(AvroKafkaListener.class) +@ExtendWith(OutputCaptureExtension.class) +class SpringBootKafkaAvroProducerApplicationTests { + + @Autowired MockMvc mockMvc; + + private static final Network KAFKA_NETWORK = Network.newNetwork(); + private static final String CONFLUENT_PLATFORM_VERSION = "7.4.0"; + private static final DockerImageName KAFKA_IMAGE = + DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION); + private static final KafkaContainer KAFKA = + new KafkaContainer(KAFKA_IMAGE) + .withNetwork(KAFKA_NETWORK) + .withKraft() + 
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") + .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1"); + + private static final SchemaRegistryContainer SCHEMA_REGISTRY = + new SchemaRegistryContainer(CONFLUENT_PLATFORM_VERSION) + .withStartupTimeout(Duration.ofMinutes(2)); + + static { + KAFKA.start(); + SCHEMA_REGISTRY.withKafka(KAFKA).start(); + // Should be set after container is started + SCHEMA_REGISTRY.withEnv("SCHEMA_REGISTRY_LISTENERS", SCHEMA_REGISTRY.getSchemaUrl()); + } + + @DynamicPropertySource + static void setProperties(DynamicPropertyRegistry registry) { + // Connect our Spring application to our Testcontainers Kafka instance + registry.add("spring.kafka.consumer.bootstrap-servers", KAFKA::getBootstrapServers); + registry.add("spring.kafka.producer.bootstrap-servers", KAFKA::getBootstrapServers); + registry.add( + "spring.kafka.producer.properties.schema.registry.url", + SCHEMA_REGISTRY::getSchemaUrl); + registry.add("spring.kafka.properties.schema.registry.url", SCHEMA_REGISTRY::getSchemaUrl); + } + + private static class SchemaRegistryContainer extends GenericContainer { + public static final String SCHEMA_REGISTRY_IMAGE = "confluentinc/cp-schema-registry"; + public static final int SCHEMA_REGISTRY_PORT = 8081; + + public SchemaRegistryContainer() { + this(CONFLUENT_PLATFORM_VERSION); + } + + public SchemaRegistryContainer(String version) { + super(DockerImageName.parse(SCHEMA_REGISTRY_IMAGE).withTag(CONFLUENT_PLATFORM_VERSION)); + + waitingFor(Wait.forHttp("/subjects").forStatusCode(200)); + withExposedPorts(SCHEMA_REGISTRY_PORT); + } + + public SchemaRegistryContainer withKafka(KafkaContainer kafka) { + return withKafka(kafka.getNetwork(), kafka.getNetworkAliases().get(0) + ":9092"); + } + + public SchemaRegistryContainer withKafka(Network network, String bootstrapServers) { + withNetwork(network); + withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry"); + withEnv( + "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", + 
"PLAINTEXT://" + bootstrapServers); + return self(); + } + + public String getSchemaUrl() { + return String.format("http://%s:%d", getHost(), getMappedPort(SCHEMA_REGISTRY_PORT)); + } + } + + @Test + void contextLoads(CapturedOutput output) throws Exception { + this.mockMvc + .perform(post("/person/publish").param("name", "junit").param("age", "33")) + .andExpect(status().isOk()); + await().atMost(30, SECONDS) + .untilAsserted( + () -> assertThat(output.getOut()).contains("Person received : junit : 33")); + } +}