Skip to content

Commit

Permalink
Merge pull request #34 from valmoz/add-message-attributes
Browse files Browse the repository at this point in the history
Adds string messageattributes to headers conversion
  • Loading branch information
dylanmei authored Feb 16, 2023
2 parents 8dad4ba + 5e82467 commit 5aa5a8d
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 20 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ Optional properties:
* `sqs.endpoint.url`: Override value for the AWS region specific endpoint.
* `sqs.max.messages`: Maximum number of messages to read from SQS queue for each poll interval. Range is 0 - 10 with default of 1.
* `sqs.wait.time.seconds`: Duration (in seconds) to wait for a message to arrive in the queue. Default is 1.
* `sqs.message.attributes.enabled`: If true, it gets the SQS MessageAttributes and inserts them as Kafka Headers (only string headers are currently supported). Default is false.
* `sqs.message.attributes.include.list`: The comma separated list of MessageAttribute names to be included, if empty it includes all the Message Attributes. Default is the empty string.

### Sample Configuration

Expand Down Expand Up @@ -61,6 +63,8 @@ Required properties:
Optional properties:
* `sqs.region`: AWS region of the SQS queue to be written to.
* `sqs.endpoint.url`: Override value for the AWS region specific endpoint.
* `sqs.message.attributes.enabled`: If true, it gets the Kafka Headers and inserts them as SQS MessageAttributes (only string headers are currently supported). Default is false.
* `sqs.message.attributes.include.list`: The comma separated list of Header names to be included, if empty it includes all the Headers. Default is the empty string.

