Skip to content

Commit

Permalink
Merge pull request #19 from Nordstrom/configurable_payload_formatter
Browse files Browse the repository at this point in the history
Can specify a payload formatter
  • Loading branch information
dylanmei authored Aug 13, 2019
2 parents 1f773cf + a11685e commit 0c9caca
Show file tree
Hide file tree
Showing 19 changed files with 837 additions and 731 deletions.
2 changes: 1 addition & 1 deletion config/worker.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
bootstrap.servers=localhost:9092

plugin.path=target
plugin.path=target/plugin
offset.storage.file.filename=/tmp/connect.offsets

key.converter=org.apache.kafka.connect.storage.StringConverter
Expand Down
7 changes: 3 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,15 @@ services:
- CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter

- CONNECT_PLUGIN_PATH=/opt/connectors
- KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/etc/log4j.properties

- AWS_PROFILE
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
volumes:
- ~/.aws:/root/.aws
- ./target:/opt/connectors
- /opt/connectors/.shaded-jar
- ./target/plugin:/opt/connectors
- ./config/log4j.properties:/etc/log4j.properties
depends_on: [broker]
189 changes: 63 additions & 126 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,11 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>

<exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
<jacoco-maven-plugin.version>0.8.2</jacoco-maven-plugin.version>
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
<maven-failsafe-plugin.version>2.22.0</maven-failsafe-plugin.version>
<maven-javadoc-plugin.version>3.0.1</maven-javadoc-plugin.version>
<maven-shade-plugin.version>3.1.1</maven-shade-plugin.version>
<maven-source-plugin.version>3.0.1</maven-source-plugin.version>
<maven-surefire-plugin.version>2.22.1</maven-surefire-plugin.version>
<maven-project-info-reports-plugin.version>3.0.0</maven-project-info-reports-plugin.version>

