diff --git a/.gitignore b/.gitignore index 6e15b23..48a7487 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,7 @@ hs_err_pid* # maven-shade-plugin dependency-reduced-pom.xml + +# IntelliJ +.idea +*.iml \ No newline at end of file diff --git a/README.md b/README.md index d0b673d..4894bc1 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ The SQS connector plugin provides the ability to use AWS SQS queues as both a source (from an SQS queue into a Kafka topic) or sink (out of a Kafka topic into an SQS queue). ## Supported Kafka and AWS versions -The `kafka-connect-sqs` connector has been tested with `connect-api:2.1.0` and `aws-java-sdk-sqs:1.11.452` +The `kafka-connect-sqs` connector has been tested with `connect-api:2.1.0` and `aws-java-sdk-sqs:1.11.501` # Building You can build the connector with Maven using the standard lifecycle goals: @@ -19,7 +19,7 @@ A source connector configuration has two required fields: * `sqs.queue.url`: The URL of the SQS queue to be read from. * `topics`: The Kafka topic to be written to. -There are optional fields: +These are optional fields: * `sqs.max.messages`: Maximum number of messages to read from SQS queue for each poll interval. Range is 0 - 10 with default of 1. * `sqs.wait.time.seconds`: Duration (in seconds) to wait for a message to arrive in the queue. Default is 1. @@ -49,6 +49,13 @@ A sink connector configuration has two required fields: * `sqs.queue.url`: The URL of the SQS queue to be written to. * `topics`: The Kafka topic to be read from. +### AWS Assume Role Support options + The connector can assume a cross-account role to enable such features as Server Side Encryption of a queue: + * `sqs.credentials.provider.class=com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider`: REQUIRED Class providing cross-account role assumption. + * `sqs.credentials.provider.role.arn`: REQUIRED AWS Role ARN providing the access. + * `sqs.credentials.provider.session.name`: REQUIRED Session name + * `sqs.credentials.provider.external.id`: OPTIONAL (but recommended) External identifier used by the `kafka-connect-sqs` when assuming the role. + ### Sample Configuration ```json { @@ -86,6 +93,57 @@ For a `sink` connector, the minimum actions required are: } ``` +### AWS Assume Role Support +* Define the AWS IAM Role that `kafka-connect-sqs` will assume when writing to the queue (e.g., `kafka-connect-sqs-role`) with a Trust Relationship where `xxxxxxxxxxxx` is the AWS Account in which Kafka Connect executes: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "AWS": "arn:aws:iam::xxxxxxxxxxxx:root" + }, + "Action": "sts:AssumeRole", + "Condition": { + "StringEquals": { + "sts:ExternalId": "my-queue-external-id" + } + } + } + ] +}``` + +* Define an SQS Queue Policy Document for the queue to allow `SendMessage`. An example policy is: + +```json +{ + "Version": "2012-10-17", + "Id": "arn:aws:sqs:us-west-2:nnnnnnnnnnnn:my-queue/SQSDefaultPolicy", + "Statement": [ + { + "Sid": "kafka-connect-sqs-sendmessage", + "Effect": "Allow", + "Principal": { + "AWS": "arn:aws:iam::nnnnnnnnnnnn:role/kafka-connect-sqs-role" + }, + "Action": "sqs:SendMessage", + "Resource": "arn:aws:sqs:us-west-2:nnnnnnnnnnnn:my-queue" + } + ] +} +``` + +The sink connector configuration would then include the additional fields: + +```json + sqs.credentials.provider.class=com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider + sqs.credentials.provider.role.arn=arn:aws:iam::nnnnnnnnnnnn:role/kafka-connect-sqs-role + sqs.credentials.provider.session.name=my-queue-session + sqs.credentials.provider.external.id=my-queue-external-id +``` + For a `source` connector, the minimum actions required are: ```json diff --git a/pom.xml b/pom.xml index 08badc8..47934c4 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ com.nordstrom.kafka.connect.sqs kafka-connect-sqs Kafka Connect SQS Sink/Source Connector - 1.0.0 + 1.1.0 UTF-8 @@ -33,7 +33,6 @@ 1.6.0 4.12 0.8.2 - 2.23.4 3.8.0 2.22.0 3.0.1 @@ -42,7 +41,7 @@ 2.22.1 3.0.0 - 1.11.452 + 1.11.501 2.1.0 @@ -60,12 +59,21 @@ ${junit.version} test - + + com.amazonaws + aws-java-sdk-core + ${aws-java-sdk.version} + + + com.amazonaws + aws-java-sdk-sqs + ${aws-java-sdk.version} + + + com.amazonaws + aws-java-sdk-sts + ${aws-java-sdk.version} + @@ -74,25 +82,22 @@ org.apache.kafka connect-api + + com.amazonaws + aws-java-sdk-core + com.amazonaws aws-java-sdk-sqs - ${aws.sdk.version} - - + + com.amazonaws + aws-java-sdk-sts + junit junit - diff --git a/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java b/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java new file mode 100644 index 0000000..f568b34 --- /dev/null +++ b/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java @@ -0,0 +1,85 @@ +package com.nordstrom.kafka.connect.auth; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; +import org.apache.kafka.common.Configurable; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class AWSAssumeRoleCredentialsProvider implements AWSCredentialsProvider, Configurable { + //NB: uncomment slf4j imports and field declaration to enable logging. +// private static final Logger log = LoggerFactory.getLogger(AWSAssumeRoleCredentialsProvider.class); + + public static final String EXTERNAL_ID_CONFIG = "external.id"; + public static final String ROLE_ARN_CONFIG = "role.arn"; + public static final String SESSION_NAME_CONFIG = "session.name"; + + private String externalId; + private String roleArn; + private String sessionName; + + @Override + public void configure(Map map) { + externalId = getOptionalField(map, EXTERNAL_ID_CONFIG); + roleArn = getRequiredField(map, ROLE_ARN_CONFIG); + sessionName = getRequiredField(map, SESSION_NAME_CONFIG); + } + + @Override + public AWSCredentials getCredentials() { + AWSSecurityTokenServiceClientBuilder clientBuilder = AWSSecurityTokenServiceClientBuilder.standard(); + AWSCredentialsProvider provider = new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName) + .withStsClient(clientBuilder.defaultClient()) + .withExternalId(externalId) + .build(); + + return provider.getCredentials(); + } + + @Override + public void refresh() { + //Nothing to do really, since we are assuming a role. + } + + private String getOptionalField(final Map map, final String fieldName) { + final Object field = map.get(fieldName); + if (isNotNull(field)) { + return field.toString(); + } + return null; + } + + private String getRequiredField(final Map map, final String fieldName) { + final Object field = map.get(fieldName); + verifyNotNull(field, fieldName); + final String fieldValue = field.toString(); + verifyNotNullOrEmpty(fieldValue, fieldName); + + return fieldValue; + } + + private boolean isNotNull(final Object field) { + return null != field; + } + + private boolean isNotNullOrEmpty(final String field) { + return null != field && !field.isEmpty(); + } + + private void verifyNotNull(final Object field, final String fieldName) { + if (!isNotNull(field)) { + throw new IllegalArgumentException(String.format("The field '%1s' should not be null", fieldName)); + } + } + + private void verifyNotNullOrEmpty(final String field, final String fieldName) { + if (!isNotNullOrEmpty(field)) { + throw new IllegalArgumentException(String.format("The field '%1s' should not be null or empty", fieldName)); + } + } + +} diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsClient.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsClient.java index f82cc80..ffc2189 100644 --- a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsClient.java +++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsClient.java @@ -1,12 +1,12 @@ /* * Copyright 2019 Nordstrom, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of * the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -14,156 +14,200 @@ * the License. */ -package com.nordstrom.kafka.connect.sqs ; +package com.nordstrom.kafka.connect.sqs; -import java.util.List ; +import java.util.List; +import java.util.Map; -import org.slf4j.Logger ; -import org.slf4j.LoggerFactory ; +import com.amazonaws.auth.AWSCredentialsProvider; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.amazonaws.auth.profile.ProfileCredentialsProvider ; -import com.amazonaws.services.sqs.AmazonSQS ; -import com.amazonaws.services.sqs.AmazonSQSClientBuilder ; -import com.amazonaws.services.sqs.model.DeleteMessageRequest ; -import com.amazonaws.services.sqs.model.DeleteMessageResult ; -import com.amazonaws.services.sqs.model.Message ; -import com.amazonaws.services.sqs.model.ReceiveMessageRequest ; -import com.amazonaws.services.sqs.model.ReceiveMessageResult ; -import com.amazonaws.services.sqs.model.SendMessageRequest ; -import com.amazonaws.services.sqs.model.SendMessageResult ; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.DeleteMessageRequest; +import com.amazonaws.services.sqs.model.DeleteMessageResult; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import com.amazonaws.services.sqs.model.SendMessageResult; -/** - * @author xjg3 - * - */ -/** - * @author xjg3 - * - */ -/** - * @author xjg3 - * - */ public class SqsClient { - private final Logger log = LoggerFactory.getLogger( this.getClass() ) ; - - private final String AWS_FIFO_SUFFIX = ".fifo" ; - private final String AWS_PROFILE = "AWS_PROFILE" ; - private final String AWS_REGION = "AWS_REGION" ; - - private final AmazonSQS client ; - - public SqsClient() { - final AmazonSQSClientBuilder builder = AmazonSQSClientBuilder.standard() ; - - // If there's an AWS credentials profile and/or region configured in the - // environment we will use it. - final String profile = System.getenv( AWS_PROFILE ) ; - final String region = System.getenv( AWS_REGION ) ; - if ( Facility.isNotNullNorEmpty( profile ) ) { - builder.setCredentials( new ProfileCredentialsProvider( profile ) ) ; - } - if ( Facility.isNotNullNorEmpty( region ) ) { - builder.setRegion( region ) ; + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + private final String AWS_FIFO_SUFFIX = ".fifo"; + private final String AWS_PROFILE = "AWS_PROFILE"; + private final String AWS_REGION = "AWS_REGION"; + public static final Class CREDENTIALS_PROVIDER_CLASS_DEFAULT = + com.amazonaws.auth.DefaultAWSCredentialsProviderChain.class; + + private final AmazonSQS client; + + public SqsClient(Map configs) { + log.warn(".ctor:configs={}", configs); + AWSCredentialsProvider provider = null; + try { + provider = getCredentialsProvider(configs); + } catch ( Exception e ) { + log.error("Problem initializing provider", e); } - log.info( "AmazonSQS using profile={}, region={}", profile, region ) ; - - client = builder.build() ; + final AmazonSQSClientBuilder builder = AmazonSQSClientBuilder.standard(); + builder.setCredentials(provider); + +// // If there's an AWS credentials profile and/or region configured in the +// // environment we will use it. +// final String profile = System.getenv(AWS_PROFILE); +// final String region = System.getenv(AWS_REGION); +// if (Facility.isNotNullNorEmpty(profile)) { +// builder.setCredentials(provider); +// } +// if (Facility.isNotNullNorEmpty(region)) { +// builder.setRegion(region); +// } +// log.info("AmazonSQS using profile={}, region={}", profile, region); + + client = builder.build(); } /** * Delete a message from the SQS queue. - * + * * @param url SQS queue url. * @param receiptHandle Message receipt handle of message to delete. */ - public void delete( final String url, final String receiptHandle ) { - Guard.verifyValidUrl( url ) ; - Guard.verifyNotNullOrEmpty( receiptHandle, "receiptHandle" ) ; + public void delete(final String url, final String receiptHandle) { + Guard.verifyValidUrl(url); + Guard.verifyNotNullOrEmpty(receiptHandle, "receiptHandle"); - final DeleteMessageRequest request = new DeleteMessageRequest( url, receiptHandle ) ; - final DeleteMessageResult result = client.deleteMessage( request ) ; + final DeleteMessageRequest request = new DeleteMessageRequest(url, receiptHandle); + final DeleteMessageResult result = client.deleteMessage(request); - log.debug( ".delete:receipt-handle={}, rc={}", receiptHandle, result.getSdkHttpMetadata().getHttpStatusCode() ) ; + log.debug(".delete:receipt-handle={}, rc={}", receiptHandle, result.getSdkHttpMetadata().getHttpStatusCode()); } /** * Receive messages from the SQS queue. - * + * * @param url SQS queue url. * @param maxMessages Maximum number of messages to receive for this call. * @param waitTimeSeconds Time to wait, in seconds, for messages to arrive. * @return Collection of messages received. */ - public List receive( final String url, final int maxMessages, final int waitTimeSeconds ) { - log.debug( ".receive:queue={}, max={}, wait={}", url, maxMessages, waitTimeSeconds ) ; - - Guard.verifyValidUrl( url ) ; - Guard.verifyNonNegative( waitTimeSeconds, "sqs.wait.time.seconds" ) ; - Guard.verifyInRange( maxMessages, 0, 10, "sqs.max.messages" ) ; - if ( !isValidState() ) { - throw new IllegalStateException( "AmazonSQS client is not initialized" ) ; + public List receive(final String url, final int maxMessages, final int waitTimeSeconds) { + log.debug(".receive:queue={}, max={}, wait={}", url, maxMessages, waitTimeSeconds); + + Guard.verifyValidUrl(url); + Guard.verifyNonNegative(waitTimeSeconds, "sqs.wait.time.seconds"); + Guard.verifyInRange(maxMessages, 0, 10, "sqs.max.messages"); + if (!isValidState()) { + throw new IllegalStateException("AmazonSQS client is not initialized"); } // // Receive messages from queue // - final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest( url ) - .withMaxNumberOfMessages( maxMessages ).withWaitTimeSeconds( waitTimeSeconds ).withAttributeNames( "" ) ; - final ReceiveMessageResult result = client.receiveMessage( receiveMessageRequest ) ; - final List messages = result.getMessages() ; + final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url) + .withMaxNumberOfMessages(maxMessages).withWaitTimeSeconds(waitTimeSeconds).withAttributeNames(""); + final ReceiveMessageResult result = client.receiveMessage(receiveMessageRequest); + final List messages = result.getMessages(); - log.debug( ".receive:{} messages, url={}, rc={}", messages.size(), url, - result.getSdkHttpMetadata().getHttpStatusCode() ) ; + log.debug(".receive:{} messages, url={}, rc={}", messages.size(), url, + result.getSdkHttpMetadata().getHttpStatusCode()); - return messages ; + return messages; } /** * Send a message to an SQS queue. - * + * * @param url SQS queue url. * @param body The message to send. * @param groupId Optional group identifier (fifo queues only). * @param messageId Optional message identifier (fifo queues only). * @return */ - public String send( final String url, final String body, final String groupId, final String messageId ) { - log.debug( ".send: queue={}, gid={}, mid={}", url, groupId, messageId ) ; + public String send(final String url, final String body, final String groupId, final String messageId) { + log.debug(".send: queue={}, gid={}, mid={}", url, groupId, messageId); - Guard.verifyValidUrl( url ) ; + Guard.verifyValidUrl(url); // Guard.verifyNotNullOrEmpty( body, "message body" ) ; - if ( !isValidState() ) { - throw new IllegalStateException( "AmazonSQS client is not initialized" ) ; + if (!isValidState()) { + throw new IllegalStateException("AmazonSQS client is not initialized"); } - final boolean fifo = isFifo( url ) ; - - final SendMessageRequest request = new SendMessageRequest( url, body ) ; - if ( fifo ) { - Guard.verifyNotNullOrEmpty( groupId, "groupId" ) ; - Guard.verifyNotNullOrEmpty( messageId, "messageId" ) ; - request.setMessageGroupId( groupId ) ; - request.setMessageDeduplicationId( messageId ) ; + final boolean fifo = isFifo(url); + + final SendMessageRequest request = new SendMessageRequest(url, body); + if (fifo) { + Guard.verifyNotNullOrEmpty(groupId, "groupId"); + Guard.verifyNotNullOrEmpty(messageId, "messageId"); + request.setMessageGroupId(groupId); + request.setMessageDeduplicationId(messageId); } - final SendMessageResult result = client.sendMessage( request ) ; + final SendMessageResult result = client.sendMessage(request); - log.debug( ".send-message.OK: queue={}, result={}", url, result ) ; + log.debug(".send-message.OK: queue={}, result={}", url, result); - return fifo ? result.getSequenceNumber() : result.getMessageId() ; + return fifo ? result.getSequenceNumber() : result.getMessageId(); } - private boolean isFifo( final String url ) { - return url.endsWith( AWS_FIFO_SUFFIX ) ; + private boolean isFifo(final String url) { + return url.endsWith(AWS_FIFO_SUFFIX); } /** * Test that we have properly initialized the AWS SQS client. - * + * * @return true if client is in a valid state. */ private boolean isValidState() { - return Facility.isNotNull( client ) ; + return Facility.isNotNull(client); + } + + @SuppressWarnings("unchecked") + public AWSCredentialsProvider getCredentialsProvider(Map configs) { + log.warn(".get-credentials-provider:configs={}", configs); + + try { + Object providerField = configs.get("class"); + String providerClass = SqsConnectorConfigKeys.CREDENTIALS_PROVIDER_CLASS_DEFAULT.getValue(); + if (null != providerField) { + providerClass = providerField.toString(); + } + log.warn(".get-credentials-provider:field={}, class={}", providerField, providerClass); + AWSCredentialsProvider provider = ((Class) + getClass(providerClass)).newInstance(); + + if (provider instanceof Configurable) { +// Map configs = originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX); +// configs.remove(CREDENTIALS_PROVIDER_CLASS_CONFIG.substring( +// CREDENTIALS_PROVIDER_CONFIG_PREFIX.length(), +// CREDENTIALS_PROVIDER_CLASS_CONFIG.length() +// )); + ((Configurable) provider).configure(configs); + } + + log.warn(".get-credentials-provider:provider={}", provider); + return provider; + } catch (IllegalAccessException | InstantiationException e) { + throw new ConnectException( + "Invalid class for: " + SqsConnectorConfigKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG, + e + ); + } + } + + public Class getClass(String className) { + log.warn(".get-class:class={}",className); + try { + return Class.forName(className); + } catch (ClassNotFoundException e) { + log.error("Provider class not found: {}", e); + } + return null; } } diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfigKeys.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfigKeys.java new file mode 100644 index 0000000..1f54186 --- /dev/null +++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsConnectorConfigKeys.java @@ -0,0 +1,46 @@ +/* + * Copyright 2019 Nordstrom, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nordstrom.kafka.connect.sqs; + +/* + * Contains all connector configuration keys and constants. + */ +public enum SqsConnectorConfigKeys { + SQS_MAX_MESSAGES("sqs.max.messages"), + SQS_QUEUE_URL("sqs.queue.url"), + SQS_WAIT_TIME_SECONDS("sqs.wait.time.seconds"), + TOPICS("topics"), + + // These are not part of the connector configuration proper, but just a convenient + // place to define the constants. + CREDENTIALS_PROVIDER_CLASS_CONFIG("sqs.credentials.provider.class"), + CREDENTIALS_PROVIDER_CLASS_DEFAULT("com.amazonaws.auth.DefaultAWSCredentialsProviderChain"), + CREDENTIALS_PROVIDER_CONFIG_PREFIX("sqs.credentials.provider."), //NB: trailing '.' + SQS_MESSAGE_ID("sqs.message.id"), + SQS_MESSAGE_RECEIPT_HANDLE("sqs.message.receipt-handle"); + + private final String value; + + SqsConnectorConfigKeys(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + +} diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSinkConnectorConfig.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSinkConnectorConfig.java index 575b1ca..32a4aad 100644 --- a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSinkConnectorConfig.java +++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSinkConnectorConfig.java @@ -1,12 +1,12 @@ /* * Copyright 2019 Nordstrom, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -14,53 +14,79 @@ * limitations under the License. */ -package com.nordstrom.kafka.connect.sqs ; +package com.nordstrom.kafka.connect.sqs; -import java.util.Map ; +import java.util.Map; -import org.apache.kafka.common.config.AbstractConfig ; -import org.apache.kafka.common.config.ConfigDef ; -import org.apache.kafka.common.config.ConfigDef.Importance ; -import org.apache.kafka.common.config.ConfigDef.Type ; +import com.amazonaws.auth.AWSCredentialsProvider; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -/** - * - */ public class SqsSinkConnectorConfig extends AbstractConfig { - private final String queueUrl ; - private final String topics ; - - public static enum ConfigurationKeys { - SQS_QUEUE_URL( "sqs.queue.url" ), - TOPICS( "topics" ); +// private final Logger log = LoggerFactory.getLogger(this.getClass()); + private static final Logger log = LoggerFactory.getLogger(SqsSinkConnectorConfig.class); - private final String value ; - - private ConfigurationKeys( String value ) { - this.value = value ; - } - } + private final String queueUrl; + private final String topics; private static final ConfigDef CONFIG_DEF = new ConfigDef() - .define( ConfigurationKeys.SQS_QUEUE_URL.value, Type.STRING, Importance.HIGH, "URL of the SQS queue to be written to." ) - .define( ConfigurationKeys.TOPICS.value, Type.STRING, Importance.HIGH, "Kafka topic to be read from." ) ; + .define(SqsConnectorConfigKeys.SQS_QUEUE_URL.getValue(), Type.STRING, Importance.HIGH, "URL of the SQS queue to be written to.") + .define(SqsConnectorConfigKeys.TOPICS.getValue(), Type.STRING, Importance.HIGH, "Kafka topic to be read from.") + .define(SqsConnectorConfigKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG.getValue(), Type.CLASS, + SqsConnectorConfigKeys.CREDENTIALS_PROVIDER_CLASS_DEFAULT.getValue(), + new CredentialsProviderValidator(), + Importance.LOW, + "Credentials provider or provider chain to use for authentication to AWS. By default the connector uses 'DefaultAWSCredentialsProviderChain'.", + "SQS", + 0, + ConfigDef.Width.LONG, + "AWS Credentials Provider Class" + ) + ; public static ConfigDef config() { - return CONFIG_DEF ; + return CONFIG_DEF; } - public SqsSinkConnectorConfig( Map originals ) { - super( config(), originals ) ; - queueUrl = getString( ConfigurationKeys.SQS_QUEUE_URL.value ) ; - topics = getString( ConfigurationKeys.TOPICS.value ) ; + public SqsSinkConnectorConfig(Map originals) { + super(config(), originals); + queueUrl = getString(SqsConnectorConfigKeys.SQS_QUEUE_URL.getValue()); + topics = getString(SqsConnectorConfigKeys.TOPICS.getValue()); } public String getQueueUrl() { - return queueUrl ; + return queueUrl; } public String getTopics() { - return topics ; + return topics; } + + + private static class CredentialsProviderValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object provider) { + log.warn(".validator:name={}, provider={}", name, provider); + if (provider != null && provider instanceof Class + && AWSCredentialsProvider.class.isAssignableFrom((Class) provider)) { + return; + } + throw new ConfigException( + name, + provider, + "Class must extend: " + AWSCredentialsProvider.class + ); + } + + @Override + public String toString() { + return "Any class implementing: " + AWSCredentialsProvider.class; + } + } } diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSinkConnectorTask.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSinkConnectorTask.java index e720d06..3f31f3e 100644 --- a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSinkConnectorTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSinkConnectorTask.java @@ -27,9 +27,6 @@ import com.nordstrom.kafka.connect.sqs.SqsSinkConnector ; -/** - * - */ public class SqsSinkConnectorTask extends SinkTask { private final Logger log = LoggerFactory.getLogger( this.getClass() ) ; @@ -57,7 +54,7 @@ public void start( Map props ) { Guard.verifyNotNull( props, "Task properties" ) ; config = new SqsSinkConnectorConfig( props ) ; - client = new SqsClient() ; + client = new SqsClient(config.originalsWithPrefix(SqsConnectorConfigKeys.CREDENTIALS_PROVIDER_CONFIG_PREFIX.getValue())) ; log.info( "task.start:OK, sqs.queue.url={}, topics={}", config.getQueueUrl(), config.getTopics() ) ; } diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorConfig.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorConfig.java index ec30ece..027b3ce 100644 --- a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorConfig.java +++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorConfig.java @@ -1,12 +1,12 @@ /* * Copyright 2019 Nordstrom, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -14,83 +14,56 @@ * limitations under the License. */ -package com.nordstrom.kafka.connect.sqs ; +package com.nordstrom.kafka.connect.sqs; -import java.util.Map ; +import java.util.Map; -import org.apache.kafka.common.config.AbstractConfig ; -import org.apache.kafka.common.config.ConfigDef ; -import org.apache.kafka.common.config.ConfigDef.Importance ; -import org.apache.kafka.common.config.ConfigDef.Type ; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; -/** - * - */ public class SqsSourceConnectorConfig extends AbstractConfig { - private final Integer maxMessages ; - private final String queueUrl ; - private final String topics ; - private final Integer waitTimeSeconds ; - - public static enum ConfigurationKeys { - SQS_MAX_MESSAGES( "sqs.max.messages" ), - SQS_QUEUE_URL( "sqs.queue.url" ), - SQS_WAIT_TIME_SECONDS( "sqs.wait.time.seconds" ), - TOPICS( "topics" ), - - // These are not part of the connector configuration, but just a convenient - // place to define the constant - SQS_MESSAGE_ID( "sqs.message.id" ), - SQS_MESSAGE_RECEIPT_HANDLE( "sqs.message.receipt-handle" ) - ; - - private final String value ; - - private ConfigurationKeys( String value ) { - this.value = value ; - } - - public String getValue() { - return value ; - } - - } + private final Integer maxMessages; + private final String queueUrl; + private final String topics; + private final Integer waitTimeSeconds; private static final ConfigDef CONFIG_DEF = new ConfigDef() - .define( ConfigurationKeys.SQS_MAX_MESSAGES.value, Type.INT, 1, Importance.LOW, - "Maximum number of messages to read from SQS queue for each poll interval. Range is 0 - 10 with default of 1." ) - .define( ConfigurationKeys.SQS_QUEUE_URL.value, Type.STRING, Importance.HIGH, "The URL of the SQS queue to be read from." ) - .define( ConfigurationKeys.SQS_WAIT_TIME_SECONDS.value, Type.INT, 1, Importance.LOW, - "Duration (in seconds) to wait for a message to arrive in the queue. Default is 1." ) - .define( ConfigurationKeys.TOPICS.value, Type.STRING, Importance.HIGH, "The Kafka topic to be written to." ) ; + .define(SqsConnectorConfigKeys.SQS_MAX_MESSAGES.getValue(), Type.INT, 1, Importance.LOW, + "Maximum number of messages to read from SQS queue for each poll interval. Range is 0 - 10 with default of 1.") + .define(SqsConnectorConfigKeys.SQS_QUEUE_URL.getValue(), Type.STRING, Importance.HIGH, "The URL of the SQS queue to be read from.") + .define(SqsConnectorConfigKeys.SQS_WAIT_TIME_SECONDS.getValue(), Type.INT, 1, Importance.LOW, + "Duration (in seconds) to wait for a message to arrive in the queue. Default is 1.") + .define(SqsConnectorConfigKeys.TOPICS.getValue(), Type.STRING, Importance.HIGH, "The Kafka topic to be written to."); public static ConfigDef config() { - return CONFIG_DEF ; + return CONFIG_DEF; } - public SqsSourceConnectorConfig( Map originals ) { - super( config(), originals ) ; - maxMessages = getInt( ConfigurationKeys.SQS_MAX_MESSAGES.value ) ; - queueUrl = getString( ConfigurationKeys.SQS_QUEUE_URL.value ) ; - topics = getString( ConfigurationKeys.TOPICS.value ) ; - waitTimeSeconds = getInt( ConfigurationKeys.SQS_WAIT_TIME_SECONDS.value ) ; + public SqsSourceConnectorConfig(Map originals) { + super(config(), originals); + maxMessages = getInt(SqsConnectorConfigKeys.SQS_MAX_MESSAGES.getValue()); + queueUrl = getString(SqsConnectorConfigKeys.SQS_QUEUE_URL.getValue()); + topics = getString(SqsConnectorConfigKeys.TOPICS.getValue()); + waitTimeSeconds = getInt(SqsConnectorConfigKeys.SQS_WAIT_TIME_SECONDS.getValue()); } public Integer getMaxMessages() { - return maxMessages ; + return maxMessages; } public String getQueueUrl() { - return queueUrl ; + return queueUrl; } public String getTopics() { - return topics ; + return topics; } public Integer getWaitTimeSeconds() { - return waitTimeSeconds ; + return waitTimeSeconds; } } diff --git a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java index 8c0ef2c..640a096 100644 --- a/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/sqs/SqsSourceConnectorTask.java @@ -30,11 +30,7 @@ import com.amazonaws.services.sqs.model.Message ; import com.nordstrom.kafka.connect.About ; -import com.nordstrom.kafka.connect.sqs.SqsSourceConnectorConfig.ConfigurationKeys ; -/** - * - */ public class SqsSourceConnectorTask extends SourceTask { private final Logger log = LoggerFactory.getLogger( this.getClass() ) ; @@ -62,7 +58,7 @@ public void start( Map props ) { Guard.verifyNotNull( props, "Task properties" ) ; config = new SqsSourceConnectorConfig( props ) ; - client = new SqsClient() ; + client = new SqsClient(config.originalsWithPrefix(SqsConnectorConfigKeys.CREDENTIALS_PROVIDER_CONFIG_PREFIX.getValue())) ; log.info( "task.start.OK, sqs.queue.url={}, topics={}", config.getQueueUrl(), config.getTopics() ) ; } @@ -89,13 +85,13 @@ public List poll() throws InterruptedException { // Create a SourceRecord for each message in the queue. return messages.stream().map( message -> { - Map sourcePartition = Collections.singletonMap( ConfigurationKeys.SQS_QUEUE_URL.getValue(), + Map sourcePartition = Collections.singletonMap( SqsConnectorConfigKeys.SQS_QUEUE_URL.getValue(), config.getQueueUrl() ) ; Map sourceOffset = new HashMap<>() ; // Save the message id and receipt-handle. receipt-handle is needed to delete // the message once the record is committed. - sourceOffset.put( ConfigurationKeys.SQS_MESSAGE_ID.getValue(), message.getMessageId() ) ; - sourceOffset.put( ConfigurationKeys.SQS_MESSAGE_RECEIPT_HANDLE.getValue(), message.getReceiptHandle() ) ; + sourceOffset.put( SqsConnectorConfigKeys.SQS_MESSAGE_ID.getValue(), message.getMessageId() ) ; + sourceOffset.put( SqsConnectorConfigKeys.SQS_MESSAGE_RECEIPT_HANDLE.getValue(), message.getReceiptHandle() ) ; log.trace( ".poll:source-partition={}", sourcePartition ) ; log.trace( ".poll:source-offset={}", sourceOffset ) ; @@ -113,7 +109,7 @@ public List poll() throws InterruptedException { @Override public void commitRecord( SourceRecord record ) throws InterruptedException { Guard.verifyNotNull( record, "record" ) ; - final String receipt = record.sourceOffset().get( ConfigurationKeys.SQS_MESSAGE_RECEIPT_HANDLE.getValue() ) + final String receipt = record.sourceOffset().get( SqsConnectorConfigKeys.SQS_MESSAGE_RECEIPT_HANDLE.getValue() ) .toString() ; log.debug( ".commit-record:url={}, receipt-handle={}", config.getQueueUrl(), receipt ) ; client.delete( config.getQueueUrl(), receipt ) ;