Skip to content

Commit

Permalink
Update README with configuration table and Docker Compose demo steps
Browse files Browse the repository at this point in the history
  • Loading branch information
dylanmei committed Aug 2, 2019
1 parent 0a46ee5 commit 606370a
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 89 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ dependency-reduced-pom.xml

# Mac.
.DS_Store

# Only include the example configurations
config/connector.json
config/connector.properties
153 changes: 88 additions & 65 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,108 +1,131 @@
# kafka-connect-lambda
The AWS Lambda connector plugin provides the ability to use AWS Lambda functions as a sink (out of a Kafka topic into a Lambda function).

## Supported Kafka and AWS versions
The `kafka-connect-lambda` connector has been tested with `connect-api:2.1.0` and `aws-java-sdk-lambda:1.11.592`
A Kafka Connect sink plugin to invoke AWS Lambda functions.

# Building
You can build the connector with Maven using the standard lifecycle goals:

Build the connector with Maven using the standard lifecycle goals:

```
mvn clean
mvn package
```

## Sink Connector
_The `kafka-connect-lambda` connector has been tested with `connect-api:2.1.0` and `aws-java-sdk-lambda:1.11.592`_

# Configuring

In addition to the standard [Kafka Connect connector configuration](https://kafka.apache.org/documentation/#connect_configuring) properties, the `kafka-connect-lambda` properties available:

A sink connector reads from a Kafka topic and sends events to an AWS Lambda function.
| Property | Required | Default value | Description |
|:---------|:---------|:--------|:------------|
| `aws.credentials.provider.class` | No | [Default AWS provider chain](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) | Class name of an `AWSCredentialsProvider` implementation |
| `aws.lambda.function.arn` | Yes | | Full ARN of the Lambda function |
| `aws.lambda.invocation.timeout.ms` | No | `300000` | Time to wait for a lambda invocation before continuing |
| `aws.lambda.invocation.mode` | No | `SYNC` | `SYNC` for a synchronous invocation; otherwise `ASYNC` |
| `aws.lambda.invocation.failure.mode` | No | `STOP` | Whether to `STOP` processing, or `DROP` and continue after an invocation failure |
| `aws.lambda.batch.enabled` | No | `true` | `true` to batch messages together before an invocation; otherwise `false` |
| `aws.region` | Yes | | AWS region of the Lambda function |
| `http.proxy.host` | No | | HTTP proxy host name |
| `http.proxy.port` | No | | HTTP proxy port number |
| `retriable.error.codes` | No | `500,503,504` | HTTP status codes that will trigger an invocation retry |
| `retry.backoff.millis` | No | `500` | Time to append between invocation retries |
| `retries.max` | No | `5` | Maximum number of invocation retries |
| `topics` | Yes | | Comma-delimited Kafka topics names to sink |

A sink connector configuration has two required fields:
* `aws.lambda.function.arn`: The AWS ARN of the Lambda function to send events to.
* `topics`: The Kafka topic to be read from.

### AWS Assume Role Support options
The connector can assume an IAM Role. The role must include a policy that allows lambda:InvokeFunction and lambda:InvokeAsync actions:
* `aws.credentials.provider.class=com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider`: REQUIRED The credentials provider class.
* `aws.credentials.provider.role.arn`: REQUIRED AWS Role ARN providing the access.
* `aws.credentials.provider.session.name`: REQUIRED Session name
* `aws.credentials.provider.external.id`: OPTIONAL (but recommended) External identifier used by the `kafka-connect-lambda` when assuming the role.
An example configuration represented as JSON data for use with the [Kafka Connect REST interface](https://docs.confluent.io/current/connect/references/restapi.html):

### Sample Configuration
```json
{
"name": "aws-lambda-sink-test",
"config": {
"connector.class": "com.nordstrom.kafka.connect.lambda.LambdaSinkConnector",
"tasks.max": "1",
"aws.lambda.function.arn":"arn:aws:lambda:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:function:test-lambda",
"aws.lambda.invocation.timeout.ms":"300000",
"aws.lambda.invocation.mode":"SYNC",
"aws.lambda.batch.enabled":"true",
"aws.lambda.json.wrapper.enabled":"true",
"aws.region":"us-west-2",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"topics": "test"
}
"name": "example-lambda-connector",
"config": {
"tasks.max": "1",
"connector.class": "com.nordstrom.kafka.connect.lambda.LambdaSinkConnector",
"topics": "<Your Kafka topics>",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"aws.region": "<Your AWS region>",
"aws.lambda.function.arn": "<Your function ARN>",
"aws.lambda.batch.enabled": "false"
}
}
```

## AWS IAM Policies
## IAM assume-role options

By supplying `com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider` as the `aws.credentials.provider.class` configuration, the connector can assume an IAM Role. The role must include a policy that allows `lambda:InvokeFunction` and `lambda:InvokeAsync` actions.

| Property | Required | Description |
|:---------|:---------|:------------|
| `aws.credentials.provider.role.arn` | Yes | Full ARN of the IAM Role to assume |
| `aws.credentials.provider.session.name` | Yes | Name that uniquely identifies a session while the role is being assumed |
| `aws.credentials.provider.external.id` | No | External identifier used by the `kafka-connect-lambda` when assuming the role |

# Try the example demo

The IAM Role that Kafka Connect is running under must have policies set for Lambda resources in order
to invoke target functions.
Follow the demo in order to: create an AWS Lambda function, build the connector plugin, run the connector, and send a message.

For a `sink` connector, the minimum actions can be scripted as follows:
## Create an AWS Lambda function

With an active AWS account, can create a simple AWS Lambda function using the [CloudFormation](https://aws.amazon.com/cloudformation) template in the `config/` directory:

```
aws cloudformation create-stack \
--stack-name example-lambda-stack \
--capabilities CAPABILITY_NAMED_IAM \
--template-body file://config/cloudformation.yml
```
#!/usr/bin/env bash
export AWS_PROFILE=yourprofile

aws lambda add-permission \
--region ${AWS_REGION} \
--function-name yourlambdaFunction \
--principal ${AWS_ACCOUNT_NUMBER} \
--action lambda:InvokeFunction \
To make sure our Lambda works, invoke it directly and view the result payload in `result.txt`:

aws lambda get-policy --function-name yourLambdaFunction
```
aws lambda invoke --function-name example-function --payload '{"value": "my example"}' result.txt
```

# Running the Demo
The function simply sends the `payload` back to you in `result.txt`.

The demo uses the Confluent Platform which can be downloaded here: https://www.confluent.io/download/
Use the `describe-stacks` command to fetch the CloudFormation output value for `ExampleFunctionArn`, which we'll need later when setting up our connector configuration:

You can use either the Enterprise or Community version.
```
aws cloudformation describe-stacks --stack-name example-lambda-stack --query "Stacks[0].Outputs[]"
```

The rest of the tutorial assumes the Confluent Platform is installed at $CP and $CP/bin is on your PATH.
## Build the connector plugin

## AWS
```
mvn clean package
```

The demo assumes you have an AWS account and have valid credentials in ~/.aws/credentials as well as
setting the `AWS_PROFILE` and `AWS_REGION` to appropriate values.
Once built, a `kafka-connect-lambda` uber-jar is in the `target/` directory.

These are required so that Kafka Connect will be able to call your Lambda function.
## Run the connector using Docker Compose

## Create AWS Lambda function
Ensure you have `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables exported in your shell. Docker Compose will pass these values into the `connect` container.

Create `test-lambda` using the AWS Console. Take note of the ARN value as you will need it to configure the connector later.
Use the provided [Docker Compose](https://docs.docker.com/compose) file and run `docker-compose up`.

## Build the connector plugin
With the [Kafka Connect REST interface](https://docs.confluent.io/current/connect/references/restapi.html), verify the Lambda sink connector is installed and ready: `curl http://localhost:8083/connector-plugins`.

Build the connector jar file and copy to the the classpath of Kafka Connect:
Next, supply a connector configuration. You can use `config/connector.json.example` as a starting-point. Fill in values for `<Your AWS Region>` and `<Your function ARN>` and run:

```shell
mvn clean package
mkdir $CP/share/java/kafka-connect-lambda
cp target/kafka-connect-lambda-1.0-SNAPSHOT.jar $CP/share/java/kafka-connect-lambda/
```
curl -XPOST -H 'Content-Type: application/json' http://localhost:8083/connectors -d @config/connector.json
```

## Restart Kafka Connect
## Run the connector using the Confluent Platform

Submit a POST for a new connector instance with the JSON configuration above.
Run the ZooKeeper and Kafka components from the [Confluent Platform](https://www.confluent.io/download).

## Send messages
Next, configure a Java properties-file containing your connector configuration. You can use `config/connector.properties.example` as a starting-point. Fill in values for `<Your AWS Region>` and `<Your function ARN>`.

Ensure you have `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables exported in your shell. Then, run the connector in "standalone-mode":

```
connect-standalone config/worker.properties config/connector.properties
```

Using the Kafka console producer, send a message to the topic `test`.
## Send messages

The `sink` connector will read the message from the topic and send the event to the AWS Lambda.
Using the Kafka console producer, send a message to the `example-stream` topic. Your `example-lambda-connector` will read the message from the topic and invoke the AWS Lambda `example-function`.

Use the AWS Console to read your messages sent from the CloudWatch logs for the Lambda.
Use the AWS Console to read the output of your message sent from the CloudWatch logs for the Lambda.
18 changes: 0 additions & 18 deletions config/LambdaSinkConnector.properties

This file was deleted.

35 changes: 35 additions & 0 deletions config/cloudformation.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Lambda connector example function'
Resources:
ExampleFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: example-function
Handler: index.handler
Runtime: python3.7
Role: !GetAtt 'ExampleFunctionRole.Arn'
Code:
ZipFile: |
def handler(event, context):
print(f"hello, {event}")
return event
ExampleFunctionRole:
Type: AWS::IAM::Role
Properties:
RoleName: example-lambda-role
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

Outputs:
ExampleFunctionArn:
Value: !GetAtt 'ExampleFunction.Arn'
ExampleFunctionRoleArn:
Value: !GetAtt 'ExampleFunctionRole.Arn'
15 changes: 15 additions & 0 deletions config/connector.json.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"name": "example-lambda-connector",
"config": {
"tasks.max": "1",
"connector.class": "com.nordstrom.kafka.connect.lambda.LambdaSinkConnector",
"topics": "example-stream",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"aws.region": "<Your AWS region>",
"aws.lambda.function.arn": "<Your function ARN>",
"aws.lambda.invocation.timeout.ms": "60000",
"aws.lambda.invocation.mode": "SYNC",
"aws.lambda.batch.enabled": "false"
}
}
13 changes: 13 additions & 0 deletions config/connector.properties.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
name=example-lambda-connector
connector.class=com.nordstrom.kafka.connect.lambda.LambdaSinkConnector
topics=example-stream
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

aws.region=<Your AWS region>

aws.lambda.function.arn=<Your function ARN>
aws.lambda.invocation.timeout.ms=60000
aws.lambda.invocation.mode=SYNC
aws.lambda.batch.enabled=false
15 changes: 15 additions & 0 deletions config/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

# default log levels
log4j.logger.org.reflections=ERROR
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.I0Itec.zkclient=ERROR

log4j.logger.com.amazonaws=WARN
#log4j.logger.com.amazonaws.request=DEBUG
#log4j.logger.org.apache.http.wire=DEBUG
14 changes: 14 additions & 0 deletions config/worker.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
bootstrap.servers=localhost:9092

plugin.path=target
offset.storage.file.filename=/tmp/connect.offsets

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
15 changes: 9 additions & 6 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ version: "3"

services:
zookeeper:
image: confluentinc/cp-zookeeper:5.2.2
image: confluentinc/cp-zookeeper:5.1.3
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- 2181:2181
logging: { driver: none }

broker:
image: confluentinc/cp-kafka:5.2.2
image: confluentinc/cp-kafka:5.1.3
ports:
- 9092:9092
environment:
Expand All @@ -31,7 +31,7 @@ services:
logging: { driver: none }

schema-registry:
image: confluentinc/cp-schema-registry:5.2.2
image: confluentinc/cp-schema-registry:5.1.3
hostname: schema-registry
ports:
- 8080:8080
Expand All @@ -47,7 +47,7 @@ services:
logging: { driver: none }

connect:
image: confluentinc/cp-kafka-connect:5.2.2
image: confluentinc/cp-kafka-connect:5.1.3
ports:
- 8083:8083
environment:
Expand All @@ -64,13 +64,16 @@ services:
- CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter

- CONNECT_PLUGIN_PATH=/opt/connectors

- KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/etc/log4j.properties

- AWS_PROFILE
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
volumes:
- ~/.aws:/root/.aws
- ./target:/opt/connectors
- /opt/connectors/.shaded-jar
- ./config/log4j.properties:/etc/log4j.properties
depends_on: [broker]

0 comments on commit 606370a

Please sign in to comment.