Compatible with Kafka 2.3.0 (#15)
* Don't exclude Apache HTTP Components JAR, bump Connect API and AWS SDK versions, add docker-compose file and instructions for demo

* Fix typo

Co-authored-by: Weyant <[email protected]>
adamweyant and Weyant authored Dec 28, 2020
1 parent bbb3a1e commit e3140e5
Showing 4 changed files with 122 additions and 47 deletions.
54 changes: 22 additions & 32 deletions README.md
@@ -1,8 +1,13 @@
# kafka-connect-sqs
The SQS connector plugin provides the ability to use AWS SQS queues as either a source (from an SQS queue into a Kafka topic) or a sink (from a Kafka topic into an SQS queue).

## Supported Kafka and AWS versions
The `kafka-connect-sqs` connector has been tested with `connect-api:2.1.0` and `aws-java-sdk-sqs:1.11.501`
## Compatibility Matrix
|Connector version|Kafka Connect API|AWS SDK|
|:---|:---|:---|
|1.1.0|2.2.0|1.11.501|
|1.2.0|2.3.0|1.11.924|

Due to a compatibility issue with [Apache httpcomponents](http://hc.apache.org/), connector versions 1.1.0 and earlier may not work with Kafka Connect versions greater than 2.2.

# Building
You can build the connector with Maven using the standard lifecycle goals:
@@ -165,11 +170,21 @@ For a `source` connector, the minimum actions required are:

# Running the Demo

The demo uses the Confluent Platform which can be downloaded here: https://www.confluent.io/download/
## Build the connector plugin

Build the connector jar file:

```shell
mvn clean package
```
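The Docker Compose file added by this commit mounts `./target/plugin` into the `connect` container as `/opt/connectors` (its `CONNECT_PLUGIN_PATH`), so the built jar needs to end up under `target/plugin`. If your build does not already stage it there, a minimal sketch, assuming the jar follows the `kafka-connect-sqs-<version>.jar` naming implied by the pom:

```shell
# Stage the connector jar where the compose file expects it
# (./target/plugin is mounted as /opt/connectors in the connect container).
mkdir -p target/plugin
cp target/kafka-connect-sqs-1.2.0.jar target/plugin/
```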

You can use either the Enterprise or Community version.
## Run the connector using Docker Compose

The rest of the tutorial assumes the Confluent Platform is installed at $CP and $CP/bin is on your PATH.
Ensure you have `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables exported in your shell. Docker Compose will pass these values into the `connect` container.

Use the provided [Docker Compose](https://docs.docker.com/compose) file and run `docker-compose up`.
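For example, a sketch with placeholder credentials:

```shell
# Placeholder values -- use credentials that can read/write your SQS queues
export AWS_ACCESS_KEY_ID=AKIA...
export AWS_SECRET_ACCESS_KEY=...

docker-compose up
```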

With the [Kafka Connect REST interface](https://docs.confluent.io/current/connect/references/restapi.html), verify the SQS sink and source connectors are installed and ready: `curl http://localhost:8083/connector-plugins`.
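If you have `jq` installed, you can reduce the response to just the plugin class names; the names in the comment below are an assumption based on this project's package naming:

```shell
curl -s http://localhost:8083/connector-plugins | jq '.[].class'
# Expect entries like "com.nordstrom.kafka.connect.sqs.SqsSinkConnector"
# and "com.nordstrom.kafka.connect.sqs.SqsSourceConnector" (assumed names).
```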

## AWS

@@ -201,24 +216,6 @@ topic and write to a _different_ SQS queue.
Create `chirps-q` and `chirped-q` SQS queues using the AWS Console. Take note of the `URL:` values for each
as you will need them to configure the connectors later.
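If you prefer the AWS CLI to the console, a sketch; each call prints the `QueueUrl` value to note down:

```shell
aws sqs create-queue --queue-name chirps-q
aws sqs create-queue --queue-name chirped-q
```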

## Build the connector plugin

Build the connector jar file and copy it to the classpath of Kafka Connect:

```shell
mvn clean package
mkdir $CP/share/java/kafka-connect-sqs
cp target/kafka-connect-sqs-0.0.1.jar $CP/share/java/kafka-connect-sqs/
```

## Start Confluent Platform using the Confluent CLI

See https://docs.confluent.io/current/quickstart/ce-quickstart.html#ce-quickstart for more details.

```shell
$CP/bin/confluent start
```

## Create the connectors

The `source` connector configuration is defined in `demos/sqs-source-chirps.json`. The `sink` connector configuration
@@ -228,8 +225,8 @@ values noted when you created the queues.
Create the connectors using the Kafka Connect REST API:

```shell
confluent load kafka-connect-sqs -d ./demos/sqs-source-chirps.json
confluent load kafka-connect-sqs -d ./demos/sqs-sink-chirped.json
curl -XPOST -H 'Content-Type: application/json' http://localhost:8083/connectors -d @demos/sqs-source-chirps.json
curl -XPOST -H 'Content-Type: application/json' http://localhost:8083/connectors -d @demos/sqs-sink-chirped.json
```
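Once posted, the Connect REST API can confirm that each connector and its task are `RUNNING`. The connector names are defined inside the demo JSON files, so the name below is a placeholder:

```shell
# List the installed connectors, then check the status of each by name
curl -s http://localhost:8083/connectors
curl -s http://localhost:8083/connectors/<connector-name>/status
```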

## Send/receive messages
@@ -242,10 +239,3 @@ The `sink` connector will read the message from the topic and write it to the `chirped-q` queue.

Use the AWS Console (or the AWS CLI) to read your message from the `chirped-q` queue.
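The same round trip works from the AWS CLI; substitute the queue URLs you noted when creating the queues:

```shell
# Send a message into the source queue...
aws sqs send-message --queue-url <chirps-q-url> --message-body 'hello, sqs'

# ...and, once the connectors have relayed it, read it from the sink queue
aws sqs receive-message --queue-url <chirped-q-url>
```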

## Cleaning up

Clean up by destroying the Confluent Platform to remove all messages and data stored on disk:

```shell
confluent destroy
```
14 changes: 14 additions & 0 deletions config/log4j.properties
@@ -0,0 +1,14 @@
log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

# default log levels
log4j.logger.org.reflections=ERROR
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.I0Itec.zkclient=ERROR

log4j.logger.com.amazonaws=WARN
#log4j.logger.com.amazonaws.request=DEBUG
#log4j.logger.org.apache.http.wire=DEBUG
80 changes: 80 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,80 @@
version: "3"

services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.2
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- 2181:2181
logging: { driver: none }

broker:
image: confluentinc/cp-kafka:6.0.1
ports:
- 9092:9092
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_LISTENERS=PUBLIC://0.0.0.0:9092,INTERNAL://0.0.0.0:19092
- KAFKA_ADVERTISED_LISTENERS=PUBLIC://localhost:9092,INTERNAL://broker:19092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PUBLIC:PLAINTEXT,INTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
- KAFKA_NUM_PARTITIONS=2
- KAFKA_DEFAULT_REPLICATION_FACTOR=1
- KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=10
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_DELETE_TOPIC_ENABLE=true
- KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false

- KAFKA_LOG4J_ROOT_LOGLEVEL=INFO
depends_on: [zookeeper]
logging: { driver: none }

schema-registry:
image: confluentinc/cp-schema-registry:6.0.1
hostname: schema-registry
ports:
- 8080:8080
environment:
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8080

- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://broker:19092
- SCHEMA_REGISTRY_KAFKASTORE_TOPIC=_schemas
- SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR=1
- SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT_MS=15000
depends_on: [broker]
logging: { driver: none }

# NB: run connect locally in stand-alone mode to debug
connect:
image: confluentinc/cp-kafka-connect:6.0.1
ports:
- 8083:8083
environment:
- CONNECT_BOOTSTRAP_SERVERS=broker:19092
- CONNECT_REST_ADVERTISED_HOST_NAME=connect
- CONNECT_GROUP_ID=connect
- CONNECT_CONFIG_STORAGE_TOPIC=_connect_configs
- CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
- CONNECT_OFFSET_STORAGE_TOPIC=_connect_offsets
- CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
- CONNECT_STATUS_STORAGE_TOPIC=_connect_status
- CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
- CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter

- CONNECT_PLUGIN_PATH=/opt/connectors
- KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/etc/log4j.properties

- AWS_PROFILE
- AWS_REGION
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
volumes:
- ~/.aws:/root/.aws
- ./target/plugin:/opt/connectors
- ./config/log4j.properties:/etc/log4j.properties
depends_on: [broker]
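As the `NB` comment in the compose file suggests, the connector can also be debugged by running Connect in standalone mode outside Docker. A sketch, assuming you supply your own worker and connector properties files (neither ships with this repo); the script is `connect-standalone` in the Confluent Platform and `bin/connect-standalone.sh` in plain Apache Kafka:

```shell
# Hypothetical local configs: worker.properties points plugin.path at
# target/plugin; sqs-source.properties holds the connector settings.
connect-standalone worker.properties sqs-source.properties
```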
21 changes: 6 additions & 15 deletions pom.xml
@@ -22,7 +22,7 @@
  <groupId>com.nordstrom.kafka.connect.sqs</groupId>
  <artifactId>kafka-connect-sqs</artifactId>
  <name>Kafka Connect SQS Sink/Source Connector</name>
  <version>1.1.0</version>
  <version>1.2.0</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -41,9 +41,9 @@
    <maven-surefire-plugin.version>2.22.1</maven-surefire-plugin.version>
    <maven-project-info-reports-plugin.version>3.0.0</maven-project-info-reports-plugin.version>

    <aws-java-sdk.version>1.11.501</aws-java-sdk.version>
    <aws-java-sdk.version>1.11.924</aws-java-sdk.version>

    <kafka.connect-api.version>2.1.0</kafka.connect-api.version>
    <kafka.connect-api.version>2.3.0</kafka.connect-api.version>
  </properties>

  <dependencyManagement>
@@ -61,18 +61,10 @@
      </dependency>
      <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-core</artifactId>
        <version>${aws-java-sdk.version}</version>
      </dependency>
      <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-sqs</artifactId>
        <version>${aws-java-sdk.version}</version>
      </dependency>
      <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-sts</artifactId>
        <artifactId>aws-java-sdk-bom</artifactId>
        <version>${aws-java-sdk.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
@@ -177,7 +169,6 @@
            <!-- provided by kafka-connect -->
            <exclude>commons-codec:*</exclude>
            <exclude>commons-logging:*</exclude>
            <exclude>org.apache.httpcomponents:*</exclude>
            <exclude>org.apache.kafka:*</exclude>
            <exclude>org.slf4j:*</exclude>
          </excludes>