### Sample Configuration
```json
Expand Down
33 changes: 23 additions & 10 deletions src/main/java/com/nordstrom/kafka/connect/sqs/SqsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Map;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.sqs.model.*;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
Expand All @@ -28,13 +29,6 @@
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.DeleteMessageResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;

import com.nordstrom.kafka.connect.utils.StringUtils;

Expand All @@ -46,6 +40,9 @@ public class SqsClient {
public static final Class<? extends AWSCredentialsProvider> CREDENTIALS_PROVIDER_CLASS_DEFAULT =
com.amazonaws.auth.DefaultAWSCredentialsProviderChain.class;

private final Boolean messageAttributesEnabled;
private final List<String> messageAttributesList;

private final AmazonSQS client;

public SqsClient(SqsConnectorConfig config) {
Expand Down Expand Up @@ -82,6 +79,8 @@ public SqsClient(SqsConnectorConfig config) {
// log.info("AmazonSQS using profile={}, region={}", profile, region);

client = builder.build();
messageAttributesEnabled = config.getMessageAttributesEnabled();
messageAttributesList = config.getMessageAttributesList();
}

/**
Expand Down Expand Up @@ -121,8 +120,17 @@ public List<Message> receive(final String url, final int maxMessages, final int
//
// Receive messages from queue
//
final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url)
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url)
.withMaxNumberOfMessages(maxMessages).withWaitTimeSeconds(waitTimeSeconds).withAttributeNames("");

if (messageAttributesEnabled) {
if (messageAttributesList.size() == 0) {
receiveMessageRequest = receiveMessageRequest.withMessageAttributeNames("All");
} else {
receiveMessageRequest = receiveMessageRequest.withMessageAttributeNames(messageAttributesList);
}
}

final ReceiveMessageResult result = client.receiveMessage(receiveMessageRequest);
final List<Message> messages = result.getMessages();

Expand All @@ -139,9 +147,10 @@ public List<Message> receive(final String url, final int maxMessages, final int
* @param body The message to send.
* @param groupId Optional group identifier (fifo queues only).
* @param messageId Optional message identifier (fifo queues only).
* @param messageAttributes The message attributes to send.
* @return
*/
public String send(final String url, final String body, final String groupId, final String messageId) {
public String send(final String url, final String body, final String groupId, final String messageId, final Map<String, MessageAttributeValue> messageAttributes) {
log.debug(".send: queue={}, gid={}, mid={}", url, groupId, messageId);

Guard.verifyValidUrl(url);
Expand All @@ -151,7 +160,11 @@ public String send(final String url, final String body, final String groupId, fi
}
final boolean fifo = isFifo(url);

final SendMessageRequest request = new SendMessageRequest(url, body);
SendMessageRequest request = new SendMessageRequest(url, body);
if (messageAttributes != null) {
request.setMessageAttributes(messageAttributes);
}

if (fifo) {
Guard.verifyNotNullOrEmpty(groupId, "groupId");
Guard.verifyNotNullOrEmpty(messageId, "messageId");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

abstract class SqsConnectorConfig extends AbstractConfig {
Expand All @@ -12,12 +14,20 @@ abstract class SqsConnectorConfig extends AbstractConfig {
private final String region;
private final String endpointUrl;

private final Boolean messageAttributesEnabled;

private final List<String> messageAttributesList;

public SqsConnectorConfig(ConfigDef configDef, Map<?, ?> originals) {
super(configDef, originals);
queueUrl = getString(SqsConnectorConfigKeys.SQS_QUEUE_URL.getValue());
topics = getString(SqsConnectorConfigKeys.TOPICS.getValue());
region = getString(SqsConnectorConfigKeys.SQS_REGION.getValue());
endpointUrl = getString(SqsConnectorConfigKeys.SQS_ENDPOINT_URL.getValue());
messageAttributesEnabled = getBoolean(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTES_ENABLED.getValue());

List<String> csMessageAttributesList = getList(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTES_INCLUDE_LIST.getValue());
messageAttributesList = messageAttributesEnabled ? csMessageAttributesList : new ArrayList<>();
}

public String getQueueUrl() {
Expand All @@ -31,7 +41,16 @@ public String getTopics() {
public String getRegion() {
return region;
}

public String getEndpointUrl() {
return endpointUrl;
}

public Boolean getMessageAttributesEnabled() {
return messageAttributesEnabled;
}

public List<String> getMessageAttributesList() {
return messageAttributesList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public enum SqsConnectorConfigKeys {
TOPICS("topics"),
SQS_REGION("sqs.region"),
SQS_ENDPOINT_URL("sqs.endpoint.url"),
SQS_MESSAGE_ATTRIBUTES_ENABLED("sqs.message.attributes.enabled"),
SQS_MESSAGE_ATTRIBUTES_INCLUDE_LIST("sqs.message.attributes.include.list"),

// These are not part of the connector configuration proper, but just a convenient
// place to define the constants.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ public class SqsSinkConnectorConfig extends SqsConnectorConfig {
.define(SqsConnectorConfigKeys.SQS_REGION.getValue(), Type.STRING, System.getenv("AWS_REGION"), Importance.HIGH,
"SQS queue AWS region.")
.define(SqsConnectorConfigKeys.SQS_ENDPOINT_URL.getValue(), Type.STRING, Importance.LOW,
"If specified, the connector will override the AWS region specific endpoint URL with this value. Note that this is not the queue URL.");
"If specified, the connector will override the AWS region specific endpoint URL with this value. Note that this is not the queue URL.")
.define(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTES_ENABLED.getValue(), Type.BOOLEAN, false, Importance.LOW,
"If true, it gets the Kafka Headers and inserts them as SQS MessageAttributes (only string headers are currently supported). Default is false.")
.define(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTES_INCLUDE_LIST.getValue(), Type.LIST, "", Importance.LOW,
"The comma separated list of Header names to be included, if empty it includes all the Headers. Default is the empty string.");

public static ConfigDef config() {
return CONFIG_DEF;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@

import java.text.MessageFormat ;
import java.util.Collection ;
import java.util.HashMap;
import java.util.List;
import java.util.Map ;

import com.amazonaws.services.sqs.model.MessageAttributeValue;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.sink.SinkRecord ;
import org.apache.kafka.connect.sink.SinkTask ;
import org.slf4j.Logger ;
Expand Down Expand Up @@ -82,9 +88,28 @@ public void put( Collection<SinkRecord> records ) {
final String gid = Facility.isNotNullNorEmpty( key ) ? key : record.topic() ;
final String body = Facility.isNotNull( record.value() ) ? record.value().toString() : "" ;

Map<String, MessageAttributeValue> messageAttributes = null;

if (config.getMessageAttributesEnabled()) {
final Headers headers = record.headers();
messageAttributes = new HashMap<>();
List<String> attributesList = config.getMessageAttributesList();
boolean allNamesEnabled = (attributesList.size() == 0);
for(Header header: headers) {
if(allNamesEnabled || attributesList.contains(header.key())) {
if(header.schema().equals(Schema.STRING_SCHEMA)) {
messageAttributes.put(header.key(), new MessageAttributeValue()
.withDataType("String")
.withStringValue((String)header.value()));
}
}
}
}


if ( Facility.isNotNullNorEmpty( body ) ) {
try {
final String sid = client.send( config.getQueueUrl(), body, gid, mid ) ;
final String sid = client.send( config.getQueueUrl(), body, gid, mid, messageAttributes ) ;

log.debug( ".put.OK:message-id={}, queue.url={}, sqs-group-id={}, sqs-message-id={}", gid, mid,
config.getQueueUrl(), sid ) ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ public class SqsSourceConnectorConfig extends SqsConnectorConfig {
.define(SqsConnectorConfigKeys.SQS_REGION.getValue(), Type.STRING, System.getenv("AWS_REGION"), Importance.HIGH,
"SQS queue AWS region.")
.define(SqsConnectorConfigKeys.SQS_ENDPOINT_URL.getValue(), Type.STRING, Importance.LOW,
"If specified, the connector will override the AWS region specific endpoint URL with this value. Note that this is not the queue URL.");

"If specified, the connector will override the AWS region specific endpoint URL with this value. Note that this is not the queue URL.")
.define(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTES_ENABLED.getValue(), Type.BOOLEAN, false, Importance.LOW,
"If true, it gets the SQS MessageAttributes and inserts them as Kafka Headers (only string headers are currently supported). Default is false.")
.define(SqsConnectorConfigKeys.SQS_MESSAGE_ATTRIBUTES_INCLUDE_LIST.getValue(), Type.LIST, "", Importance.LOW,
"The comma separated list of MessageAttribute names to be included, if empty it includes all the Message Attributes. Default is the empty string.");

public static ConfigDef config() {
return CONFIG_DEF;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

package com.nordstrom.kafka.connect.sqs ;

import java.util.Collections ;
import java.util.HashMap ;
import java.util.List ;
import java.util.Map ;
import java.util.*;
import java.util.stream.Collectors ;

import com.amazonaws.services.sqs.model.MessageAttributeValue;
import org.apache.kafka.connect.data.Schema ;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord ;
import org.apache.kafka.connect.source.SourceTask ;
import org.slf4j.Logger ;
Expand Down Expand Up @@ -98,8 +98,22 @@ public List<SourceRecord> poll() throws InterruptedException {
final String body = message.getBody() ;
final String key = message.getMessageId() ;
final String topic = config.getTopics() ;
return new SourceRecord( sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA,
body ) ;

ConnectHeaders headers = new ConnectHeaders();
if (config.getMessageAttributesEnabled()) {
Map<String, MessageAttributeValue> attributes = message.getMessageAttributes();
// sqs api should return only the fields specified in the list
for(String attributeKey: attributes.keySet()) {
MessageAttributeValue attrValue = attributes.get(attributeKey);
if (attrValue.getDataType().equals("String")) {
SchemaAndValue schemaAndValue = new SchemaAndValue(Schema.STRING_SCHEMA, attrValue.getStringValue());
headers.add(attributeKey, schemaAndValue);
}
}
}

return new SourceRecord( sourcePartition, sourceOffset, topic, null, Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA,
body, null, headers) ;
} ).collect( Collectors.toList() ) ;
}

Expand Down

0 comments on commit 5aa5a8d

Please sign in to comment.