diff --git a/README.md b/README.md
index 0c0b112..9bcf17d 100644
--- a/README.md
+++ b/README.md
@@ -66,6 +66,7 @@ Optional properties:
 * `sqs.endpoint.url`: Override value for the AWS region specific endpoint.
 * `sqs.message.attributes.enabled`: If true, it gets the Kafka Headers and inserts them as SQS MessageAttributes (only string headers are currently supported). Default is false.
 * `sqs.message.attributes.include.list`: The comma separated list of Header names to be included, if empty it includes all the Headers. Default is the empty string.
+* `sqs.message.attributes.partition.key`: The name of a single SQS MessageAttribute whose String value is used as the partition key (the Kafka record key). Only applies when `sqs.message.attributes.enabled` is true. If it is not specified, or the message has no String attribute with that name, the SQS message ID is used as the partition key.

 ### Sample Configuration
 ```json
diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfig.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfig.java
index 26e700a..e6124e3 100644
--- a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfig.java
+++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfig.java
@@ -18,6 +18,8 @@ abstract class SqsConnectorConfig extends AbstractConfig {

     private final List messageAttributesList;

+    private final String messageAttributePartitionKey;
+
     public SqsConnectorConfig(ConfigDef configDef, Map originals) {
         super(configDef, originals);
         queueUrl = getString(SqsConnectorConfigKeys.SQS_QUEUE_URL.getValue());
@@ -28,6 +30,7 @@ public SqsConnectorConfig(ConfigDef configDef, Map originals) {

         List csMessageAttributesList = getList(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTES_INCLUDE_LIST.getValue());
         messageAttributesList = messageAttributesEnabled ? csMessageAttributesList : new ArrayList<>();
+        messageAttributePartitionKey = getString(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTE_PARTITION_KEY.getValue());
     }

     public String getQueueUrl() {
@@ -53,4 +56,13 @@ public Boolean getMessageAttributesEnabled() {
     public List getMessageAttributesList() {
         return messageAttributesList;
     }
+
+    /**
+     * Returns the configured value of {@code sqs.message.attributes.partition.key}.
+     *
+     * @return the name of the message attribute to use as the partition key
+     */
+    public String getMessageAttributePartitionKey() {
+        return messageAttributePartitionKey;
+    }
 }
diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfigKeys.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfigKeys.java
index 67795e5..132edc2 100644
--- a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfigKeys.java
+++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfigKeys.java
@@ -28,6 +28,7 @@ public enum SqsConnectorConfigKeys {
   SQS_ENDPOINT_URL("sqs.endpoint.url"),
   SQS_MESSAGE_ATTRIBUTES_ENABLED("sqs.message.attributes.enabled"),
   SQS_MESSAGE_ATTRIBUTES_INCLUDE_LIST("sqs.message.attributes.include.list"),
+  SQS_MESSAGE_ATTRIBUTE_PARTITION_KEY("sqs.message.attributes.partition.key"),

   // These are not part of the connector configuration proper, but just a convenient
   // place to define the constants.
diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java
index a89bb6a..c2a44a5 100644
--- a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java
+++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java
@@ -20,6 +20,7 @@ import java.util.stream.Collectors ;


 import com.amazonaws.services.sqs.model.MessageAttributeValue;
+import com.nordstrom.kafka.connect.utils.StringUtils;
 import org.apache.kafka.connect.data.Schema ;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.header.ConnectHeaders;
@@ -63,6 +64,37 @@ public void start( Map props ) {
     log.info( "task.start.OK, sqs.queue.url={}, topics={}", config.getQueueUrl(), config.getTopics() ) ;
   }

+  /**
+   * Chooses the Kafka record key for an SQS message.
+   *
+   * <p>The value of the configured partition-key message attribute is used when message
+   * attributes are enabled, the attribute is present on the message, and it carries a
+   * non-null {@code String} value. In every other case the SQS message ID is used.
+   *
+   * @param message the SQS message being converted to a source record
+   * @return the attribute value to key the record by, or the SQS message ID as a fallback
+   */
+  private String getPartitionKey(Message message) {
+    final String messageId = message.getMessageId();
+    if (!config.getMessageAttributesEnabled()) {
+      return messageId;
+    }
+    final String partitionKeyName = config.getMessageAttributePartitionKey();
+    if (StringUtils.isBlank(partitionKeyName)) {
+      return messageId;
+    }
+
+    // Look the attribute up directly instead of scanning every key of the map.
+    final Map<String, MessageAttributeValue> attributes = message.getMessageAttributes();
+    final MessageAttributeValue attrValue = attributes.get(partitionKeyName);
+    // Constant-first equals: an absent attribute or data type falls back instead of throwing.
+    if (attrValue == null || !"String".equals(attrValue.getDataType())) {
+      return messageId;
+    }
+    final String partitionKey = attrValue.getStringValue();
+    return partitionKey != null ? partitionKey : messageId;
+  }
+
   /*
    * (non-Javadoc)
    *
@@ -95,11 +127,11 @@ public List poll() throws InterruptedException {
       log.trace( ".poll:source-partition={}", sourcePartition ) ;
       log.trace( ".poll:source-offset={}", sourceOffset ) ;

-      final String body = message.getBody() ;
-      final String key = message.getMessageId() ;
+      final String body = message.getBody();
+      final String key = getPartitionKey(message);
       final String topic = config.getTopics() ;

-      ConnectHeaders headers = new ConnectHeaders();
+      final ConnectHeaders headers = new ConnectHeaders();
       if (config.getMessageAttributesEnabled()) {
         Map attributes = message.getMessageAttributes();
         // sqs api should return only the fields specified in the list
@@ -112,7 +144,7 @@ public List poll() throws InterruptedException {
         }
       }

-      return new SourceRecord( sourcePartition, sourceOffset, topic, null, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA,
+      return new SourceRecord(sourcePartition, sourceOffset, topic, null, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA,
           body, null, headers) ;
     } ).collect( Collectors.toList() ) ;
   }