diff --git a/README.md b/README.md
index 4894bc1..8e55884 100644
--- a/README.md
+++ b/README.md
@@ -1,8 +1,13 @@
 # kafka-connect-sqs
-The SQS connector plugin provides the ability to use AWS SQS queues as both a source (from an SQS queue into a Kafka topic) or sink (out of a Kafka topic into an SQS queue).
+The SQS connector plugin provides the ability to use AWS SQS queues as either a source (from an SQS queue into a Kafka topic) or a sink (out of a Kafka topic into an SQS queue).
-## Supported Kafka and AWS versions
-The `kafka-connect-sqs` connector has been tested with `connect-api:2.1.0` and `aws-java-sdk-sqs:1.11.501`
+## Compatibility Matrix
+|Connector version|Kafka Connect API|AWS SDK|
+|:---|:---|:---|
+|1.1.0|2.2.0|1.11.501|
+|1.2.0|2.3.0|1.11.924|
+
+Due to a compatibility issue with [Apache httpcomponents](http://hc.apache.org/), connector versions 1.1.0 and earlier may not work with Kafka Connect versions greater than 2.2.
 
 # Building
 You can build the connector with Maven using the standard lifecycle goals:
@@ -165,11 +170,21 @@ For a `source` connector, the minimum actions required are:
 
 # Running the Demo
 
-The demo uses the Confluent Platform which can be downloaded here: https://www.confluent.io/download/
+## Build the connector plugin
+
+Build the connector jar file:
+
+```shell
+mvn clean package
+```
 
-You can use either the Enterprise or Community version.
+## Run the connector using Docker Compose
 
-The rest of the tutorial assumes the Confluent Platform is installed at $CP and $CP/bin is on your PATH.
+Ensure the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables are exported in your shell. Docker Compose will pass these values into the `connect` container.
+
+Use the provided [Docker Compose](https://docs.docker.com/compose) file and run `docker-compose up`.
+
+With the [Kafka Connect REST interface](https://docs.confluent.io/current/connect/references/restapi.html), verify that the SQS sink and source connector plugins are installed: `curl http://localhost:8083/connector-plugins`.
 
 ## AWS
 
@@ -201,24 +216,6 @@ topic and write to a _different_ SQS queue.
 
-Create `chirps-q` and `chirped-q` SQS queues using the AWS Console. Take note of the `URL:` values for each as you will need them to configure the connectors later.
+Create `chirps-q` and `chirped-q` SQS queues using the AWS Console (or the AWS CLI; see the sketch in the next section). Take note of the `URL:` values for each as you will need them to configure the connectors later.
 
-## Build the connector plugin
-
-Build the connector jar file and copy to the classpath of Kafka Connect:
-
-```shell
-mvn clean package
-mkdir $CP/share/java/kafka-connect-sqs
-cp target/kafka-connect-sqs-0.0.1.jar $CP/share/java/kafka-connect-sqs/
-```
-
-## Start Confluent Platform using the Confluent CLI
-
-See https://docs.confluent.io/current/quickstart/ce-quickstart.html#ce-quickstart for more details.
-
-```shell
-$CP/bin/confluent start
-```
-
 ## Create the connectors
 
-The `source` connector configuration is defined in `demos/sqs-source-chirps.json]`, The `sink` connector configuration
+The `source` connector configuration is defined in `demos/sqs-source-chirps.json`. The `sink` connector configuration
@@ -228,8 +225,26 @@ values noted when you created the queues.
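+As an alternative to the AWS Console step above, you can create the queues with the AWS CLI (a sketch, assuming the
+CLI is configured with the same credentials and region):
+
+```shell
+# Create the demo queues; each call returns the queue URL needed by the connector configs.
+aws sqs create-queue --queue-name chirps-q
+aws sqs create-queue --queue-name chirped-q
+```
+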
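+Before posting the connector configurations, point them at your queues. A sketch using `jq`, assuming the queue-URL
+property is named `sqs.queue.url` (check the demo files for the exact key):
+
+```shell
+# Write a copy of the source config containing the QueueUrl value noted earlier, then post the copy in the next step.
+jq '.config["sqs.queue.url"] = "https://sqs.<region>.amazonaws.com/<account-id>/chirps-q"' \
+  demos/sqs-source-chirps.json > /tmp/sqs-source-chirps.json
+```
+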
-Create the connectors using the Confluent CLI:
+Create the connectors using the Kafka Connect REST interface:
 
 ```shell
-confluent load kafka-connect-sqs -d ./demos/sqs-source-chirps.json
-confluent load kafka-connect-sqs -d ./demos/sqs-sink-chirped.json
+curl -XPOST -H 'Content-Type: application/json' http://localhost:8083/connectors -d @demos/sqs-source-chirps.json
+curl -XPOST -H 'Content-Type: application/json' http://localhost:8083/connectors -d @demos/sqs-sink-chirped.json
 ```
 
 ## Send/receive messages
 
@@ -242,10 +239,3 @@ The `sink` connector will read the message from the topic and write it to the `c
 
-Use the AWS Console (or the AWS CLI) to read your message from the `chirped-q`
+Use the AWS Console (or the AWS CLI) to read your message from the `chirped-q` queue.
 
-## Cleaning up
-
-Clean up by destroying the Confluent Platform to remove all messages and data stored on disk:
-
-```shell
-confluent destroy
-```
diff --git a/config/log4j.properties b/config/log4j.properties
new file mode 100644
index 0000000..6f9e9c1
--- /dev/null
+++ b/config/log4j.properties
@@ -0,0 +1,14 @@
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+# default log levels
+log4j.logger.org.reflections=ERROR
+log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.I0Itec.zkclient=ERROR
+
+log4j.logger.com.amazonaws=WARN
+#log4j.logger.com.amazonaws.request=DEBUG
+#log4j.logger.org.apache.http.wire=DEBUG
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..1fc44a7
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,80 @@
+version: "3"
+
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:5.5.2
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+    ports:
+      - 2181:2181
+    logging: { driver: none }
+
+  broker:
+    image: confluentinc/cp-kafka:6.0.1
+    ports:
+      - 9092:9092
+    environment:
+      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
+      - KAFKA_LISTENERS=PUBLIC://0.0.0.0:9092,INTERNAL://0.0.0.0:19092
+      - KAFKA_ADVERTISED_LISTENERS=PUBLIC://localhost:9092,INTERNAL://broker:19092
+      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PUBLIC:PLAINTEXT,INTERNAL:PLAINTEXT
+      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
+      - KAFKA_NUM_PARTITIONS=2
+      - KAFKA_DEFAULT_REPLICATION_FACTOR=1
+      - KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=10
+      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
+      - KAFKA_DELETE_TOPIC_ENABLE=true
+      - KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
+
+      - KAFKA_LOG4J_ROOT_LOGLEVEL=INFO
+    depends_on: [zookeeper]
+    logging: { driver: none }
+
+  schema-registry:
+    image: confluentinc/cp-schema-registry:6.0.1
+    hostname: schema-registry
+    ports:
+      - 8080:8080
+    environment:
+      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
+      - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8080
+
+      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://broker:19092
+      - SCHEMA_REGISTRY_KAFKASTORE_TOPIC=_schemas
+      - SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR=1
+      - SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT_MS=15000
+    depends_on: [broker]
+    logging: { driver: none }
+
+  # NB: run connect locally in stand-alone mode to debug
+  connect:
+    image: confluentinc/cp-kafka-connect:6.0.1
+    ports:
+      - 8083:8083
+    environment:
+      - CONNECT_BOOTSTRAP_SERVERS=broker:19092
+      - CONNECT_REST_ADVERTISED_HOST_NAME=connect
+      - CONNECT_GROUP_ID=connect
+      - CONNECT_CONFIG_STORAGE_TOPIC=_connect_configs
+      - CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
+      - CONNECT_OFFSET_STORAGE_TOPIC=_connect_offsets
+      - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
+      - CONNECT_STATUS_STORAGE_TOPIC=_connect_status
+      - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
+      - CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
+      - CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
+      - CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
+      - CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
+
+      - CONNECT_PLUGIN_PATH=/opt/connectors
+      - KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/etc/log4j.properties
+
+      - AWS_PROFILE
+      - AWS_REGION
+      - AWS_ACCESS_KEY_ID
+      - AWS_SECRET_ACCESS_KEY
+    volumes:
+      - ~/.aws:/root/.aws
+      - ./target/plugin:/opt/connectors
+      - ./config/log4j.properties:/etc/log4j.properties
+    depends_on: [broker]
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 47934c4..85ac659 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
   com.nordstrom.kafka.connect.sqs
   kafka-connect-sqs
   Kafka Connect SQS Sink/Source Connector
-  1.1.0
+  1.2.0
 
   UTF-8
@@ -41,9 +41,9 @@
   2.22.1
   3.0.0
 
-  1.11.501
+  1.11.924
 
-  2.1.0
+  2.3.0
 
@@ -61,18 +61,10 @@
 
   com.amazonaws
-  aws-java-sdk-core
-  ${aws-java-sdk.version}
-
-
-  com.amazonaws
-  aws-java-sdk-sqs
-  ${aws-java-sdk.version}
-
-
-  com.amazonaws
-  aws-java-sdk-sts
+  aws-java-sdk-bom
   ${aws-java-sdk.version}
+  pom
+  import
 
@@ -177,7 +169,6 @@
 
   commons-codec:*
   commons-logging:*
-  org.apache.httpcomponents:*
   org.apache.kafka:*
   org.slf4j:*
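
Since the AWS SDK dependencies are now version-managed through the imported `aws-java-sdk-bom`, a quick way to confirm which SDK artifact versions actually resolve is Maven's dependency tree (a sketch, run from the project root):

```shell
# List the resolved com.amazonaws artifacts after the BOM import.
mvn dependency:tree -Dincludes=com.amazonaws
```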
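The demo's send/receive step can also be driven from the AWS CLI instead of the console (a sketch; substitute the queue URLs noted when the queues were created):

```shell
# Send a message to the source queue; the source connector forwards it to the Kafka topic.
aws sqs send-message --queue-url "$CHIRPS_QUEUE_URL" --message-body 'Hello, SQS'
# After the sink connector writes it back out, read it from the destination queue.
aws sqs receive-message --queue-url "$CHIRPED_QUEUE_URL"
```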