<kafka.connect-api.version>2.1.0</kafka.connect-api.version>
<aws-java-sdk.version>1.11.592</aws-java-sdk.version>
<junit.version>4.12</junit.version>
<mockito-core.version>2.28.2</mockito-core.version>
<google.guava.version>19.0</google.guava.version>
<lombok.version>1.18.4</lombok.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -59,11 +48,13 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito-core.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${google.guava.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

Expand All @@ -79,138 +70,84 @@
</resource>
</resources>

<!-- PluginManagement defines configurations for plugins, but DOES
NOT force execution -->
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>${maven-javadoc-plugin.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>${maven-source-plugin.version}</version>
<executions>
<execution>
<id>attach-sources</id>
<phase>verify</phase>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/.shaded-jar</outputDirectory>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!-- exclude manifest signatures to prevent 'Invalid
signature file digest for Manifest main attributes' -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<artifactSet>
<excludes>
<!-- exclude test frameworks -->
<exclude>junit:*</exclude>
<exclude>jmock:*</exclude>
<exclude>mockito-all:*</exclude>

<!-- provided by kafka-connect -->
<exclude>commons-codec:*</exclude>
<exclude>commons-logging:*</exclude>
<exclude>org.apache.httpcomponents:*</exclude>
<exclude>org.apache.kafka:*</exclude>
<exclude>org.slf4j:*</exclude>
</excludes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>${jacoco-maven-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
<execution>
<id>report</id>
<phase>prepare-package</phase>
<goals>
<goal>report</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>${exec-maven-plugin.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.0.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${maven-failsafe-plugin.version}</version>
<artifactId>maven-source-plugin</artifactId>
<version>3.0.1</version>
<executions>
<execution>
<id>attach-sources</id>
<phase>verify</phase>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.2</version>
<configuration>
<outputDirectory>target/plugin/</outputDirectory>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!-- exclude manifest signatures to prevent 'Invalid
signature file digest for Manifest main attributes' -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<artifactSet>
<excludes>
<!-- exclude test frameworks -->
<exclude>junit:*</exclude>
<exclude>jmock:*</exclude>
<exclude>mockito-all:*</exclude>

<!-- provided by kafka-connect -->
<exclude>commons-codec:*</exclude>
<exclude>commons-logging:*</exclude>
<exclude>org.apache.httpcomponents:*</exclude>
<exclude>org.apache.kafka:*</exclude>
<exclude>org.slf4j:*</exclude>
</excludes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.nordstrom.kafka.connect.formatters;

import java.util.Collection;
import org.apache.kafka.connect.sink.SinkRecord;

public interface PayloadFormatter {
String format(final SinkRecord record) throws PayloadFormattingException;
String formatBatch(final Collection<SinkRecord> records) throws PayloadFormattingException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.nordstrom.kafka.connect.formatters;

public class PayloadFormattingException extends RuntimeException {
public PayloadFormattingException (final Throwable e) {
super(e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.nordstrom.kafka.connect.formatters;

import org.apache.kafka.connect.sink.SinkRecord;

public class PlainPayload {
private String key;
private String keySchemaName;
private String value;
private String valueSchemaName;
private String topic;
private int partition;
private long offset;
private long timestamp;
private String timestampTypeName;

protected PlainPayload() {
}

public PlainPayload(final SinkRecord record) {
this.key = record.key() == null ? "" : record.key().toString();
if (record.keySchema() != null)
this.keySchemaName = record.keySchema().name();

this.value = record.value() == null ? "" : record.value().toString();
if (record.valueSchema() != null)
this.valueSchemaName = record.valueSchema().name();

this.topic = record.topic();
this.partition = record.kafkaPartition();
this.offset = record.kafkaOffset();

if (record.timestamp() != null)
this.timestamp = record.timestamp();
if (record.timestampType() != null)
this.timestampTypeName = record.timestampType().name;
}

public String getValue() { return this.value; }
public void setValue(final String value) { this.value = value; }

public long getOffset() { return this.offset; }
public void setOffset(final long offset) { this.offset = offset; }

public Long getTimestamp() { return this.timestamp; }
public void setTimestamp(final long timestamp) { this.timestamp = timestamp; }

public String getTimestampTypeName() { return this.timestampTypeName; }
public void setTimestampTypeName(final String timestampTypeName) { this.timestampTypeName = timestampTypeName; }

public int getPartition() { return this.partition; }
public void setPartition(final int partition) { this.partition = partition; }

public String getKey() { return this.key; }
public void setKey(final String key) { this.key = key; }

public String getKeySchemaName() { return this.keySchemaName; }
public void setKeySchemaName(final String keySchemaName) { this.keySchemaName = keySchemaName; }

public String getValueSchemaName() { return this.valueSchemaName; }
public void setValueSchemaName(final String valueSchemaName) { this.valueSchemaName = valueSchemaName; }

public String getTopic() { return this.topic; }
public void setTopic(final String topic) { this.topic = topic; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.nordstrom.kafka.connect.formatters;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;

import org.apache.kafka.connect.sink.SinkRecord;

import java.util.Collection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PlainPayloadFormatter implements PayloadFormatter {
private static final Logger LOGGER = LoggerFactory.getLogger(PlainPayloadFormatter.class);
private final ObjectWriter recordWriter = new ObjectMapper().writerFor(PlainPayload.class);
private final ObjectWriter recordsWriter = new ObjectMapper().writerFor(PlainPayload[].class);

public String format(final SinkRecord record) {
PlainPayload payload = new PlainPayload(record);

try {
return this.recordWriter.writeValueAsString(payload);
} catch (final JsonProcessingException e) {
LOGGER.error(e.getLocalizedMessage(), e);
throw new PayloadFormattingException(e);
}
}

public String formatBatch(final Collection<SinkRecord> records) {
final PlainPayload[] payloads = records
.stream()
.map(record -> new PlainPayload(record))
.toArray(PlainPayload[]::new);

try {
return this.recordsWriter.writeValueAsString(payloads);
} catch (final JsonProcessingException e) {
LOGGER.error(e.getLocalizedMessage(), e);
throw new PayloadFormattingException(e);
}
}
}
Loading

0 comments on commit 0c9caca

Please sign in to comment.