diff --git a/.gitignore b/.gitignore
index 48a7487..c533f37 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,10 +19,6 @@
 *.tar.gz
 *.rar
 
-# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
-hs_err_pid*
-/target/
-
 # Eclipse
 .classpath
 .project
@@ -33,4 +29,9 @@ dependency-reduced-pom.xml
 
 # IntelliJ
 .idea
-*.iml
\ No newline at end of file
+*.iml
+
+# Files relevant to this project
+target/
+config/sink-connector.properties
+config/source-connector.properties
diff --git a/README.md b/README.md
index ca403cc..09076f6 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,7 @@
 # kafka-connect-sqs
 
-The SQS connector plugin provides the ability to use AWS SQS queues as both a source (from an SQS queue into a Kafka topic) or sink (out of a Kafka topic into an SQS queue).
+The SQS connector plugin provides the ability to use AWS SQS queues as both a source (from an SQS
+queue into a Kafka topic) and a sink (out of a Kafka topic into an SQS queue).
 
 ## Compatibility matrix
 
@@ -10,12 +11,11 @@ The SQS connector plugin provides the ability to use AWS SQS queues as both a so
 |1.5|3.3.2|1.12.409|
 |1.6|3.4.1|1.12.669|
 
-Running the connector on versions of Kafka Connect prior to 3.0 is not recommended.
-
 ## Building the distributable
 
 You can build the connector with Maven using the standard lifecycle goals:
-```
+
+```sh
 mvn clean
 mvn package
 ```
@@ -37,27 +37,10 @@ Optional properties:
 * `sqs.message.attributes.include.list`: The comma separated list of MessageAttribute names to be included, if empty it includes all the Message Attributes. Default is the empty string.
 * `sqs.message.attributes.partition.key`: The name of a single AWS SQS MessageAttribute to use as the partition key. If this is not specified, default to the SQS message ID as the partition key.
 
-### Sample connector configuration
-
-```json
-{
-  "config": {
-    "connector.class": "com.nordstrom.kafka.connect.sqs.SqsSourceConnector",
-    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
-    "name": "my-sqs-source",
-    "sqs.max.messages": "5",
-    "sqs.queue.url": "https://sqs..amazonaws.com//my-queue",
-    "sqs.wait.time.seconds": "5",
-    "topics": "my-topic",
-    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
-  },
-  "name": "my-sqs-source"
-}
-```
-
 ### Sample IAM policy
 
-Ensure the authentication principal has privileges to read messages from the SQS queue.
+When using this connector, ensure the authentication principal has privileges to read messages from
+the SQS queue.
 
 ```json
 {
@@ -90,23 +73,6 @@ Optional properties:
 * `sqs.message.attributes.enabled`: If true, it gets the Kafka Headers and inserts them as SQS MessageAttributes (only string headers are currently supported). Default is false.
 * `sqs.message.attributes.include.list`: The comma separated list of Header names to be included, if empty it includes all the Headers. Default is the empty string.
-### Sample connector configuration
-
-```json
-{
-  "config": {
-    "connector.class": "com.nordstrom.kafka.connect.sqs.SqsSinkConnector",
-    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
-    "name": "my-sqs-sink",
-    "sqs.queue.url": "https://sqs..amazonaws.com//my-queue",
-    "sqs.region": "",
-    "topics": "my-topic",
-    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
-  },
-  "name": "my-sqs-sink"
-}
-```
-
 ### Sample SQS queue policy
 
 Define a corresponding SQS queue policy that allows the connector to send messages to the SQS queue:
@@ -131,7 +97,8 @@ Define a corresponding SQS queue policy that allows the connector to send messag
 
 ### Sample IAM policy
 
-Ensure the authentication principal has privileges to send messages to the SQS queue.
+When using this connector, ensure the authentication principal has privileges to send messages to
+the SQS queue.
 
 ```json
 {
@@ -205,73 +172,27 @@ The IAM role will have a corresponding trust policy. For example:
 }
 ```
 
-## Running the demo
-
-### Build the connector plugin
+## Running the connector
 
-Build the connector jar file:
-
-```shell
-mvn clean package
-```
+This example demonstrates using the sink connector to send a message to an SQS queue from Kafka.
 
-### Run the connector using Docker Compose
+- Set up an SQS queue
+- Set up Kafka. Use the cluster defined in `docker-compose.yml` if you don't have one
+- Customize the files in the `config` directory; for example, copy `config/sink-connector.properties.example` to `config/sink-connector.properties` and edit it
 
-Ensure you have `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables exported in your shell. Docker Compose will pass these values into the `connect` container.
-
-Use the provided [Docker Compose](https://docs.docker.com/compose) file and run `docker-compose up`.
-
-With the [Kafka Connect REST interface](https://docs.confluent.io/current/connect/references/restapi.html), verify the SQS sink and source connectors are installed and ready: `curl http://localhost:8083/connector-plugins`.
-
-### AWS
-
-The demo assumes you have an AWS account and valid credentials in ~/.aws/credentials as well as
-setting the `AWS_PROFILE` and `AWS_REGION` to appropriate values.
-
-These are required so that Kafka Connect will have access to the SQS queues.
-
-### The flow
-
-We will use the AWS Console to put a message into an SQS queue. A source connector will read messages
-from the queue and write the messages to a Kafka topic. A sink connector will read messages from the
-topic and write to a _different_ SQS queue.
+Now, start the sink connector in standalone mode:
 
+```sh
+$KAFKA_HOME/bin/connect-standalone.sh \
+  config/connect-worker.properties config/sink-connector.properties
 ```
-  __            | s |        | k |        | s |
-( o>  chirp ---> | q |  --->  | a |  --->  | q |
-///\            | s |    |    | f |    |    | s |
-\V_/_           |_____|  |    |_____|  |    |_____|
-   chirps-q              |   chirps-t  |    chirped-q
-                         |             |
-                         |             |
-                      source-       sink-
-                      connector     connector
-```
-
-### Create AWS SQS queues
-Create `chirps-q` and `chirped-q` SQS queues using the AWS Console. Take note of the `URL:` values for each
-as you will need them to configure the connectors later.
+Use a tool to produce messages to the Kafka topic.
 
-### Create the connectors
-
-The `source` connector configuration is defined in `demos/sqs-source-chirps.json]`, The `sink` connector configuration
-is defined in `demos/sqs-sink-chirped.json`. You will have to modify the `sqs.queue.url` parameter to reflect the
-values noted when you created the queues.
-
-Create the connectors using the Confluent CLI:
-
-```shell
-curl -XPOST -H 'Content-Type: application/json' http://localhost:8083/connectors -d @demos/sqs-source-chirps.json
-curl -XPOST -H 'Content-Type: application/json' http://localhost:8083/connectors -d @demos/sqs-sink-chirped.json
+```sh
+bin/kafka-console-producer --bootstrap-server localhost:9092 \
+  --topic hello-sqs-sink \
+  --property parse.headers=true \
+  --property 'headers.delimiter=\t'
+>test:abc\t{"hello":"world"}
 ```
-
-### Send and receive messages
-
-Using the AWS Console (or the AWS CLI), send a message to the `chirps-q`.
-
-The source connector will read the message from the queue and write it to the `chirps-t` Kafka topic.
-
-The `sink` connector will read the message from the topic and write it to the `chirped-q` queue.
-
-Use the AWS Console (or the AWS CLI) to read your message from the `chirped-q`
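
To confirm the record landed in the queue, you can read it back with the AWS CLI (a sketch; it assumes configured AWS credentials, and `<account-id>` stands in for the account segment elided in the example queue URL):

```sh
# Read the message (and its attributes) back from the sink queue.
aws sqs receive-message \
  --queue-url https://sqs.us-west-2.amazonaws.com/<account-id>/hello-sqs-sink \
  --message-attribute-names All \
  --wait-time-seconds 5
```
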
"org.apache.kafka.connect.storage.StringConverter", - "name": "sqs-source-chirps", - "sqs.max.messages": "5", - "sqs.queue.url": "https://sqs.us-west-2.amazonaws.com/CHANGEME/chirps-q", - "sqs.wait.time.seconds": "5", - "topics": "chirps-t", - "value.converter": "org.apache.kafka.connect.storage.StringConverter" - }, - "name": "sqs-source-chirps" -} diff --git a/docker-compose.yml b/docker-compose.yml index 8073e9a..c4dad30 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,64 +2,31 @@ version: "3" services: zookeeper: - image: confluentinc/cp-zookeeper:7.1.1 + image: confluentinc/cp-zookeeper:7.4.5 environment: ZOOKEEPER_CLIENT_PORT: 2181 ports: - 2181:2181 - logging: { driver: none } + logging: + driver: none - broker: - image: confluentinc/cp-kafka:7.1.1 + kafka-broker: + image: confluentinc/cp-kafka:7.4.5 ports: - 9092:9092 + - 9093:9093 environment: - - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - - KAFKA_LISTENERS=PUBLIC://0.0.0.0:9092,INTERNAL://0.0.0.0:19092 - - KAFKA_ADVERTISED_LISTENERS=PUBLIC://localhost:9092,INTERNAL://broker:19092 - - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PUBLIC:PLAINTEXT,INTERNAL:PLAINTEXT - - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL - - KAFKA_NUM_PARTITIONS=2 - - KAFKA_DEFAULT_REPLICATION_FACTOR=1 - - KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=10 - - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 - - KAFKA_DELETE_TOPIC_ENABLE=true - - KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false - - - KAFKA_LOG4J_ROOT_LOGLEVEL=INFO + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: PUBLIC://0.0.0.0:9092,INTERNAL://0.0.0.0:9093 + KAFKA_ADVERTISED_LISTENERS: PUBLIC://localhost:9092,INTERNAL://kafka-broker:9093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PUBLIC:PLAINTEXT,INTERNAL:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_NUM_PARTITIONS: 2 + KAFKA_DEFAULT_REPLICATION_FACTOR: 1 + KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 10 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_DELETE_TOPIC_ENABLE: "true" + KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false" + KAFKA_LOG4J_LOGGERS: kafka=WARN,kafka.controller=INFO,kafka.server.KafkaServer=INFO,org.apache.zookeeper=WARN depends_on: [zookeeper] - logging: { driver: none } - - # NB: run connect locally in stand-alone mode to debug - connect: - image: confluentinc/cp-kafka-connect:7.1.1 - ports: - - 8083:8083 - environment: - - CONNECT_BOOTSTRAP_SERVERS=broker:19092 - - CONNECT_REST_ADVERTISED_HOST_NAME=connect - - CONNECT_REST_PORT=8083 - - CONNECT_GROUP_ID=connect - - CONNECT_CONFIG_STORAGE_TOPIC=_connect_configs - - CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 - - CONNECT_OFFSET_STORAGE_TOPIC=_connect_offsets - - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 - - CONNECT_STATUS_STORAGE_TOPIC=_connect_status - - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 - - CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter - - CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter - - CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter - - CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter - - - CONNECT_PLUGIN_PATH=/opt/connectors - - KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/etc/log4j.properties - - - AWS_PROFILE - - AWS_REGION - - AWS_ACCESS_KEY_ID - - AWS_SECRET_ACCESS_KEY - volumes: - - ~/.aws:/root/.aws - - ./target/plugin:/opt/connectors - - ./config/log4j.properties:/etc/log4j.properties - depends_on: [broker] diff --git a/pom.xml b/pom.xml index d616474..7cdceae 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ 
diff --git a/pom.xml b/pom.xml
index d616474..7cdceae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
   <groupId>com.nordstrom.kafka.connect.sqs</groupId>
   <artifactId>kafka-connect-sqs</artifactId>
   <name>Kafka Connect SQS Sink/Source Connector</name>
-  <version>1.6.1</version>
+  <version>1.6.2</version>
 
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsClient.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsClient.java
index 751f1c0..9daff52 100644
--- a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsClient.java
+++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsClient.java
@@ -37,11 +37,6 @@ public class SqsClient {
   private final Logger log = LoggerFactory.getLogger(this.getClass());
   private final String AWS_FIFO_SUFFIX = ".fifo";
 
-  public static final Class CREDENTIALS_PROVIDER_CLASS_DEFAULT =
-      com.amazonaws.auth.DefaultAWSCredentialsProviderChain.class;
-
-  private final Boolean messageAttributesEnabled;
-  private final List<String> messageAttributesList;
 
   private final AmazonSQS client;
 
@@ -64,22 +59,7 @@ public SqsClient(SqsConnectorConfig config) {
     }
 
     builder.setCredentials(provider);
-
-//    // If there's an AWS credentials profile and/or region configured in the
-//    // environment we will use it.
-//    final String profile = System.getenv(AWS_PROFILE);
-//    final String region = System.getenv(AWS_REGION);
-//    if (Facility.isNotNullNorEmpty(profile)) {
-//      builder.setCredentials(provider);
-//    }
-//    if (Facility.isNotNullNorEmpty(region)) {
-//      builder.setRegion(region);
-//    }
-//    log.info("AmazonSQS using profile={}, region={}", profile, region);
-
     client = builder.build();
-    messageAttributesEnabled = config.getMessageAttributesEnabled();
-    messageAttributesList = config.getMessageAttributesList();
   }
 
   /**
@@ -104,9 +84,11 @@ public void delete(final String url, final String receiptHandle) {
    * @param url SQS queue url.
    * @param maxMessages Maximum number of messages to receive for this call.
    * @param waitTimeSeconds Time to wait, in seconds, for messages to arrive.
+   * @param messageAttributesEnabled Whether to collect message attributes.
+   * @param messageAttributesList Which message attributes to collect; if empty, all attributes are collected.
    * @return Collection of messages received.
    */
-  public List<Message> receive(final String url, final int maxMessages, final int waitTimeSeconds) {
+  public List<Message> receive(final String url, final int maxMessages, final int waitTimeSeconds, final Boolean messageAttributesEnabled, final List<String> messageAttributesList) {
     log.debug(".receive:queue={}, max={}, wait={}", url, maxMessages, waitTimeSeconds);
 
     Guard.verifyValidUrl(url);
@@ -123,7 +105,7 @@ public List<Message> receive(final String url, final int maxMessages, final int
         .withMaxNumberOfMessages(maxMessages).withWaitTimeSeconds(waitTimeSeconds).withAttributeNames("");
 
     if (messageAttributesEnabled) {
-      if (messageAttributesList.size() == 0) {
+      if (messageAttributesList.isEmpty()) {
        receiveMessageRequest = receiveMessageRequest.withMessageAttributeNames("All");
       } else {
        receiveMessageRequest = receiveMessageRequest.withMessageAttributeNames(messageAttributesList);
@@ -147,7 +129,7 @@ public List<Message> receive(final String url, final int maxMessages, final int
    * @param groupId Optional group identifier (fifo queues only).
    * @param messageId Optional message identifier (fifo queues only).
    * @param messageAttributes The message attributes to send.
-   * @return
+   * @return Sequence number when FIFO; otherwise, the message identifier
    */
   public String send(final String url, final String body, final String groupId, final String messageId, final Map<String, MessageAttributeValue> messageAttributes) {
     log.debug(".send: queue={}, gid={}, mid={}", url, groupId, messageId);
diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfig.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfig.java
index e6124e3..56b1e81 100644
--- a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfig.java
+++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfig.java
@@ -1,36 +1,27 @@
 package com.nordstrom.kafka.connect.sqs;
 
+import com.amazonaws.auth.AWSCredentialsProvider;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-abstract class SqsConnectorConfig extends AbstractConfig {
-
+abstract public class SqsConnectorConfig extends AbstractConfig {
   private final String queueUrl;
   private final String topics;
   private final String region;
   private final String endpointUrl;
 
-  private final Boolean messageAttributesEnabled;
-
-  private final List<String> messageAttributesList;
-
-  private final String messageAttributePartitionKey;
-
   public SqsConnectorConfig(ConfigDef configDef, Map<String, String> originals) {
     super(configDef, originals);
     queueUrl = getString(SqsConnectorConfigKeys.SQS_QUEUE_URL.getValue());
     topics = getString(SqsConnectorConfigKeys.TOPICS.getValue());
     region = getString(SqsConnectorConfigKeys.SQS_REGION.getValue());
     endpointUrl = getString(SqsConnectorConfigKeys.SQS_ENDPOINT_URL.getValue());
-    messageAttributesEnabled = getBoolean(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTES_ENABLED.getValue());
-
-    List<String> csMessageAttributesList = getList(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTES_INCLUDE_LIST.getValue());
-    messageAttributesList = messageAttributesEnabled ? csMessageAttributesList : new ArrayList<>();
-    messageAttributePartitionKey = getString(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTE_PARTITION_KEY.getValue());
   }
 
   public String getQueueUrl() {
@@ -49,15 +40,23 @@ public String getEndpointUrl() {
     return endpointUrl;
   }
 
-  public Boolean getMessageAttributesEnabled() {
-    return messageAttributesEnabled;
-  }
-
-  public List<String> getMessageAttributesList() {
-    return messageAttributesList;
-  }
-
-  public String getMessageAttributePartitionKey() {
-    return messageAttributePartitionKey;
+  protected static class CredentialsProviderValidator implements ConfigDef.Validator {
+    @Override
+    public void ensureValid(String name, Object provider) {
+      if (provider instanceof Class
+          && AWSCredentialsProvider.class.isAssignableFrom((Class<?>) provider)) {
+        return;
+      }
+      throw new ConfigException(
+          name,
+          provider,
+          "Class must extend: " + AWSCredentialsProvider.class
+      );
+    }
+
+    @Override
+    public String toString() {
+      return "Any class implementing: " + AWSCredentialsProvider.class;
+    }
   }
 }
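
A sketch of how the now-shared validator behaves (the key name below is hypothetical; the real one comes from `SqsConnectorConfigKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG`, and same-package access is assumed):

```java
// Passes: DefaultAWSCredentialsProviderChain implements AWSCredentialsProvider.
ConfigDef.Validator v = new SqsConnectorConfig.CredentialsProviderValidator();
v.ensureValid("credentials.provider.class",            // hypothetical key name
    com.amazonaws.auth.DefaultAWSCredentialsProviderChain.class);

// Throws ConfigException: String does not implement AWSCredentialsProvider.
v.ensureValid("credentials.provider.class", String.class);
```
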
Default is false.") .define(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTES_INCLUDE_LIST.getValue(), Type.LIST, "", Importance.LOW, - "The comma separated list of Header names to be included, if empty it includes all the Headers. Default is the empty string.") - .define(SqsConnectorConfigKeys.CREDENTIALS_PROVIDER_ACCESS_KEY_ID.getValue(), Type.STRING, "", Importance.LOW, "AWS Secret Access Key to be used with Config credentials provider.") - .define(SqsConnectorConfigKeys.CREDENTIALS_PROVIDER_SECRET_ACCESS_KEY.getValue(), Type.PASSWORD, "", Importance.LOW, "AWS Secret Access Key to be used with Config credentials provider") - .define(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTE_PARTITION_KEY.getValue(), Type.STRING, "", Importance.LOW, "The name of a single AWS SQS MessageAttribute to use as the partition key"); + "The comma separated list of Header names to be included, if empty it includes all the Headers. Default is the empty string.") + .define(SqsConnectorConfigKeys.CREDENTIALS_PROVIDER_ACCESS_KEY_ID.getValue(), Type.STRING, "", Importance.LOW, + "AWS Secret Access Key to be used with Config credentials provider.") + .define(SqsConnectorConfigKeys.CREDENTIALS_PROVIDER_SECRET_ACCESS_KEY.getValue(), Type.PASSWORD, "", Importance.LOW, + "AWS Secret Access Key to be used with Config credentials provider"); public static ConfigDef config() { return CONFIG_DEF; @@ -59,26 +63,21 @@ public static ConfigDef config() { public SqsSinkConnectorConfig(Map originals) { super(config(), originals); - } - private static class CredentialsProviderValidator implements ConfigDef.Validator { - @Override - public void ensureValid(String name, Object provider) { - log.warn(".validator:name={}, provider={}", name, provider); - if (provider != null && provider instanceof Class - && AWSCredentialsProvider.class.isAssignableFrom((Class) provider)) { - return; - } - throw new ConfigException( - name, - provider, - "Class must extend: " + AWSCredentialsProvider.class - ); + messageAttributesEnabled = getBoolean(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTES_ENABLED.getValue()); + if (messageAttributesEnabled) { + messageAttributesList = getList(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTES_INCLUDE_LIST.getValue()); + } else { + messageAttributesList = Collections.emptyList(); } + } - @Override - public String toString() { - return "Any class implementing: " + AWSCredentialsProvider.class; - } + public Boolean getMessageAttributesEnabled() { + return messageAttributesEnabled; } + + public List getMessageAttributesList() { + return messageAttributesList; + } + } diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSinkConnectorTask.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSinkConnectorTask.java index 4b5a101..b4c313e 100644 --- a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSinkConnectorTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSinkConnectorTask.java @@ -94,7 +94,7 @@ public void put( Collection records ) { final Headers headers = record.headers(); messageAttributes = new HashMap<>(); List attributesList = config.getMessageAttributesList(); - boolean allNamesEnabled = (attributesList.size() == 0); + boolean allNamesEnabled = attributesList.isEmpty(); for(Header header: headers) { if(allNamesEnabled || attributesList.contains(header.key())) { if(header.schema().equals(Schema.STRING_SCHEMA)) { @@ -106,7 +106,6 @@ public void put( Collection records ) { } } - if ( Facility.isNotNullNorEmpty( body ) ) { try { final String sid = client.send( config.getQueueUrl(), body, gid, 
diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSinkConnectorTask.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSinkConnectorTask.java
index 4b5a101..b4c313e 100644
--- a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSinkConnectorTask.java
+++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSinkConnectorTask.java
@@ -94,7 +94,7 @@ public void put( Collection<SinkRecord> records ) {
       final Headers headers = record.headers();
       messageAttributes = new HashMap<>();
       List<String> attributesList = config.getMessageAttributesList();
-      boolean allNamesEnabled = (attributesList.size() == 0);
+      boolean allNamesEnabled = attributesList.isEmpty();
       for(Header header: headers) {
         if(allNamesEnabled || attributesList.contains(header.key())) {
           if(header.schema().equals(Schema.STRING_SCHEMA)) {
@@ -106,7 +106,6 @@ public void put( Collection<SinkRecord> records ) {
         }
       }
 
-
       if ( Facility.isNotNullNorEmpty( body ) ) {
         try {
           final String sid = client.send( config.getQueueUrl(), body, gid, mid, messageAttributes ) ;
Default is false.") .define(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTES_INCLUDE_LIST.getValue(), Type.LIST, "", Importance.LOW, - "The comma separated list of MessageAttribute names to be included, if empty it includes all the Message Attributes. Default is the empty string."); + "The comma separated list of MessageAttribute names to be included, if empty it includes all the Message Attributes. Default is the empty string.") + .define(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTE_PARTITION_KEY.getValue(), Type.STRING, "", Importance.LOW, + "The name of a single AWS SQS MessageAttribute to use as the partition key"); public static ConfigDef config() { return CONFIG_DEF; @@ -51,6 +68,14 @@ public SqsSourceConnectorConfig(Map originals) { super(config(), originals); maxMessages = getInt(SqsConnectorConfigKeys.SQS_MAX_MESSAGES.getValue()); waitTimeSeconds = getInt(SqsConnectorConfigKeys.SQS_WAIT_TIME_SECONDS.getValue()); + + messageAttributesEnabled = getBoolean(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTES_ENABLED.getValue()); + if (messageAttributesEnabled) { + messageAttributesList = getList(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTES_INCLUDE_LIST.getValue()); + } else { + messageAttributesList = Collections.emptyList(); + } + messageAttributePartitionKey = getString(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTE_PARTITION_KEY.getValue()); } public Integer getMaxMessages() { @@ -61,4 +86,15 @@ public Integer getWaitTimeSeconds() { return waitTimeSeconds; } + public Boolean getMessageAttributesEnabled() { + return messageAttributesEnabled; + } + + public List getMessageAttributesList() { + return messageAttributesList; + } + + public String getMessageAttributePartitionKey() { + return messageAttributePartitionKey; + } } diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java index c2a44a5..ece3f89 100644 --- a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java @@ -103,8 +103,12 @@ public List poll() throws InterruptedException { } // Read messages from the queue. - List messages = client.receive( config.getQueueUrl(), config.getMaxMessages(), - config.getWaitTimeSeconds() ) ; + List messages = client.receive( + config.getQueueUrl(), + config.getMaxMessages(), + config.getWaitTimeSeconds(), + config.getMessageAttributesEnabled(), + config.getMessageAttributesList()); log.debug( ".poll:url={}, max={}, wait={}, size={}", config.getQueueUrl(), config.getMaxMessages(), config.getWaitTimeSeconds(), messages.size() ) ;