Merge pull request #1 from Nordstrom/add_assume_role_support
Add assume role support
SgtPepperLHCB authored Mar 1, 2019
2 parents f682ec0 + d190533 commit bbb3a1e
Showing 10 changed files with 455 additions and 221 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -30,3 +30,7 @@ hs_err_pid*

# maven-shade-plugin
dependency-reduced-pom.xml

# IntelliJ
.idea
*.iml
62 changes: 60 additions & 2 deletions README.md
@@ -2,7 +2,7 @@
The SQS connector plugin provides the ability to use AWS SQS queues as either a source (from an SQS queue into a Kafka topic) or a sink (out of a Kafka topic into an SQS queue).

## Supported Kafka and AWS versions
The `kafka-connect-sqs` connector has been tested with `connect-api:2.1.0` and `aws-java-sdk-sqs:1.11.452`
The `kafka-connect-sqs` connector has been tested with `connect-api:2.1.0` and `aws-java-sdk-sqs:1.11.501`

# Building
You can build the connector with Maven using the standard lifecycle goals:
@@ -19,7 +19,7 @@ A source connector configuration has two required fields:
* `sqs.queue.url`: The URL of the SQS queue to be read from.
* `topics`: The Kafka topic to be written to.

There are optional fields:
These are optional fields:
* `sqs.max.messages`: Maximum number of messages to read from the SQS queue in each poll interval. Range is 0–10, with a default of 1.
* `sqs.wait.time.seconds`: Duration (in seconds) to wait for a message to arrive in the queue. Default is 1.
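
For illustration, a minimal source connector configuration might look like the sketch below; the connector class name, queue URL, and topic name are assumed placeholders rather than values taken from this change:

```json
{
  "name": "my-sqs-source",
  "config": {
    "connector.class": "com.nordstrom.kafka.connect.sqs.SqsSourceConnector",
    "sqs.queue.url": "https://sqs.us-west-2.amazonaws.com/nnnnnnnnnnnn/my-queue",
    "topics": "my-topic",
    "sqs.max.messages": "5",
    "sqs.wait.time.seconds": "20"
  }
}
```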

@@ -49,6 +49,13 @@ A sink connector configuration has two required fields:
* `sqs.queue.url`: The URL of the SQS queue to be written to.
* `topics`: The Kafka topic to be read from.

### AWS Assume Role Support options
The connector can assume a cross-account role to enable features such as Server-Side Encryption of a queue:
* `sqs.credentials.provider.class=com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider`: REQUIRED The credentials provider class that performs the cross-account role assumption.
* `sqs.credentials.provider.role.arn`: REQUIRED ARN of the AWS role providing the access.
* `sqs.credentials.provider.session.name`: REQUIRED Session name to use when assuming the role.
* `sqs.credentials.provider.external.id`: OPTIONAL (but recommended) External identifier used by `kafka-connect-sqs` when assuming the role.

### Sample Configuration
```json
{
@@ -86,6 +93,57 @@ For a `sink` connector, the minimum actions required are:
}
```

### AWS Assume Role Support
* Define the AWS IAM Role that `kafka-connect-sqs` will assume when writing to the queue (e.g., `kafka-connect-sqs-role`) with a Trust Relationship where `xxxxxxxxxxxx` is the AWS Account in which Kafka Connect executes:

```json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::xxxxxxxxxxxx:root"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "my-queue-external-id"
}
}
}
]
}
```

* Define an SQS Queue Policy Document for the queue to allow `SendMessage`. An example policy is:

```json
{
"Version": "2012-10-17",
"Id": "arn:aws:sqs:us-west-2:nnnnnnnnnnnn:my-queue/SQSDefaultPolicy",
"Statement": [
{
"Sid": "kafka-connect-sqs-sendmessage",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::nnnnnnnnnnnn:role/kafka-connect-sqs-role"
},
"Action": "sqs:SendMessage",
"Resource": "arn:aws:sqs:us-west-2:nnnnnnnnnnnn:my-queue"
}
]
}
```

The sink connector configuration would then include the additional fields:

```properties
sqs.credentials.provider.class=com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider
sqs.credentials.provider.role.arn=arn:aws:iam::nnnnnnnnnnnn:role/kafka-connect-sqs-role
sqs.credentials.provider.session.name=my-queue-session
sqs.credentials.provider.external.id=my-queue-external-id
```
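
Putting these together, a complete sink connector configuration using the assumed role might look like the following sketch; the connector class name, queue URL, and topic are assumed placeholders, while the `sqs.credentials.provider.*` keys come from this change:

```json
{
  "name": "my-sqs-sink",
  "config": {
    "connector.class": "com.nordstrom.kafka.connect.sqs.SqsSinkConnector",
    "topics": "my-topic",
    "sqs.queue.url": "https://sqs.us-west-2.amazonaws.com/nnnnnnnnnnnn/my-queue",
    "sqs.credentials.provider.class": "com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider",
    "sqs.credentials.provider.role.arn": "arn:aws:iam::nnnnnnnnnnnn:role/kafka-connect-sqs-role",
    "sqs.credentials.provider.session.name": "my-queue-session",
    "sqs.credentials.provider.external.id": "my-queue-external-id"
  }
}
```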

For a `source` connector, the minimum actions required are:

```json
45 changes: 25 additions & 20 deletions pom.xml
@@ -22,7 +22,7 @@
<groupId>com.nordstrom.kafka.connect.sqs</groupId>
<artifactId>kafka-connect-sqs</artifactId>
<name>Kafka Connect SQS Sink/Source Connector</name>
<version>1.0.0</version>
<version>1.1.0</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -33,7 +33,6 @@
<exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
<junit.version>4.12</junit.version>
<jacoco-maven-plugin.version>0.8.2</jacoco-maven-plugin.version>
<mockito-all.version>2.23.4</mockito-all.version>
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
<maven-failsafe-plugin.version>2.22.0</maven-failsafe-plugin.version>
<maven-javadoc-plugin.version>3.0.1</maven-javadoc-plugin.version>
@@ -42,7 +41,7 @@
<maven-surefire-plugin.version>2.22.1</maven-surefire-plugin.version>
<maven-project-info-reports-plugin.version>3.0.0</maven-project-info-reports-plugin.version>

<aws.sdk.version>1.11.452</aws.sdk.version>
<aws-java-sdk.version>1.11.501</aws-java-sdk.version>

<kafka.connect-api.version>2.1.0</kafka.connect-api.version>
</properties>
@@ -60,12 +59,21 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito-all.version}</version>
<scope>test</scope>
</dependency> -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

@@ -74,25 +82,22 @@
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
<version>${aws.sdk.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<!-- <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.6</version>
</dependency> -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<!-- <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</dependency> -->
</dependencies>

<build>
85 changes: 85 additions & 0 deletions src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java
@@ -0,0 +1,85 @@
package com.nordstrom.kafka.connect.auth;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import org.apache.kafka.common.Configurable;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;

import java.util.Map;

public class AWSAssumeRoleCredentialsProvider implements AWSCredentialsProvider, Configurable {
//NB: uncomment slf4j imports and field declaration to enable logging.
// private static final Logger log = LoggerFactory.getLogger(AWSAssumeRoleCredentialsProvider.class);

public static final String EXTERNAL_ID_CONFIG = "external.id";
public static final String ROLE_ARN_CONFIG = "role.arn";
public static final String SESSION_NAME_CONFIG = "session.name";

private String externalId;
private String roleArn;
private String sessionName;

@Override
public void configure(Map<String, ?> map) {
externalId = getOptionalField(map, EXTERNAL_ID_CONFIG);
roleArn = getRequiredField(map, ROLE_ARN_CONFIG);
sessionName = getRequiredField(map, SESSION_NAME_CONFIG);
}

@Override
public AWSCredentials getCredentials() {
AWSSecurityTokenServiceClientBuilder clientBuilder = AWSSecurityTokenServiceClientBuilder.standard();
AWSCredentialsProvider provider = new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName)
.withStsClient(clientBuilder.defaultClient())
.withExternalId(externalId)
.build();

return provider.getCredentials();
}

@Override
public void refresh() {
//Nothing to do really, since we are assuming a role.
}

private String getOptionalField(final Map<String, ?> map, final String fieldName) {
final Object field = map.get(fieldName);
if (isNotNull(field)) {
return field.toString();
}
return null;
}

private String getRequiredField(final Map<String, ?> map, final String fieldName) {
final Object field = map.get(fieldName);
verifyNotNull(field, fieldName);
final String fieldValue = field.toString();
verifyNotNullOrEmpty(fieldValue, fieldName);

return fieldValue;
}

private boolean isNotNull(final Object field) {
return null != field;
}

private boolean isNotNullOrEmpty(final String field) {
return null != field && !field.isEmpty();
}

private void verifyNotNull(final Object field, final String fieldName) {
if (!isNotNull(field)) {
throw new IllegalArgumentException(String.format("The field '%1s' should not be null", fieldName));
}
}

private void verifyNotNullOrEmpty(final String field, final String fieldName) {
if (!isNotNullOrEmpty(field)) {
throw new IllegalArgumentException(String.format("The field '%1s' should not be null or empty", fieldName));
}
}

}
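
For reference, the provider can also be exercised directly outside of Kafka Connect. The sketch below is not part of this commit: the configuration keys mirror the constants defined in the class above, while the example class, placeholder values, SQS client construction, and region are illustrative assumptions.

```java
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider;

import java.util.HashMap;
import java.util.Map;

public class AssumeRoleExample {

  public static void main(String[] args) {
    // Keys match the constants defined in AWSAssumeRoleCredentialsProvider
    // (ROLE_ARN_CONFIG, SESSION_NAME_CONFIG, EXTERNAL_ID_CONFIG); values are placeholders.
    Map<String, String> config = new HashMap<>();
    config.put("role.arn", "arn:aws:iam::nnnnnnnnnnnn:role/kafka-connect-sqs-role");
    config.put("session.name", "my-queue-session");
    config.put("external.id", "my-queue-external-id");

    AWSAssumeRoleCredentialsProvider provider = new AWSAssumeRoleCredentialsProvider();
    provider.configure(config);

    // Any AWS SDK v1 client accepts an AWSCredentialsProvider; the request below
    // triggers the STS AssumeRole call made inside getCredentials(). Running this
    // requires base AWS credentials and an assumable role.
    AmazonSQS sqs = AmazonSQSClientBuilder.standard()
        .withCredentials(provider)
        .withRegion("us-west-2")
        .build();

    sqs.listQueues().getQueueUrls().forEach(System.out::println);
  }
}
```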