Skip to content

Commit

Permalink
Merge pull request #44 from koosie0507/feature/partition-key-from-mes…
Browse files Browse the repository at this point in the history
…sage-attribute

Allow configuring partition key from a MessageAttribute
  • Loading branch information
dylanmei authored Feb 29, 2024
2 parents 193b265 + ab5d113 commit 20eaeaf
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 4 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ Optional properties:
* `sqs.endpoint.url`: Override value for the AWS region specific endpoint.
* `sqs.message.attributes.enabled`: If true, it gets the Kafka Headers and inserts them as SQS MessageAttributes (only string headers are currently supported). Default is false.
* `sqs.message.attributes.include.list`: The comma separated list of Header names to be included, if empty it includes all the Headers. Default is the empty string.
* `sqs.message.attributes.partition.key`: The name of a single AWS SQS MessageAttribute to use as the partition key. If this is not specified, default to the SQS message ID as the partition key.

### Sample Configuration
```json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ abstract class SqsConnectorConfig extends AbstractConfig {

private final List<String> messageAttributesList;

private final String messageAttributePartitionKey;

public SqsConnectorConfig(ConfigDef configDef, Map<?, ?> originals) {
super(configDef, originals);
queueUrl = getString(SqsConnectorConfigKeys.SQS_QUEUE_URL.getValue());
Expand All @@ -28,6 +30,7 @@ public SqsConnectorConfig(ConfigDef configDef, Map<?, ?> originals) {

List<String> csMessageAttributesList = getList(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTES_INCLUDE_LIST.getValue());
messageAttributesList = messageAttributesEnabled ? csMessageAttributesList : new ArrayList<>();
messageAttributePartitionKey = getString(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTE_PARTITION_KEY.getValue());
}

public String getQueueUrl() {
Expand All @@ -53,4 +56,8 @@ public Boolean getMessageAttributesEnabled() {
public List<String> getMessageAttributesList() {
return messageAttributesList;
}

public String getMessageAttributePartitionKey() {
return messageAttributePartitionKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public enum SqsConnectorConfigKeys {
SQS_ENDPOINT_URL("sqs.endpoint.url"),
SQS_MESSAGE_ATTRIBUTES_ENABLED("sqs.message.attributes.enabled"),
SQS_MESSAGE_ATTRIBUTES_INCLUDE_LIST("sqs.message.attributes.include.list"),
SQS_MESSAGE_ATTRIBUTE_PARTITION_KEY("sqs.message.attributes.partition.key"),

// These are not part of the connector configuration proper, but just a convenient
// place to define the constants.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.stream.Collectors ;

import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.nordstrom.kafka.connect.utils.StringUtils;
import org.apache.kafka.connect.data.Schema ;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.ConnectHeaders;
Expand Down Expand Up @@ -63,6 +64,31 @@ public void start( Map<String, String> props ) {
log.info( "task.start.OK, sqs.queue.url={}, topics={}", config.getQueueUrl(), config.getTopics() ) ;
}

private String getPartitionKey(Message message) {
String messageId = message.getMessageId();
if (!config.getMessageAttributesEnabled()) {
return messageId;
}
String messageAttributePartitionKey = config.getMessageAttributePartitionKey();
if (StringUtils.isBlank(messageAttributePartitionKey)) {
return messageId;
}

// search for the String message attribute with the same name as the configured partition key
Map<String, MessageAttributeValue> attributes = message.getMessageAttributes();
for(String attributeKey: attributes.keySet()) {
if (!Objects.equals(attributeKey, messageAttributePartitionKey)) {
continue;
}
MessageAttributeValue attrValue = attributes.get(attributeKey);
if (!attrValue.getDataType().equals("String")) {
continue;
}
return attrValue.getStringValue();
}
return messageId;
}

/*
* (non-Javadoc)
*
Expand Down Expand Up @@ -95,11 +121,11 @@ public List<SourceRecord> poll() throws InterruptedException {
log.trace( ".poll:source-partition={}", sourcePartition ) ;
log.trace( ".poll:source-offset={}", sourceOffset ) ;

final String body = message.getBody() ;
final String key = message.getMessageId() ;
final String body = message.getBody();
final String key = getPartitionKey(message);
final String topic = config.getTopics() ;

ConnectHeaders headers = new ConnectHeaders();
final ConnectHeaders headers = new ConnectHeaders();
if (config.getMessageAttributesEnabled()) {
Map<String, MessageAttributeValue> attributes = message.getMessageAttributes();
// sqs api should return only the fields specified in the list
Expand All @@ -112,7 +138,7 @@ public List<SourceRecord> poll() throws InterruptedException {
}
}

return new SourceRecord( sourcePartition, sourceOffset, topic, null, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA,
return new SourceRecord(sourcePartition, sourceOffset, topic, null, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA,
body, null, headers) ;
} ).collect( Collectors.toList() ) ;
}
Expand Down

0 comments on commit 20eaeaf

Please sign in to comment.