From d2590aacd6cda209726b6da19bf9e764ae74d082 Mon Sep 17 00:00:00 2001 From: Dylan Meissner Date: Mon, 23 Sep 2019 07:20:25 -0700 Subject: [PATCH] Decompose and test connector configuration --- .../AWSAssumeRoleCredentialsProvider.java | 6 +- .../formatters/JsonPayloadFormatter.java | 18 +- .../connect/formatters/SchemaVisibility.java | 7 + .../kafka/connect/lambda/AwsLambdaUtil.java | 181 ------- .../connect/lambda/InvocationClient.java | 203 ++++++++ .../lambda/InvocationClientConfig.java | 311 ++++++++++++ .../connect/lambda/InvocationResponse.java | 46 ++ .../lambda/LambdaSinkConnectorConfig.java | 479 +++--------------- .../kafka/connect/lambda/LambdaSinkTask.java | 99 ++-- .../lambda/PayloadFormatterConfig.java | 133 +++++ .../lambda/SinkRecordSerializable.java | 126 ----- .../kafka/connect/utils/JsonUtil.java | 24 - .../formatters/JsonPayloadFormatterTest.java | 4 +- .../connect/lambda/AwsLambdaUtilTest.java | 53 -- .../lambda/InvocationClientConfigTest.java | 79 +++ .../connect/lambda/InvocationClientTest.java | 100 ++++ .../lambda/LambdaSinkConnectorConfigTest.java | 107 +--- .../connect/lambda/LambdaSinkTaskTest.java | 12 +- .../lambda/PayloadFormatterConfigTest.java | 79 +++ 19 files changed, 1085 insertions(+), 982 deletions(-) create mode 100644 src/main/java/com/nordstrom/kafka/connect/formatters/SchemaVisibility.java delete mode 100644 src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java create mode 100644 src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java create mode 100644 src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClientConfig.java create mode 100644 src/main/java/com/nordstrom/kafka/connect/lambda/InvocationResponse.java create mode 100644 src/main/java/com/nordstrom/kafka/connect/lambda/PayloadFormatterConfig.java delete mode 100644 src/main/java/com/nordstrom/kafka/connect/lambda/SinkRecordSerializable.java delete mode 100644 src/main/java/com/nordstrom/kafka/connect/utils/JsonUtil.java delete mode 100644 src/test/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtilTest.java create mode 100644 src/test/java/com/nordstrom/kafka/connect/lambda/InvocationClientConfigTest.java create mode 100644 src/test/java/com/nordstrom/kafka/connect/lambda/InvocationClientTest.java create mode 100644 src/test/java/com/nordstrom/kafka/connect/lambda/PayloadFormatterConfigTest.java diff --git a/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java b/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java index 2162453..cd8540b 100644 --- a/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java +++ b/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java @@ -83,15 +83,15 @@ private void verifyNotNullOrEmpty(final String field, final String fieldName) { } } - String getExternalId() { + public String getExternalId() { return this.externalId; } - String getRoleArn() { + public String getRoleArn() { return this.roleArn; } - String getSessionName() { + public String getSessionName() { return this.sessionName; } } diff --git a/src/main/java/com/nordstrom/kafka/connect/formatters/JsonPayloadFormatter.java b/src/main/java/com/nordstrom/kafka/connect/formatters/JsonPayloadFormatter.java index 31712e5..9729c49 100644 --- a/src/main/java/com/nordstrom/kafka/connect/formatters/JsonPayloadFormatter.java +++ b/src/main/java/com/nordstrom/kafka/connect/formatters/JsonPayloadFormatter.java @@ -18,12 +18,6 @@ import static java.util.Collections.emptyMap; public class JsonPayloadFormatter implements PayloadFormatter, Configurable { - enum SchemaVisibility { - ALL, - MIN, - NONE - } - private final ObjectWriter recordWriter = new ObjectMapper().writerFor(Payload.class); private final ObjectWriter recordsWriter = new ObjectMapper().writerFor(Payload[].class); private final JsonConverter converter = new JsonConverter(); @@ -44,8 +38,8 @@ public JsonPayloadFormatter() { @Override public void configure(Map configs) { - keySchemaVisibility = configureSchemaVisibility(configs, "formatter.key.schema.visibility"); - valueSchemaVisibility = configureSchemaVisibility(configs, "formatter.value.schema.visibility"); + keySchemaVisibility = configureSchemaVisibility(configs, "key.schema.visibility"); + valueSchemaVisibility = configureSchemaVisibility(configs, "value.schema.visibility"); } private SchemaVisibility configureSchemaVisibility(final Map configs, final String key) { @@ -68,6 +62,14 @@ private SchemaVisibility configureSchemaVisibility(final Map configs, return viz; } + public SchemaVisibility getKeySchemaVisibility() { + return keySchemaVisibility; + } + + public SchemaVisibility getValueSchemaVisibility() { + return valueSchemaVisibility; + } + public String format(final SinkRecord record) { try { return recordWriter.writeValueAsString(recordToPayload(record)); diff --git a/src/main/java/com/nordstrom/kafka/connect/formatters/SchemaVisibility.java b/src/main/java/com/nordstrom/kafka/connect/formatters/SchemaVisibility.java new file mode 100644 index 0000000..da007ab --- /dev/null +++ b/src/main/java/com/nordstrom/kafka/connect/formatters/SchemaVisibility.java @@ -0,0 +1,7 @@ +package com.nordstrom.kafka.connect.formatters; + +public enum SchemaVisibility { + ALL, + MIN, + NONE +} diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java b/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java deleted file mode 100644 index 36db3b7..0000000 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java +++ /dev/null @@ -1,181 +0,0 @@ -package com.nordstrom.kafka.connect.lambda; - -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.services.lambda.AWSLambdaAsync; -import com.amazonaws.services.lambda.AWSLambdaAsyncClientBuilder; -import com.amazonaws.services.lambda.model.InvocationType; -import com.amazonaws.services.lambda.model.InvokeRequest; -import com.amazonaws.services.lambda.model.InvokeResult; -import com.amazonaws.services.lambda.model.RequestTooLargeException; -import com.nordstrom.kafka.connect.utils.Guard; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.ByteBuffer; -import java.time.Duration; -import java.time.Instant; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -public class AwsLambdaUtil { - private static final Logger LOGGER = LoggerFactory.getLogger(AwsLambdaUtil.class); - - private static final int MEGABYTE_SIZE = 1024 * 1024; - private static final int KILOBYTE_SIZE = 1024; - - private static final int maxSyncPayloadSizeBytes = (6 * MEGABYTE_SIZE); - private static final int maxAsyncPayloadSizeBytes = (256 * KILOBYTE_SIZE); - - private final AWSLambdaAsync lambdaClient; - private final InvocationFailure failureMode; - - public AwsLambdaUtil(ClientConfiguration clientConfiguration, - AWSCredentialsProvider credentialsProvider, - InvocationFailure failureMode) { - - Guard.verifyNotNull(clientConfiguration, "clientConfiguration"); - Guard.verifyNotNull(credentialsProvider, "credentialsProvider"); - - final AWSLambdaAsyncClientBuilder builder = AWSLambdaAsyncClientBuilder.standard() - .withClientConfiguration(clientConfiguration) - .withCredentials(credentialsProvider); - - this.failureMode = failureMode; - this.lambdaClient = builder.build(); - - LOGGER.info("AWS Lambda client initialized"); - } - - public InvocationResponse invokeSync( - final String functionName, - final byte[] payload, - final Duration timeout) { - return this.invoke(functionName, payload, timeout, InvocationType.RequestResponse); - } - - public InvocationResponse invokeAsync( - final String functionName, - final byte[] payload, - final Duration timeout) { - return this.invoke(functionName, payload, timeout, InvocationType.Event); - } - - InvocationResponse invoke( - final String functionName, - final byte[] payload, - final Duration timeout, - final InvocationType event - ) { - final InvokeRequest request = new InvokeRequest() - .withInvocationType(event) - .withFunctionName(functionName) - .withPayload(ByteBuffer.wrap(payload)); - - final Future futureResult = this.lambdaClient.invokeAsync(request); - - final Instant start = Instant.now(); - try { - final InvokeResult result = futureResult.get(timeout.toMillis(), TimeUnit.MILLISECONDS); - return new InvocationResponse(result.getStatusCode(), result.getLogResult(), - result.getFunctionError(), start, Instant.now()); - } catch (RequestTooLargeException e) { - return checkPayloadSizeForInvocationType(payload, event, start, e); - } catch (final InterruptedException | ExecutionException e) { - LOGGER.error(e.getLocalizedMessage(), e); - throw new LambdaInvocationException(e); - } catch (final TimeoutException e) { - return new InvocationResponse(504, e.getLocalizedMessage(), e.getLocalizedMessage(), start, - Instant.now()); - } - } - - /** - * - * @param payload a byte array representation of the payload sent to AWS Lambda service - * @param event enumeration type to determine if we are sending in aynch, sync, or no-op mode - * @param start time instance when Lambda invocation was started - * @param e exception indicative of the payload size being over the max allowable - * @return a rolled up Lambda invocation response - * @throws RequestTooLargeException is rethrown if the failure mode is set to stop immediately - */ - InvocationResponse checkPayloadSizeForInvocationType(final byte[] payload, final InvocationType event, final Instant start, final RequestTooLargeException e) { - switch (event) { - - case Event: - if (payload.length > maxAsyncPayloadSizeBytes) { - LOGGER.error("{} bytes payload exceeded {} bytes invocation limit for asynchronous Lambda call", payload.length, maxAsyncPayloadSizeBytes); - } - break; - - case RequestResponse: - if (payload.length > maxSyncPayloadSizeBytes) { - LOGGER.error("{} bytes payload exceeded {} bytes invocation limit for synchronous Lambda call", payload.length, maxSyncPayloadSizeBytes); - } - break; - - default: - LOGGER.info("Dry run call to Lambda with payload size {}", payload.length); - break; - } - - if (failureMode.equals(InvocationFailure.STOP)) { - throw e; - } - // Drop message and continue - return new InvocationResponse(413, e.getLocalizedMessage(), e.getLocalizedMessage(), start, Instant.now()); - } - - private class LambdaInvocationException extends RuntimeException { - public LambdaInvocationException(final Throwable e) { - super(e); - } - } - - public class InvocationResponse { - - private final String errorString; - private final String responseString; - private final Integer statusCode; - private final Instant start; - private final Instant end; - - public InvocationResponse( - final Integer statusCode, - final String logResult, - final String functionError, - final Instant start, - final Instant end) { - - this.statusCode = statusCode; - this.responseString = logResult; - this.errorString = functionError; - this.start = start; - this.end = end; - - } - - public Instant getStart() { - return this.start; - } - - public Instant getEnd() { - return this.end; - } - - public String getErrorString() { - return this.errorString; - } - - public String getResponseString() { - return this.responseString; - } - - public Integer getStatusCode() { - return this.statusCode; - } - - } -} diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java new file mode 100644 index 0000000..14e0944 --- /dev/null +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java @@ -0,0 +1,203 @@ +package com.nordstrom.kafka.connect.lambda; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.lambda.AWSLambdaAsync; +import com.amazonaws.services.lambda.AWSLambdaAsyncClientBuilder; +import com.amazonaws.services.lambda.model.InvocationType; +import com.amazonaws.services.lambda.model.InvokeRequest; +import com.amazonaws.services.lambda.model.InvokeResult; +import com.amazonaws.services.lambda.model.RequestTooLargeException; + +import com.nordstrom.kafka.connect.utils.Facility; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class InvocationClient { + public static final InvocationMode DEFAULT_INVOCATION_MODE = InvocationMode.SYNC; + public static final InvocationFailure DEFAULT_FAILURE_MODE = InvocationFailure.STOP; + public static final long DEFAULT_INVOCATION_TIMEOUT_MS = 5 * 60 * 1000L; + private static final Logger LOGGER = LoggerFactory.getLogger(InvocationClient.class); + + private static final int MEGABYTE_SIZE = 1024 * 1024; + private static final int KILOBYTE_SIZE = 1024; + + private static final int maxSyncPayloadSizeBytes = (6 * MEGABYTE_SIZE); + private static final int maxAsyncPayloadSizeBytes = (256 * KILOBYTE_SIZE); + + private final AWSLambdaAsync innerClient; + private final String functionArn; + private InvocationFailure failureMode; + private InvocationMode invocationMode; + private Duration invocationTimeout; + + private InvocationClient(String functionArn, AWSLambdaAsync innerClient) { + this.functionArn = functionArn; + this.innerClient = innerClient; + } + + public InvocationResponse invoke(final byte[] payload) { + final InvocationType type = invocationMode == InvocationMode.ASYNC + ? InvocationType.Event : InvocationType.RequestResponse; + + final InvokeRequest request = new InvokeRequest() + .withInvocationType(type) + .withFunctionName(functionArn) + .withPayload(ByteBuffer.wrap(payload)); + + final Future futureResult = innerClient.invokeAsync(request); + + final Instant start = Instant.now(); + try { + final InvokeResult result = futureResult.get(invocationTimeout.toMillis(), TimeUnit.MILLISECONDS); + return new InvocationResponse(result.getStatusCode(), result.getLogResult(), + result.getFunctionError(), start, Instant.now()); + } catch (RequestTooLargeException e) { + return checkPayloadSizeForInvocationType(payload, type, start, e); + } catch (final InterruptedException | ExecutionException e) { + LOGGER.error(e.getLocalizedMessage(), e); + throw new InvocationException(e); + } catch (final TimeoutException e) { + return new InvocationResponse(504, e.getLocalizedMessage(), e.getLocalizedMessage(), start, + Instant.now()); + } + } + + /** + * + * @param payload a byte array representation of the payload sent to AWS Lambda service + * @param event enumeration type to determine if we are sending in aynch, sync, or no-op mode + * @param start time instance when Lambda invocation was started + * @param e exception indicative of the payload size being over the max allowable + * @return a rolled up Lambda invocation response + * @throws RequestTooLargeException is rethrown if the failure mode is set to stop immediately + */ + InvocationResponse checkPayloadSizeForInvocationType(final byte[] payload, final InvocationType event, final Instant start, final RequestTooLargeException e) { + switch (event) { + + case Event: + if (payload.length > maxAsyncPayloadSizeBytes) { + LOGGER.error("{} bytes payload exceeded {} bytes invocation limit for asynchronous Lambda call", payload.length, maxAsyncPayloadSizeBytes); + } + break; + + case RequestResponse: + if (payload.length > maxSyncPayloadSizeBytes) { + LOGGER.error("{} bytes payload exceeded {} bytes invocation limit for synchronous Lambda call", payload.length, maxSyncPayloadSizeBytes); + } + break; + + default: + LOGGER.info("Dry run call to Lambda with payload size {}", payload.length); + break; + } + + if (failureMode.equals(InvocationFailure.STOP)) { + throw e; + } + // Drop message and continue + return new InvocationResponse(413, e.getLocalizedMessage(), e.getLocalizedMessage(), start, Instant.now()); + } + + private class InvocationException extends RuntimeException { + public InvocationException(final Throwable e) { + super(e); + } + } + + public static class Builder { + private String functionArn; + private InvocationMode invocationMode = DEFAULT_INVOCATION_MODE; + private InvocationFailure failureMode = DEFAULT_FAILURE_MODE; + private Duration invocationTimeout = Duration.ofMillis(DEFAULT_INVOCATION_TIMEOUT_MS); + + private final AWSLambdaAsyncClientBuilder innerBuilder; + + public Builder() { + this.innerBuilder = AWSLambdaAsyncClientBuilder.standard(); + } + + public InvocationClient build() { + if (functionArn == null || functionArn.isEmpty()) + throw new IllegalStateException("AWS Lambda function ARN cannot be null or empty"); + + InvocationClient client = new InvocationClient(functionArn, innerBuilder.build()); + client.failureMode = failureMode; + client.invocationMode = invocationMode; + client.invocationTimeout = invocationTimeout; + return client; + } + + public String getFunctionArn() { + return functionArn; + } + + public Builder setFunctionArn(final String functionArn) { + this.functionArn = functionArn; + return this; + } + + public InvocationFailure getFailureMode() { + return failureMode; + } + + public Builder setFailureMode(final InvocationFailure failureMode) { + this.failureMode = failureMode; + return this; + } + + public InvocationMode getInvocationMode() { + return invocationMode; + } + + public Builder setInvocationMode(final InvocationMode invocationMode) { + this.invocationMode = invocationMode; + return this; + } + + public Duration getInvocationTimeout() { + return this.invocationTimeout; + } + + public Builder setInvocationTimeout(final Duration timeout) { + this.invocationTimeout = timeout; + return this; + } + + public String getRegion() { + return this.innerBuilder.getRegion(); + } + + public Builder setRegion(final String awsRegion) { + this.innerBuilder.setRegion(awsRegion); + return this; + } + + public ClientConfiguration getClientConfiguration() { + return this.innerBuilder.getClientConfiguration(); + } + + public Builder withClientConfiguration(final ClientConfiguration clientConfiguration) { + this.innerBuilder.withClientConfiguration(clientConfiguration); + return this; + } + + public AWSCredentialsProvider getCredentialsProvider() { + return this.innerBuilder.getCredentials(); + } + + public Builder withCredentialsProvider(final AWSCredentialsProvider credentialsProvider) { + this.innerBuilder.withCredentials(credentialsProvider); + return this; + } + } +} diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClientConfig.java b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClientConfig.java new file mode 100644 index 0000000..e168b5e --- /dev/null +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClientConfig.java @@ -0,0 +1,311 @@ +package com.nordstrom.kafka.connect.lambda; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.errors.ConnectException; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.time.Duration; +import java.lang.reflect.InvocationTargetException; + +public class InvocationClientConfig extends AbstractConfig { + static final String CONFIG_GROUP_NAME = "Lambda"; + + static final String AWS_REGION_KEY = "aws.region"; + static final String AWS_REGION_DOC = "AWS region of the Lambda function"; + static final String FUNCTION_ARN_KEY = "aws.lambda.function.arn"; + static final String FUNCTION_ARN_DOC = "Full ARN of the function to be called"; + static final String INVOCATION_MODE_KEY = "aws.lambda.invocation.mode"; + static final String INVOCATION_MODE_DOC = "Determines whether to invoke the lambda asynchronously (Event) or synchronously (RequestResponse)"; + static final String INVOCATION_TIMEOUT_KEY = "aws.lambda.invocation.timeout.ms"; + static final String INVOCATION_TIMEOUT_DOC = "Time to wait for a response after invoking a lambda. If the response times out, the connector will continue."; + static final String FAILURE_MODE_KEY = "aws.lambda.invocation.failure.mode"; + static final String FAILURE_MODE_DOC = "Determines whether the connector should stop or drop and continue on failure (specifically, payload limit exceeded)"; + + // Client configuration properties + static final String HTTP_PROXY_HOST_KEY = "http.proxy.host"; + static final String HTTP_PROXY_HOST_DOC = "HTTP proxy host to use when invoking the Lambda API"; + static final String HTTP_PROXY_PORT_KEY = "http.proxy.port"; + static final String HTTP_PROXY_PORT_DOC = "HTTP proxy port to use when invoking the Lambda API"; + + // Authentication properties + static final String CREDENTIALS_PROVIDER_CONFIG_PREFIX = "aws.credentials.provider."; + static final String CREDENTIALS_PROVIDER_CLASS_KEY = "aws.credentials.provider.class"; + static final String CREDENTIALS_PROVIDER_CLASS_DOC = "Implementation class which provides AWS authentication credentials"; + static final String IAM_ROLE_ARN_KEY = CREDENTIALS_PROVIDER_CONFIG_PREFIX + AWSAssumeRoleCredentialsProvider.ROLE_ARN_CONFIG; + static final String IAM_ROLE_ARN_DOC = "Full ARN of an IAM role to assume"; + static final String IAM_SESSION_NAME_KEY = CREDENTIALS_PROVIDER_CONFIG_PREFIX + AWSAssumeRoleCredentialsProvider.SESSION_NAME_CONFIG; + static final String IAM_SESSION_NAME_DOC = "IAM session name to use when assuming an IAM role"; + static final String IAM_EXTERNAL_ID_KEY = CREDENTIALS_PROVIDER_CONFIG_PREFIX + AWSAssumeRoleCredentialsProvider.EXTERNAL_ID_CONFIG; + static final String IAM_EXTERNAL_ID_DOC = "External ID to use when assuming an IAM role"; + + final InvocationClient.Builder clientBuilder; + + InvocationClientConfig(final Map parsedConfig) { + this(new InvocationClient.Builder(), parsedConfig); + } + + InvocationClientConfig(final InvocationClient.Builder builder, final Map parsedConfig) { + super(configDef(), parsedConfig); + + builder + .setFunctionArn(getString(FUNCTION_ARN_KEY)) + .setInvocationMode(InvocationMode.valueOf(getString(INVOCATION_MODE_KEY))) + .setInvocationTimeout(Duration.ofMillis(getLong(INVOCATION_TIMEOUT_KEY))) + .setFailureMode(InvocationFailure.valueOf(getString(FAILURE_MODE_KEY))) + .withClientConfiguration(loadAwsClientConfiguration()) + .withCredentialsProvider(loadAwsCredentialsProvider()); + + String awsRegion = getString(AWS_REGION_KEY); + if (awsRegion != null) + builder.setRegion(awsRegion); + + this.clientBuilder = builder; + } + + public InvocationClient getInvocationClient() { + return this.clientBuilder.build(); + } + + ClientConfiguration loadAwsClientConfiguration() { + ClientConfiguration clientConfiguration = new ClientConfiguration(); + + String httpProxyHost = this.getString(HTTP_PROXY_HOST_KEY); + if (httpProxyHost != null && !httpProxyHost.isEmpty()) { + clientConfiguration.setProxyHost(httpProxyHost); + + Integer httpProxyPort = this.getInt(HTTP_PROXY_PORT_KEY); + if (httpProxyPort > 0) + clientConfiguration.setProxyPort(httpProxyPort); + } + + return clientConfiguration; + } + + @SuppressWarnings("unchecked") + AWSCredentialsProvider loadAwsCredentialsProvider() { + try { + AWSCredentialsProvider credentialsProvider = ((Class) + getClass(CREDENTIALS_PROVIDER_CLASS_KEY)).getDeclaredConstructor().newInstance(); + + if (credentialsProvider instanceof Configurable) { + Map configs = originalsWithPrefix( + CREDENTIALS_PROVIDER_CONFIG_PREFIX); + + ((Configurable)credentialsProvider).configure(configs); + } + + return credentialsProvider; + + } catch (IllegalAccessException | InstantiationException | InvocationTargetException | NoSuchMethodException e) { + throw new ConnectException("Unable to create " + CREDENTIALS_PROVIDER_CLASS_KEY, e); + } + } + + public static ConfigDef configDef() { + return configDef(new ConfigDef()); + } + + public static ConfigDef configDef(ConfigDef base) { + int orderInGroup = 0; + + return new ConfigDef(base) + .define(AWS_REGION_KEY, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.HIGH, + AWS_REGION_DOC, + CONFIG_GROUP_NAME, + ++orderInGroup, + ConfigDef.Width.SHORT, + "AWS region") + + .define(FUNCTION_ARN_KEY, + ConfigDef.Type.STRING, + ConfigDef.Importance.HIGH, + FUNCTION_ARN_DOC, + CONFIG_GROUP_NAME, + ++orderInGroup, + ConfigDef.Width.LONG, + "Lambda function ARN") + + .define(INVOCATION_MODE_KEY, + ConfigDef.Type.STRING, + InvocationClient.DEFAULT_INVOCATION_MODE.name(), + new InvocationModeValidator(), + ConfigDef.Importance.MEDIUM, + INVOCATION_MODE_DOC, + CONFIG_GROUP_NAME, + ++orderInGroup, + ConfigDef.Width.SHORT, + "Invocation mode", + new InvocationModeRecommender()) + + .define(INVOCATION_TIMEOUT_KEY, + ConfigDef.Type.LONG, + (Long)InvocationClient.DEFAULT_INVOCATION_TIMEOUT_MS, + ConfigDef.Importance.LOW, + INVOCATION_TIMEOUT_DOC, + CONFIG_GROUP_NAME, + ++orderInGroup, + ConfigDef.Width.SHORT, + "Invocation timeout") + + .define(FAILURE_MODE_KEY, + ConfigDef.Type.STRING, + InvocationClient.DEFAULT_FAILURE_MODE.name(), + new InvocationFailureValidator(), + ConfigDef.Importance.LOW, + FAILURE_MODE_DOC, + CONFIG_GROUP_NAME, + ++orderInGroup, + ConfigDef.Width.SHORT, + "Invocation failure mode", + new InvocationFailureRecommender()) + + .define(HTTP_PROXY_HOST_KEY, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.LOW, + HTTP_PROXY_HOST_DOC, + CONFIG_GROUP_NAME, + ++orderInGroup, + ConfigDef.Width.SHORT, + "HTTP proxy host") + + .define(HTTP_PROXY_PORT_KEY, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.LOW, + HTTP_PROXY_PORT_DOC, + CONFIG_GROUP_NAME, + ++orderInGroup, + ConfigDef.Width.SHORT, + "HTTP proxy port") + + .define(CREDENTIALS_PROVIDER_CLASS_KEY, + ConfigDef.Type.CLASS, + DefaultAWSCredentialsProviderChain.class, + new AwsCredentialsProviderValidator(), + ConfigDef.Importance.LOW, + CREDENTIALS_PROVIDER_CLASS_DOC, + CONFIG_GROUP_NAME, + ++orderInGroup, + ConfigDef.Width.LONG, + "AWS credentials provider class") + + .define(IAM_ROLE_ARN_KEY, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.LOW, + IAM_ROLE_ARN_DOC, + CONFIG_GROUP_NAME, + ++orderInGroup, + ConfigDef.Width.LONG, + "IAM role ARN") + + .define(IAM_SESSION_NAME_KEY, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.LOW, + IAM_SESSION_NAME_DOC, + CONFIG_GROUP_NAME, + ++orderInGroup, + ConfigDef.Width.SHORT, + "IAM session name") + + .define(IAM_EXTERNAL_ID_KEY, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.LOW, + IAM_EXTERNAL_ID_DOC, + CONFIG_GROUP_NAME, + ++orderInGroup, + ConfigDef.Width.SHORT, + "IAM external ID"); + } + + static class InvocationModeRecommender implements ConfigDef.Recommender { + @Override + public List validValues(String name, Map connectorConfigs) { + return Arrays.asList(InvocationMode.values()); + } + + @Override + public boolean visible(String name, Map connectorConfigs) { + return true; + } + } + + static class InvocationModeValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object invocationMode) { + try { + InvocationMode.valueOf(((String)invocationMode).trim()); + } catch (Exception e) { + throw new ConfigException(name, invocationMode, "Value must be one of [" + + Utils.join(InvocationMode.values(), ", ") + "]"); + } + } + + @Override + public String toString() { + return "[" + Utils.join(InvocationMode.values(), ", ") + "]"; + } + } + + static class InvocationFailureRecommender implements ConfigDef.Recommender { + @Override + public List validValues(String name, Map connectorConfigs) { + return Arrays.asList(InvocationFailure.values()); + } + + @Override + public boolean visible(String name, Map connectorConfigs) { + return true; + } + } + + static class InvocationFailureValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object invocationFailure) { + try { + InvocationFailure.valueOf(((String)invocationFailure).trim()); + } catch (Exception e) { + throw new ConfigException(name, invocationFailure, "Value must be one of [" + + Utils.join(InvocationFailure.values(), ", ") + "]"); + } + } + + @Override + public String toString() { + return "[" + Utils.join(InvocationFailure.values(), ", ") + "]"; + } + } + + static class AwsCredentialsProviderValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object provider) { + if (provider instanceof Class && AWSCredentialsProvider.class.isAssignableFrom((Class)provider)) { + return; + } + + throw new ConfigException(name, provider, "Class must extend: " + AWSCredentialsProvider.class); + } + + @Override + public String toString() { + return "Any class implementing: " + AWSCredentialsProvider.class; + } + } +} diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationResponse.java b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationResponse.java new file mode 100644 index 0000000..d4cb7d1 --- /dev/null +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationResponse.java @@ -0,0 +1,46 @@ +package com.nordstrom.kafka.connect.lambda; + +import java.time.Instant; + +public class InvocationResponse { + private final String errorString; + private final String responseString; + private final Integer statusCode; + private final Instant start; + private final Instant end; + + public InvocationResponse( + final Integer statusCode, + final String logResult, + final String functionError, + final Instant start, + final Instant end) { + + this.statusCode = statusCode; + this.responseString = logResult; + this.errorString = functionError; + this.start = start; + this.end = end; + + } + + public Instant getStart() { + return this.start; + } + + public Instant getEnd() { + return this.end; + } + + public String getErrorString() { + return this.errorString; + } + + public String getResponseString() { + return this.responseString; + } + + public Integer getStatusCode() { + return this.statusCode; + } +} diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java index 4f0d764..e7c1212 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java @@ -5,108 +5,92 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.Configurable; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.connect.errors.ConnectException; - -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.ClientConfiguration; import com.nordstrom.kafka.connect.formatters.PayloadFormatter; -import com.nordstrom.kafka.connect.formatters.PlainPayloadFormatter; -import com.nordstrom.kafka.connect.utils.Guard; import java.text.MessageFormat; -import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Arrays; import java.util.stream.Collectors; -import java.util.stream.Stream; + import java.util.concurrent.ThreadLocalRandom; -import java.lang.reflect.InvocationTargetException; public class LambdaSinkConnectorConfig extends AbstractConfig { - private static final String AWS_REGION_DEFAULT = "us-west-2"; - private static final long AWS_LAMBDA_INVOCATION_TIMEOUT_MS_DEFAULT = 5 * 60 * 1000L; - private static final String HTTP_PROXY_HOST_DEFAULT = ""; - private static final int HTTP_PROXY_PORT_DEFAULT = -1; - private static final boolean AWS_LAMBDA_BATCH_ENABLED_DEFAULT = true; - private static final String AWS_LAMBDA_INVOCATION_MODE_DEFAULT = InvocationMode.SYNC.name(); - private static final String AWS_LAMBDA_INVOCATION_FAILURE_MODE_DEFAULT = InvocationFailure.STOP.name(); - private static final String RETRIABLE_ERROR_CODES_DEFAULT = "500,503,504"; - private static final int RETRY_BACKOFF_MILLIS_DEFAULT = 500; - private static final int RETRIES_DEFAULT = 5; - private static final String AWS_IAM_ROLE_ARN_DEFAULT = ""; - private static final String AWS_IAM_SESSION_NAME_DEFAULT = ""; - private static final String AWS_IAM_EXTERNAL_ID_DEFAULT = ""; - public static final String PAYLOAD_FORMATTER_SCHEMA_VISIBILITY_DEFAULT = "min"; - private static final List PAYLOAD_FORMATTER_SCHEMA_VISIBILITY_LIST = Arrays.asList("none", "min", "all"); + static final String CONNECTOR_NAME_KEY = "name"; + static final String CONNECTOR_NAME_DOC = "DEPRECATED Connector name used in logs"; + + static final String BATCH_RECORDS_ENABLED_KEY = "aws.lambda.batch.enabled"; + static final String BATCH_RECORDS_ENABLED_DOC = "Determines whether to send individual records, or an array of records, when invoking the Lambda"; + static final boolean BATCH_RECORDS_ENABLED_DEFAULT = true; + + static final String RETRIES_MAX_KEY = "retries.max"; + static final String RETRIES_MAX_DOC = "Max number of times to retry the Lambda invocation"; + static final int RETRIES_DEFAULT = 5; + + static final String RETRY_BACKOFF_MILLIS_KEY = "retry.backoff.millis"; + static final String RETRY_BACKOFF_MILLIS_DOC = "The amount of time to wait between Lambda invocation retry attempts"; + static final int RETRY_BACKOFF_MILLIS_DEFAULT = 500; + + static final String RETRIABLE_ERROR_CODES_KEY = "retriable.error.codes"; + static final String RETRIABLE_ERROR_CODES_DOC = "A comma-separated list with the error codes which cause a Lambda invocation retry"; + static final String RETRIABLE_ERROR_CODES_DEFAULT = "500,503,504"; private final String connectorName; - private final ClientConfiguration awsClientConfiguration; - private final AWSCredentialsProvider awsCredentialsProvider; - private final PayloadFormatter payloadFormatter; private final Collection retriableErrorCodes; + private final InvocationClientConfig invocationClientConfig; + private final PayloadFormatterConfig payloadFormatterConfig; LambdaSinkConnectorConfig(final Map parsedConfig) { - this(configDef(), parsedConfig); - } - - LambdaSinkConnectorConfig(final ConfigDef configDef, final Map parsedConfig) { - super(configDef, parsedConfig); - - this.connectorName = parsedConfig.getOrDefault(ConfigurationKeys.NAME_CONFIG.getValue(), + super(configDef(), parsedConfig); + this.connectorName = parsedConfig.getOrDefault(CONNECTOR_NAME_KEY, "LambdaSinkConnector-Unnamed-" + ThreadLocalRandom.current() .ints(4) .mapToObj(String::valueOf) .collect(Collectors.joining())); - - this.awsClientConfiguration = loadAwsClientConfiguration(); - this.awsCredentialsProvider = loadAwsCredentialsProvider(); - this.payloadFormatter = loadPayloadFormatter(); this.retriableErrorCodes = loadRetriableErrorCodes(); + this.invocationClientConfig = new InvocationClientConfig(parsedConfig); + this.payloadFormatterConfig = new PayloadFormatterConfig(parsedConfig); } public String getConnectorName() { return this.connectorName; } - public String getAwsFunctionArn() { - return this.getString(ConfigurationKeys.AWS_LAMBDA_FUNCTION_ARN.getValue()); + public boolean isBatchingEnabled() { + return this.getBoolean(BATCH_RECORDS_ENABLED_KEY); } - public Duration getInvocationTimeout() { - return Duration.ofMillis(this.getLong(ConfigurationKeys.AWS_LAMBDA_INVOCATION_TIMEOUT_MS.getValue())); + public long getRetryBackoffTimeMillis() { + return this.getInt(RETRY_BACKOFF_MILLIS_KEY); } - public InvocationMode getInvocationMode() { - return InvocationMode.valueOf(this.getString(ConfigurationKeys.AWS_LAMBDA_INVOCATION_MODE.getValue())); + public int getRetries() { + return this.getInt(RETRIES_MAX_KEY); } - public InvocationFailure getFailureMode() { - return InvocationFailure.valueOf(this.getString(ConfigurationKeys.AWS_LAMBDA_INVOCATION_FAILURE_MODE.getValue())); + public Collection getRetriableErrorCodes() { + return this.retriableErrorCodes; } - public boolean isBatchingEnabled() { - return this.getBoolean(ConfigurationKeys.AWS_LAMBDA_BATCH_ENABLED.getValue()); + public InvocationClientConfig getInvocationClientConfig() { + return invocationClientConfig; } - public long getRetryBackoffTimeMillis() { - return this.getInt(ConfigurationKeys.RETRY_BACKOFF_MILLIS.getValue()); + public InvocationClient getInvocationClient() { + return invocationClientConfig.getInvocationClient(); } - public int getRetries() { - return this.getInt(ConfigurationKeys.RETRIES_MAX.getValue()); + public PayloadFormatterConfig getPayloadFormatterConfig() { + return payloadFormatterConfig; } - public Collection getRetriableErrorCodes() { - return this.retriableErrorCodes; + public PayloadFormatter getPayloadFormatter() { + return payloadFormatterConfig.getPayloadFormatter(); } Collection loadRetriableErrorCodes() { - final List retriableErrorCodesString = this.getList(ConfigurationKeys.RETRIABLE_ERROR_CODES.getValue()); + final List retriableErrorCodesString = this.getList(RETRIABLE_ERROR_CODES_KEY); try { return retriableErrorCodesString .stream() @@ -120,373 +104,34 @@ Collection loadRetriableErrorCodes() { } } - public String getAwsRegion() { - return this.getString(ConfigurationKeys.AWS_REGION.getValue()); - } - - public ClientConfiguration getAwsClientConfiguration() { - return this.awsClientConfiguration; - } - - ClientConfiguration loadAwsClientConfiguration() { - ClientConfiguration clientConfiguration = new ClientConfiguration(); - - String httpProxyHost = this.getString(ConfigurationKeys.HTTP_PROXY_HOST.getValue()); - - if (!httpProxyHost.isEmpty()) { - clientConfiguration.setProxyHost(httpProxyHost); - - Integer httpProxyPort = this.getInt(ConfigurationKeys.HTTP_PROXY_PORT.getValue()); - if (httpProxyPort != HTTP_PROXY_PORT_DEFAULT) - clientConfiguration.setProxyPort(httpProxyPort); - } - - return clientConfiguration; - } - - public AWSCredentialsProvider getAwsCredentialsProvider() { - return this.awsCredentialsProvider; - } - - @SuppressWarnings("unchecked") - AWSCredentialsProvider loadAwsCredentialsProvider() { - String configKey = ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG.getValue(); - - try { - AWSCredentialsProvider awsCredentialsProvider = ((Class) - getClass(configKey)).getDeclaredConstructor().newInstance(); - - if (awsCredentialsProvider instanceof Configurable) { - Map configs = originalsWithPrefix( - ConfigurationKeys.CREDENTIALS_PROVIDER_CONFIG_PREFIX.getValue()); - - ((Configurable)awsCredentialsProvider).configure(configs); - } - - return awsCredentialsProvider; - - } catch (IllegalAccessException | InstantiationException | InvocationTargetException | NoSuchMethodException e) { - throw new ConnectException("Unable to create " + configKey, e); - } - } - - public String getIamRoleArn() { - return this.getString(ConfigurationKeys.AWS_IAM_ROLE_ARN_CONFIG.getValue()); - } - - public String getIamSessionName() { - return this.getString(ConfigurationKeys.AWS_IAM_SESSION_NAME_CONFIG.getValue()); - } - - public String getIamExternalId() { - return this.getString(ConfigurationKeys.AWS_IAM_EXTERNAL_ID_CONFIG.getValue()); - } - - public PayloadFormatter getPayloadFormatter() { - return this.payloadFormatter; - } - public String getPayloadFormatterKeySchemaVisibility() { - return this.getString(ConfigurationKeys.PAYLOAD_FORMATTER_KEY_SCHEMA_VISIBILITY_CONFIG.getValue()); - } - public String getPayloadFormatterValueSchemaVisibility() { - return this.getString(ConfigurationKeys.PAYLOAD_FORMATTER_VALUE_SCHEMA_VISIBILITY_CONFIG.getValue()); - } - - @SuppressWarnings("unchecked") - PayloadFormatter loadPayloadFormatter() { - String configKey = ConfigurationKeys.PAYLOAD_FORMATTER_CLASS_CONFIG.getValue(); - - try { - PayloadFormatter payloadFormatter = ((Class) - getClass(configKey)).getDeclaredConstructor().newInstance(); - - if (payloadFormatter instanceof Configurable) { - Mapconfigs = originalsWithPrefix( - ConfigurationKeys.PAYLOAD_FORMATTER_CONFIG_PREFIX.getValue()); - ((Configurable)payloadFormatter).configure(configs); - } - return payloadFormatter; - - } catch (IllegalAccessException | InstantiationException | InvocationTargetException | NoSuchMethodException e) { - throw new ConnectException("Unable to create " + configKey, e); - } - } - public static ConfigDef configDef() { - return new ConfigDef() - .define(ConfigurationKeys.AWS_LAMBDA_FUNCTION_ARN.getValue(), Type.STRING, Importance.HIGH, - ConfigurationKeys.AWS_LAMBDA_FUNCTION_ARN.getDocumentation()) - - .define(ConfigurationKeys.AWS_LAMBDA_INVOCATION_TIMEOUT_MS.getValue(), Type.LONG, - AWS_LAMBDA_INVOCATION_TIMEOUT_MS_DEFAULT, Importance.HIGH, - ConfigurationKeys.AWS_LAMBDA_INVOCATION_TIMEOUT_MS.getDocumentation()) - - .define(ConfigurationKeys.AWS_LAMBDA_INVOCATION_MODE.getValue(), Type.STRING, - AWS_LAMBDA_INVOCATION_MODE_DEFAULT, - new InvocationModeValidator(), - Importance.MEDIUM, - ConfigurationKeys.AWS_LAMBDA_INVOCATION_MODE.getDocumentation(), - "LAMBDA", - 0, - ConfigDef.Width.SHORT, - "Invocation mode", - new InvocationModeRecommender()) - - .define(ConfigurationKeys.AWS_LAMBDA_INVOCATION_FAILURE_MODE.getValue(), Type.STRING, - AWS_LAMBDA_INVOCATION_FAILURE_MODE_DEFAULT, - new InvocationFailureValidator(), + ConfigDef configDef = new ConfigDef() + .define(BATCH_RECORDS_ENABLED_KEY, + Type.BOOLEAN, + BATCH_RECORDS_ENABLED_DEFAULT, Importance.MEDIUM, - ConfigurationKeys.AWS_LAMBDA_INVOCATION_FAILURE_MODE.getDocumentation(), - "LAMBDA", - 0, - ConfigDef.Width.SHORT, - "Invocation mode", - new InvocationFailureRecommender()) - - .define(ConfigurationKeys.AWS_LAMBDA_BATCH_ENABLED.getValue(), Type.BOOLEAN, - AWS_LAMBDA_BATCH_ENABLED_DEFAULT, Importance.MEDIUM, - ConfigurationKeys.AWS_LAMBDA_BATCH_ENABLED.getDocumentation()) - - .define(ConfigurationKeys.AWS_REGION.getValue(), Type.STRING, AWS_REGION_DEFAULT, - Importance.LOW, ConfigurationKeys.AWS_REGION.getDocumentation()) - - .define(ConfigurationKeys.HTTP_PROXY_HOST.getValue(), Type.STRING, HTTP_PROXY_HOST_DEFAULT, - Importance.LOW, ConfigurationKeys.HTTP_PROXY_HOST.getDocumentation()) - - .define(ConfigurationKeys.HTTP_PROXY_PORT.getValue(), Type.INT, HTTP_PROXY_PORT_DEFAULT, - Importance.LOW, ConfigurationKeys.HTTP_PROXY_PORT.getDocumentation()) - - .define(ConfigurationKeys.RETRIES_MAX.getValue(), Type.INT, RETRIES_DEFAULT, Importance.MEDIUM, - ConfigurationKeys.RETRIES_MAX.getDocumentation()) - - .define(ConfigurationKeys.RETRY_BACKOFF_MILLIS.getValue(), Type.INT, - RETRY_BACKOFF_MILLIS_DEFAULT, Importance.MEDIUM, - ConfigurationKeys.RETRY_BACKOFF_MILLIS.getDocumentation()) - - .define(ConfigurationKeys.RETRIABLE_ERROR_CODES.getValue(), Type.LIST, - RETRIABLE_ERROR_CODES_DEFAULT, Importance.MEDIUM, - ConfigurationKeys.RETRIABLE_ERROR_CODES.getDocumentation()) + BATCH_RECORDS_ENABLED_DOC) - .define(ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG.getValue(), Type.CLASS, - ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_DEFAULT.getValue(), - new AwsCredentialsProviderValidator(), + .define(RETRIES_MAX_KEY, + Type.INT, + RETRIES_DEFAULT, Importance.LOW, - ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG.getDocumentation(), - "LAMBDA", - 0, - ConfigDef.Width.LONG, - "AWS credentials provider class") + RETRIES_MAX_DOC) - .define(ConfigurationKeys.AWS_IAM_ROLE_ARN_CONFIG.getValue(), Type.STRING, AWS_IAM_ROLE_ARN_DEFAULT, - Importance.LOW, ConfigurationKeys.AWS_IAM_ROLE_ARN_CONFIG.getDocumentation()) - - .define(ConfigurationKeys.AWS_IAM_SESSION_NAME_CONFIG.getValue(), Type.STRING, AWS_IAM_SESSION_NAME_DEFAULT, - Importance.LOW, ConfigurationKeys.AWS_IAM_SESSION_NAME_CONFIG.getDocumentation()) - - .define(ConfigurationKeys.AWS_IAM_EXTERNAL_ID_CONFIG.getValue(), Type.STRING, AWS_IAM_EXTERNAL_ID_DEFAULT, - Importance.LOW, ConfigurationKeys.AWS_IAM_EXTERNAL_ID_CONFIG.getDocumentation()) - - .define(ConfigurationKeys.PAYLOAD_FORMATTER_CLASS_CONFIG.getValue(), Type.CLASS, - PlainPayloadFormatter.class, - new PayloadFormatterClassValidator(), + .define(RETRY_BACKOFF_MILLIS_KEY, + Type.INT, + RETRY_BACKOFF_MILLIS_DEFAULT, Importance.LOW, - ConfigurationKeys.PAYLOAD_FORMATTER_CLASS_CONFIG.getDocumentation(), - "LAMBDA", - 0, - ConfigDef.Width.LONG, - "Invocation payload formatter class") + RETRY_BACKOFF_MILLIS_DOC) - .define(ConfigurationKeys.PAYLOAD_FORMATTER_KEY_SCHEMA_VISIBILITY_CONFIG.getValue(), Type.STRING, - PAYLOAD_FORMATTER_SCHEMA_VISIBILITY_DEFAULT, - new PayloadFormatterVisibilityValidator(), + .define(RETRIABLE_ERROR_CODES_KEY, + Type.LIST, + RETRIABLE_ERROR_CODES_DEFAULT, Importance.LOW, - ConfigurationKeys.PAYLOAD_FORMATTER_KEY_SCHEMA_VISIBILITY_CONFIG.getDocumentation() - ) - - .define(ConfigurationKeys.PAYLOAD_FORMATTER_VALUE_SCHEMA_VISIBILITY_CONFIG.getValue(), Type.STRING, - PAYLOAD_FORMATTER_SCHEMA_VISIBILITY_DEFAULT, - new PayloadFormatterVisibilityValidator(), - Importance.LOW, - ConfigurationKeys.PAYLOAD_FORMATTER_VALUE_SCHEMA_VISIBILITY_CONFIG.getDocumentation() - ) - ; - } - - public enum ConfigurationKeys { - NAME_CONFIG("name", "Connector Name"), - AWS_LAMBDA_FUNCTION_ARN("aws.lambda.function.arn", "Full ARN of the function to be called"), - AWS_LAMBDA_INVOCATION_TIMEOUT_MS("aws.lambda.invocation.timeout.ms", - "Time to wait for a lambda invocation, if the response times out, the connector will move forward. Default in ms: " - + AWS_LAMBDA_INVOCATION_TIMEOUT_MS_DEFAULT), - AWS_LAMBDA_INVOCATION_MODE("aws.lambda.invocation.mode", - "Determines whether the lambda would be called asynchronously (Event) or Synchronously (Request-Response), possible values are: [" - + Stream.of(InvocationMode.values()).map(InvocationMode::toString) - .collect(Collectors.joining(",")) + "]"), - AWS_LAMBDA_INVOCATION_FAILURE_MODE("aws.lambda.invocation.failure.mode", - "Determines whether the lambda should stop or drop and continue on failure (specifically, payload limit exceeded), possible values are: [" - + Stream.of(InvocationFailure.values()).map(InvocationFailure::toString) - .collect(Collectors.joining(",")) + "]"), - - AWS_LAMBDA_BATCH_ENABLED("aws.lambda.batch.enabled", - "Boolean that determines if the messages will be batched together before sending them to aws lambda. By default is " + AWS_LAMBDA_BATCH_ENABLED_DEFAULT), - AWS_REGION("aws.region", - "AWS region to instantiate the Lambda client Default: " + AWS_REGION_DEFAULT), - - HTTP_PROXY_HOST("http.proxy.host", - "Http proxy port to be configured for the Lambda client, by default is empty"), - HTTP_PROXY_PORT("http.proxy.port", - "Http proxy to be configured for the Lambda client, by default is empty"), - RETRIES_MAX("retries.max", "Max number of times to retry a call"), - RETRIABLE_ERROR_CODES("retriable.error.codes" - , "A comma separated list with the error codes to be retried, by default " - + RETRIABLE_ERROR_CODES_DEFAULT), - RETRY_BACKOFF_MILLIS("retry.backoff.millis", - "The amount of time to wait between retry attempts, by default is " - + RETRY_BACKOFF_MILLIS_DEFAULT), - - CREDENTIALS_PROVIDER_CLASS_CONFIG("aws.credentials.provider.class", "Class providing cross-account role assumption"), - CREDENTIALS_PROVIDER_CLASS_DEFAULT("com.amazonaws.auth.DefaultAWSCredentialsProviderChain", "Default provider chain if aws.lambda.credentials.provider.class is not passed in"), - CREDENTIALS_PROVIDER_CONFIG_PREFIX("aws.credentials.provider.", "Note trailing '.'"), - AWS_IAM_ROLE_ARN_CONFIG("aws.credentials.provider.role.arn", "REQUIRED AWS Role ARN providing the access"), - AWS_IAM_SESSION_NAME_CONFIG("aws.credentials.provider.session.name", "REQUIRED Session name"), - AWS_IAM_EXTERNAL_ID_CONFIG("aws.credentials.provider.external.id", "OPTIONAL (but recommended) External identifier used by the kafka-connect-lambda when assuming the role"), - - PAYLOAD_FORMATTER_CONFIG_PREFIX("payload.", "Note trailing '.'"), - PAYLOAD_FORMATTER_CLASS_CONFIG("payload.formatter.class", "Class formatter for the invocation payload"), - PAYLOAD_FORMATTER_KEY_SCHEMA_VISIBILITY_CONFIG( - "payload.formatter.key.schema.visibility", - "Determines visibility of key schema (none, min, all). Default is " + PAYLOAD_FORMATTER_SCHEMA_VISIBILITY_DEFAULT - ), - PAYLOAD_FORMATTER_VALUE_SCHEMA_VISIBILITY_CONFIG( - "payload.formatter.value.schema.visibility", - "Determines visibility of value schema (none, min, all). Default is " + PAYLOAD_FORMATTER_SCHEMA_VISIBILITY_DEFAULT - ) - ; - - private final String value; - private final String documentation; - - ConfigurationKeys(final String configurationKeyValue, final String documentation) { - Guard.verifyNotNullOrEmpty(configurationKeyValue, "configurationKeyValue"); - - // Empty or null documentation is ok. - this.value = configurationKeyValue; - this.documentation = documentation; - } - - String getValue() { - return this.value; - } - - String getDocumentation() { - return this.documentation; - } - - @Override - public String toString() { - return this.value; - } - } - - private static class InvocationModeRecommender implements ConfigDef.Recommender { - @Override - public List validValues(String name, Map connectorConfigs) { - return Arrays.asList(InvocationMode.values()); - } - - @Override - public boolean visible(String name, Map connectorConfigs) { - return true; - } - } - - private static class InvocationModeValidator implements ConfigDef.Validator { - @Override - public void ensureValid(String name, Object invocationMode) { - try { - InvocationMode.valueOf(((String)invocationMode).trim()); - } catch (Exception e) { - throw new ConfigException(name, invocationMode, "Value must be one of [" + - Utils.join(InvocationMode.values(), ", ") + "]"); - } - } - - @Override - public String toString() { - return "[" + Utils.join(InvocationMode.values(), ", ") + "]"; - } - } - - private static class InvocationFailureRecommender implements ConfigDef.Recommender { - @Override - public List validValues(String name, Map connectorConfigs) { - return Arrays.asList(InvocationFailure.values()); - } - - @Override - public boolean visible(String name, Map connectorConfigs) { - return true; - } - } - - private static class InvocationFailureValidator implements ConfigDef.Validator { - @Override - public void ensureValid(String name, Object invocationFailure) { - try { - InvocationFailure.valueOf(((String)invocationFailure).trim()); - } catch (Exception e) { - throw new ConfigException(name, invocationFailure, "Value must be one of [" + - Utils.join(InvocationFailure.values(), ", ") + "]"); - } - } - - @Override - public String toString() { - return "[" + Utils.join(InvocationFailure.values(), ", ") + "]"; - } - } - - private static class AwsCredentialsProviderValidator implements ConfigDef.Validator { - @Override - public void ensureValid(String name, Object provider) { - if (provider instanceof Class && AWSCredentialsProvider.class.isAssignableFrom((Class)provider)) { - return; - } - - throw new ConfigException(name, provider, "Class must extend: " + AWSCredentialsProvider.class); - } - - @Override - public String toString() { - return "Any class implementing: " + AWSCredentialsProvider.class; - } - } - - private static class PayloadFormatterClassValidator implements ConfigDef.Validator { - @Override - public void ensureValid(String name, Object formatter) { - if (formatter instanceof Class && PayloadFormatter.class.isAssignableFrom((Class)formatter)) { - return; - } - - throw new ConfigException(name, formatter, "Class must extend: " + PayloadFormatter.class); - } - - @Override - public String toString() { - return "Any class implementing: " + PayloadFormatter.class; - } - } - - // Validator used for both key, value schema visibility. - private static class PayloadFormatterVisibilityValidator implements ConfigDef.Validator { - @Override - public void ensureValid(String name, Object visibility) { - if (PAYLOAD_FORMATTER_SCHEMA_VISIBILITY_LIST.contains(visibility)) { - return; - } + RETRIABLE_ERROR_CODES_DOC); - throw new ConfigException(name, visibility, "Must be one of " + PAYLOAD_FORMATTER_SCHEMA_VISIBILITY_LIST); - } + InvocationClientConfig.configDef(configDef); + PayloadFormatterConfig.configDef(configDef); + return configDef; } } diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java index 9d6cf55..69cb74d 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java @@ -5,7 +5,6 @@ import com.nordstrom.kafka.connect.formatters.PayloadFormattingException; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.sink.SinkRecord; @@ -24,11 +23,15 @@ public class LambdaSinkTask extends SinkTask { private final Queue batchRecords = new ConcurrentLinkedQueue<>(); private final AtomicInteger retryCount = new AtomicInteger(0); - private final int maxBatchSizeBytes = (6 * 1024 * 1024) - 1; + private final static int maxBatchSizeBytes = (6 * 1024 * 1024) - 1; - AwsLambdaUtil lambdaClient; - PayloadFormatter payloadFormatter; - LambdaSinkConnectorConfig configuration; + private String connectorName; + private InvocationClient invocationClient; + private PayloadFormatter payloadFormatter; + private Boolean isBatchingRecords; + private int maxRetryCount; + private long retryBackoffMs; + private Collection retriableErrorCodes; @Override public String version() { @@ -37,18 +40,20 @@ public String version() { @Override public void start(final Map settings) { - this.configuration = new LambdaSinkConnectorConfig(settings); + LambdaSinkConnectorConfig config = new LambdaSinkConnectorConfig(settings); + this.connectorName = config.getConnectorName(); - LOGGER.info("Starting lambda connector {} task.", this.configuration.getConnectorName()); + LOGGER.info("Starting lambda connector {} task.", connectorName); - this.lambdaClient = new AwsLambdaUtil( - this.configuration.getAwsClientConfiguration(), - this.configuration.getAwsCredentialsProvider(), - this.configuration.getFailureMode()); - this.payloadFormatter = this.configuration.getPayloadFormatter(); + this.invocationClient = config.getInvocationClient(); + this.payloadFormatter = config.getPayloadFormatter(); + this.isBatchingRecords = config.isBatchingEnabled(); + this.maxRetryCount = config.getRetries(); + this.retryBackoffMs = config.getRetryBackoffTimeMillis(); + this.retriableErrorCodes = config.getRetriableErrorCodes(); LOGGER.info("Context for connector {} task, Assignments[{}], ", - this.configuration.getConnectorName(), + connectorName, this.context .assignment() .stream() @@ -60,19 +65,18 @@ public void start(final Map settings) { @Override public void put(final Collection records) { if (records == null || records.isEmpty()) { - LOGGER.debug("No records to process. connector=\"{}\"", - this.configuration.getConnectorName()); + LOGGER.debug("No records to process. connector=\"{}\"", this.connectorName); return; } - if (this.configuration.isBatchingEnabled()) { + if (this.isBatchingRecords) { this.batchRecords.addAll(records); final int batchLength = this.getPayload(this.batchRecords).getBytes().length; if (batchLength >= maxBatchSizeBytes) { LOGGER.warn("Batch size reached {} bytes within {} records. connector=\"{}\"", batchLength, this.batchRecords.size(), - this.configuration.getConnectorName()); + this.connectorName); this.rinse(); this.context.requestCommit(); } @@ -90,10 +94,6 @@ public void flush(final Map currentOffsets) { this.rinse(); } - public void setLambdaClient(AwsLambdaUtil lambdaClient) { - this.lambdaClient = lambdaClient; - } - private void rinse() { final List records = new ArrayList<>(this.batchRecords); @@ -102,7 +102,7 @@ private void rinse() { this.splitBatch(records, maxBatchSizeBytes) .forEach(recordsToFlush -> { - final AwsLambdaUtil.InvocationResponse response = this.invoke(this.getPayload(recordsToFlush)); + final InvocationResponse response = this.invoke(this.getPayload(recordsToFlush)); final String responsesMsg = recordsToFlush.stream().map(r -> MessageFormat .format( @@ -114,9 +114,8 @@ private void rinse() { .collect(Collectors.joining(" | ")); if (responsesMsg != null && !responsesMsg.isEmpty()) { final String message = MessageFormat.format( - "Response Summary Batch - arn=\"{0}\", connector=\"{1}\" recordcount=\"{3}\" responseCode=\"{4}\" response=\"{5}\" start=\"{6}\" durationtimemillis=\"{7}\" | {8}", - this.configuration.getAwsFunctionArn(), - this.configuration.getConnectorName(), + "Response Summary Batch - connector=\"{0}\" recordcount=\"{1}\" responseCode=\"{2}\" response=\"{3}\" start=\"{4}\" durationtimemillis=\"{5}\" | {6}", + this.connectorName, recordsToFlush.size(), response.getStatusCode(), String.join(" : ", response.getResponseString(), response.getErrorString()), @@ -126,19 +125,16 @@ private void rinse() { LOGGER.info(message); this.batchRecords.removeAll(recordsToFlush); - } }); + if (!this.batchRecords.isEmpty()) { - LOGGER.error( - "Race Condition Found between sinkConnector.put() and sinkConnector.flush() connector=\"{}\"", - this.configuration.getConnectorName()); + LOGGER.error("Race Condition Found between sinkConnector.put() and sinkConnector.flush() connector=\"{}\"", this.connectorName); } } else { - LOGGER.info("No records sent in the flush cycle. connector=\"{}\"", - this.configuration.getConnectorName()); + LOGGER.info("No records sent in the flush cycle. connector=\"{}\"", connectorName); } this.context.requestCommit(); } @@ -190,35 +186,8 @@ private String getPayload(final Collection records) { } } - private AwsLambdaUtil.InvocationResponse invoke(final String payload) { - - final AwsLambdaUtil.InvocationResponse response; - switch (this.configuration.getInvocationMode()) { - - case ASYNC: - - response = this.lambdaClient.invokeAsync( - this.configuration.getAwsFunctionArn(), - payload.getBytes(), - this.configuration.getInvocationTimeout() - ); - break; - - case SYNC: - response = this.lambdaClient.invokeSync( - this.configuration.getAwsFunctionArn(), - payload.getBytes(), - this.configuration.getInvocationTimeout() - ); - break; - - default: - final String message = MessageFormat.format("The {} {} is not defined.", - LambdaSinkConnectorConfig.ConfigurationKeys.AWS_LAMBDA_INVOCATION_MODE.getValue(), - this.configuration.getInvocationMode()); - throw new InvalidConfigurationException(message); - } - + private InvocationResponse invoke(final String payload) { + final InvocationResponse response = invocationClient.invoke(payload.getBytes()); final String traceMessage = MessageFormat .format("AWS LAMBDA response {0} {1} {2}.", response.getStatusCode(), @@ -227,13 +196,13 @@ private AwsLambdaUtil.InvocationResponse invoke(final String payload) { ); LOGGER.trace(traceMessage); - this.handleResponse(response, this.retryCount, this.configuration.getRetriableErrorCodes(), - this.configuration.getRetries(), this.configuration.getRetryBackoffTimeMillis()); + this.handleResponse(response, this.retryCount, + this.retriableErrorCodes, this.maxRetryCount, this.retryBackoffMs); return response; } private void handleResponse( - final AwsLambdaUtil.InvocationResponse response, + final InvocationResponse response, final AtomicInteger retryCount, final Collection retriableErrorCodes, final int maxRetries, @@ -265,7 +234,7 @@ private void handleResponse( ); LOGGER.warn(message); retryCount.incrementAndGet(); - this.context.timeout(this.configuration.getRetryBackoffTimeMillis()); + this.context.timeout(backoffTimeMs); throw new RetriableException(message); } @@ -283,7 +252,7 @@ private void handleResponse( @Override public void stop() { - LOGGER.info("Stopping lambda connector {} task.", this.configuration.getConnectorName()); + LOGGER.info("Stopping lambda connector {} task.", this.connectorName); } private class OutOfRetriesException extends RuntimeException { diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/PayloadFormatterConfig.java b/src/main/java/com/nordstrom/kafka/connect/lambda/PayloadFormatterConfig.java new file mode 100644 index 0000000..c1686c7 --- /dev/null +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/PayloadFormatterConfig.java @@ -0,0 +1,133 @@ +package com.nordstrom.kafka.connect.lambda; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.errors.ConnectException; + +import com.nordstrom.kafka.connect.formatters.*; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.lang.reflect.InvocationTargetException; + +public class PayloadFormatterConfig extends AbstractConfig { + static final String CONFIG_GROUP_NAME = "Payload format"; + + static final String FORMATTER_CLASS_KEY = "payload.formatter.class"; + static final String FORMATTER_CLASS_DOC = "Implementation class that formats the invocation payload"; + static final String FORMATTER_PREFIX = "payload.formatter."; + static final String KEY_SCHEMA_VISIBILITY_KEY = FORMATTER_PREFIX + "key.schema.visibility"; + static final String KEY_SCHEMA_VISIBILITY_DOC = "Determines visibility of the key schema (none, min, all)"; + static final String VALUE_SCHEMA_VISIBILITY_KEY = FORMATTER_PREFIX + "value.schema.visibility"; + static final String VALUE_SCHEMA_VISIBILITY_DOC = "Determines visibility of the value schema (none, min, all)"; + + static final String SCHEMA_VISIBILITY_DEFAULT = "min"; + static final List SCHEMA_VISIBILITY_LIST = Arrays.asList("none", "min", "all"); + + PayloadFormatterConfig(final Map parsedConfig) { + super(configDef(), parsedConfig); + } + + @SuppressWarnings("unchecked") + public PayloadFormatter getPayloadFormatter() { + try { + PayloadFormatter payloadFormatter = ((Class) + getClass(FORMATTER_CLASS_KEY)).getDeclaredConstructor().newInstance(); + + if (payloadFormatter instanceof Configurable) { + Mapconfigs = originalsWithPrefix(FORMATTER_PREFIX); + ((Configurable)payloadFormatter).configure(configs); + } + return payloadFormatter; + + } catch (IllegalAccessException | InstantiationException | InvocationTargetException | NoSuchMethodException e) { + throw new ConnectException("Unable to create " + FORMATTER_CLASS_KEY, e); + } + } + + public static ConfigDef configDef() { + return configDef(new ConfigDef()); + } + + public static ConfigDef configDef(ConfigDef base) { + int orderInGroup = 0; + + return new ConfigDef(base) + .define(FORMATTER_CLASS_KEY, + ConfigDef.Type.CLASS, + PlainPayloadFormatter.class, + new FormatterClassValidator(), + ConfigDef.Importance.LOW, + FORMATTER_CLASS_DOC, + CONFIG_GROUP_NAME, + ++orderInGroup, + ConfigDef.Width.LONG, + "Invocation payload formatter class") + + .define(KEY_SCHEMA_VISIBILITY_KEY, + ConfigDef.Type.STRING, + SCHEMA_VISIBILITY_DEFAULT, + new SchemaVisibilityValidator(), + ConfigDef.Importance.LOW, + KEY_SCHEMA_VISIBILITY_DOC, + CONFIG_GROUP_NAME, + ++orderInGroup, + ConfigDef.Width.SHORT, + "Key schema visibility", + new SchemaVisibilityRecommender()) + + .define(VALUE_SCHEMA_VISIBILITY_KEY, + ConfigDef.Type.STRING, + SCHEMA_VISIBILITY_DEFAULT, + new SchemaVisibilityValidator(), + ConfigDef.Importance.LOW, + VALUE_SCHEMA_VISIBILITY_DOC, + CONFIG_GROUP_NAME, + ++orderInGroup, + ConfigDef.Width.SHORT, + "Value schema visibility", + new SchemaVisibilityRecommender()); + } + + static class FormatterClassValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object formatter) { + if (formatter instanceof Class && PayloadFormatter.class.isAssignableFrom((Class)formatter)) { + return; + } + + throw new ConfigException(name, formatter, "Class must extend: " + PayloadFormatter.class); + } + + @Override + public String toString() { + return "Any class implementing: " + PayloadFormatter.class; + } + } + + static class SchemaVisibilityValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object visibility) { + if (SCHEMA_VISIBILITY_LIST.contains(visibility)) { + return; + } + + throw new ConfigException(name, visibility, "Must be one of " + SCHEMA_VISIBILITY_LIST); + } + } + + static class SchemaVisibilityRecommender implements ConfigDef.Recommender { + @Override + public List validValues(String name, Map connectorConfigs) { + return Arrays.asList(SCHEMA_VISIBILITY_LIST); + } + + @Override + public boolean visible(String name, Map connectorConfigs) { + return true; + } + } +} diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/SinkRecordSerializable.java b/src/main/java/com/nordstrom/kafka/connect/lambda/SinkRecordSerializable.java deleted file mode 100644 index ba58e9a..0000000 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/SinkRecordSerializable.java +++ /dev/null @@ -1,126 +0,0 @@ -package com.nordstrom.kafka.connect.lambda; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; -import com.nordstrom.kafka.connect.formatters.PayloadFormattingException; - -import org.apache.kafka.connect.sink.SinkRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SinkRecordSerializable { - - private static final Logger LOGGER = LoggerFactory.getLogger(SinkRecordSerializable.class); - - private final ObjectWriter jsonWriter = new ObjectMapper() - .writerFor(SinkRecordSerializable.class); - private String value; - private long offset; - private long timestamp; - private String timestampTypeName; - private int partition; - private String key; - private String keySchemaName; - private String valueSchemaName; - private String topic; - - public SinkRecordSerializable(final SinkRecord record) { - super(); - this.key = record.key() == null ? "" : record.key().toString(); - this.keySchemaName = record.keySchema().name(); - - this.value = record.value() == null ? "" : record.value().toString(); - this.valueSchemaName = record.valueSchema().name(); - - this.topic = record.topic(); - this.partition = record.kafkaPartition(); - this.offset = record.kafkaOffset(); - - this.timestamp = record.timestamp(); - this.timestampTypeName = record.timestampType().name; - - } - - public String getValue() { - return this.value; - } - - public void setValue(final String value) { - this.value = value; - } - - public long getOffset() { - return this.offset; - } - - public void setOffset(final long offset) { - this.offset = offset; - } - - public Long getTimestamp() { - return this.timestamp; - } - - public void setTimestamp(final long timestamp) { - this.timestamp = timestamp; - } - - public String getTimestampTypeName() { - return this.timestampTypeName; - } - - public void setTimestampTypeName(final String timestampTypeName) { - this.timestampTypeName = timestampTypeName; - } - - public int getPartition() { - return this.partition; - } - - public void setPartition(final int partition) { - this.partition = partition; - } - - public String getKey() { - return this.key; - } - - public void setKey(final String key) { - this.key = key; - } - - public String getKeySchemaName() { - return this.keySchemaName; - } - - public void setKeySchemaName(final String keySchemaName) { - this.keySchemaName = keySchemaName; - } - - public String getValueSchemaName() { - return this.valueSchemaName; - } - - public void setValueSchemaName(final String valueSchemaName) { - this.valueSchemaName = valueSchemaName; - } - - public String getTopic() { - return this.topic; - } - - public void setTopic(final String topic) { - this.topic = topic; - } - - public String toJsonString() { - try { - return this.jsonWriter.writeValueAsString(this); - } catch (final JsonProcessingException e) { - LOGGER.error(e.getLocalizedMessage(), e); - throw new PayloadFormattingException(e); - } - - } -} diff --git a/src/main/java/com/nordstrom/kafka/connect/utils/JsonUtil.java b/src/main/java/com/nordstrom/kafka/connect/utils/JsonUtil.java deleted file mode 100644 index 2eefa03..0000000 --- a/src/main/java/com/nordstrom/kafka/connect/utils/JsonUtil.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.nordstrom.kafka.connect.utils; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; - -import java.util.List; - -public class JsonUtil { - - static final ObjectWriter stringListWriter = new ObjectMapper() - .writerFor(new TypeReference>() { - }); - - public static String jsonify(final List stringRecords) { - try { - return stringListWriter.writeValueAsString(stringRecords); - } catch (final JsonProcessingException e) { - e.printStackTrace(); - throw new RuntimeException(); - } - } -} diff --git a/src/test/java/com/nordstrom/kafka/connect/formatters/JsonPayloadFormatterTest.java b/src/test/java/com/nordstrom/kafka/connect/formatters/JsonPayloadFormatterTest.java index 2d68965..8652708 100644 --- a/src/test/java/com/nordstrom/kafka/connect/formatters/JsonPayloadFormatterTest.java +++ b/src/test/java/com/nordstrom/kafka/connect/formatters/JsonPayloadFormatterTest.java @@ -47,8 +47,8 @@ public class JsonPayloadFormatterTest { private static String TEST_VALUE_JSON = "{\"value_name\" : \"test-value-json\"}"; private static String TEST_VALUE_LIST = "[" + TEST_VALUE + "]"; private static String TEST_VALUE_MAP = "{" + TEST_VALUE_KEY + "=" + TEST_VALUE + "}"; - private static String KEY_SCHEMA_VISIBILITY_CONFIG = "formatter.key.schema.visibility"; - private static String VALUE_SCHEMA_VISIBILITY_CONFIG = "formatter.value.schema.visibility"; + private static String KEY_SCHEMA_VISIBILITY_CONFIG = "key.schema.visibility"; + private static String VALUE_SCHEMA_VISIBILITY_CONFIG = "value.schema.visibility"; private static class TestKey { public String key_name; diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtilTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtilTest.java deleted file mode 100644 index 6dfdf26..0000000 --- a/src/test/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtilTest.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.nordstrom.kafka.connect.lambda; - -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.services.lambda.model.InvocationType; -import com.amazonaws.services.lambda.model.RequestTooLargeException; -import org.junit.Test; - -import java.time.Instant; - -import static org.junit.Assert.*; - -public class AwsLambdaUtilTest { - - @Test(expected = RequestTooLargeException.class) - public void testCheckPayloadSizeForInvocationTypeWithInvocationFailureModeStopThrowsException() { - AwsLambdaUtil util = new AwsLambdaUtil( - new ClientConfiguration(), - new DefaultAWSCredentialsProviderChain(), - InvocationFailure.STOP); - - util.checkPayloadSizeForInvocationType( - "testpayload".getBytes(), - InvocationType.RequestResponse, - Instant.now(), - new RequestTooLargeException("Request payload is too large!")); - } - - @Test - public void testCheckPayloadSizeForInvocationTypeWithInvocationFailureModeDropContinues() { - AwsLambdaUtil.InvocationResponse testResp = null; - RequestTooLargeException ex = null; - - AwsLambdaUtil util = new AwsLambdaUtil( - new ClientConfiguration(), - new DefaultAWSCredentialsProviderChain(), - InvocationFailure.DROP); - - try { - testResp = util.checkPayloadSizeForInvocationType( - "testpayload".getBytes(), - InvocationType.RequestResponse, - Instant.now(), - new RequestTooLargeException("Request payload is too large!")); - } catch (RequestTooLargeException e) { - ex = e; - } - - assertNull(ex); - assertNotNull(testResp); - assertEquals(413, testResp.getStatusCode().intValue()); - } -} diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/InvocationClientConfigTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/InvocationClientConfigTest.java new file mode 100644 index 0000000..263ff74 --- /dev/null +++ b/src/test/java/com/nordstrom/kafka/connect/lambda/InvocationClientConfigTest.java @@ -0,0 +1,79 @@ +package com.nordstrom.kafka.connect.lambda; + +import org.apache.kafka.common.config.ConfigException; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider; + +import java.time.Duration; +import java.util.HashMap; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class InvocationClientConfigTest { + @Test + public void minimalConfig() { + InvocationClient.Builder builder = new InvocationClient.Builder(); + new InvocationClientConfig(builder, + new HashMap() { + { + put("aws.lambda.function.arn", "test-function"); + } + }); + + assertEquals("test-function", builder.getFunctionArn()); + assertNull(builder.getRegion()); + assertEquals(InvocationMode.SYNC, builder.getInvocationMode()); + assertEquals(InvocationFailure.STOP, builder.getFailureMode()); + assertEquals(Duration.ofMinutes(5), builder.getInvocationTimeout()); + assertNotNull(builder.getClientConfiguration()); + assertEquals(DefaultAWSCredentialsProviderChain.class, builder.getCredentialsProvider().getClass()); + } + + @Test + public void sampleConfig() { + InvocationClient.Builder builder = new InvocationClient.Builder(); + new InvocationClientConfig(builder, + new HashMap() { + { + put("aws.region", "us-test-region"); + put("aws.lambda.function.arn", "test-function"); + put("aws.lambda.invocation.timeout.ms", "123"); + put("aws.lambda.invocation.mode", "SYNC"); + put("aws.lambda.invocation.failure.mode", "DROP"); + put("aws.lambda.batch.enabled", "true"); + put("aws.credentials.provider.class", AWSAssumeRoleCredentialsProvider.class.getName()); + put("aws.credentials.provider.role.arn", "test-role"); + put("aws.credentials.provider.session.name", "test-session-name"); + put("aws.credentials.provider.external.id", "test-external-id"); + } + }); + + assertEquals("test-function", builder.getFunctionArn()); + assertEquals("us-test-region", builder.getRegion()); + assertEquals(123, builder.getInvocationTimeout().toMillis()); + assertEquals(InvocationMode.SYNC, builder.getInvocationMode()); + assertEquals(InvocationFailure.DROP, builder.getFailureMode()); + assertNotNull(builder.getClientConfiguration()); + + assertEquals(AWSAssumeRoleCredentialsProvider.class, builder.getCredentialsProvider().getClass()); + AWSAssumeRoleCredentialsProvider credentialsProvider = (AWSAssumeRoleCredentialsProvider)builder.getCredentialsProvider(); + assertEquals("test-role", credentialsProvider.getRoleArn()); + assertEquals("test-session-name", credentialsProvider.getSessionName()); + assertEquals("test-external-id", credentialsProvider.getExternalId()); + } + + @Test(expected = ConfigException.class) + public void testInvocationModeValidatorThrowsException() { + new InvocationClientConfig.InvocationModeValidator() + .ensureValid("invocation.mode", "foo"); + } + + @Test(expected = ConfigException.class) + public void testInvocationFailureValidatorThrowsException() { + new InvocationClientConfig.InvocationFailureValidator() + .ensureValid("invocation.failure.mode", "foo"); + } +} + diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/InvocationClientTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/InvocationClientTest.java new file mode 100644 index 0000000..eab020e --- /dev/null +++ b/src/test/java/com/nordstrom/kafka/connect/lambda/InvocationClientTest.java @@ -0,0 +1,100 @@ +package com.nordstrom.kafka.connect.lambda; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.lambda.model.InvocationType; +import com.amazonaws.services.lambda.model.RequestTooLargeException; + +import java.time.Duration; +import java.time.Instant; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class InvocationClientTest { + @Test + public void testBuilderDefaults() { + InvocationClient.Builder builder = new InvocationClient.Builder(); + assertNull(builder.getFunctionArn()); + assertNull(builder.getRegion()); + assertEquals(InvocationMode.SYNC, builder.getInvocationMode()); + assertEquals(InvocationFailure.STOP, builder.getFailureMode()); + assertEquals(Duration.ofMinutes(5), builder.getInvocationTimeout()); + assertNull(builder.getClientConfiguration()); + assertNull(builder.getCredentialsProvider()); + } + + @Test + public void testBuilderReflexiveProperties() { + ClientConfiguration clientConfiguration = new ClientConfiguration(); + AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain(); + + InvocationClient.Builder builder = new InvocationClient.Builder() + .setFunctionArn("test-function-arn") + .setRegion("us-test-region") + .setInvocationMode(InvocationMode.ASYNC) + .setFailureMode(InvocationFailure.DROP) + .setInvocationTimeout(Duration.ofSeconds(123)) + .withClientConfiguration(clientConfiguration) + .withCredentialsProvider(credentialsProvider); + + assertEquals("test-function-arn", builder.getFunctionArn()); + assertEquals("us-test-region", builder.getRegion()); + assertEquals(InvocationMode.ASYNC, builder.getInvocationMode()); + assertEquals(InvocationFailure.DROP, builder.getFailureMode()); + assertEquals(Duration.ofSeconds(123), builder.getInvocationTimeout()); + assertSame(clientConfiguration, builder.getClientConfiguration()); + assertSame(credentialsProvider, builder.getCredentialsProvider()); + } + + @Test(expected = IllegalStateException.class) + public void ensureFunctionArnIsRequired() { + InvocationClient.Builder builder = new InvocationClient.Builder() + //.setFunctionArn("no-function-arn") + .setRegion("us-test-region"); + + builder.build(); + } + + @Test(expected = RequestTooLargeException.class) + public void testCheckPayloadSizeForInvocationTypeWithInvocationFailureModeStopThrowsException() { + InvocationClient client = newClientWithFailureMode(InvocationFailure.STOP); + + client.checkPayloadSizeForInvocationType( + "testpayload".getBytes(), + InvocationType.RequestResponse, + Instant.now(), + new RequestTooLargeException("Request payload is too large!")); + } + + @Test + public void testCheckPayloadSizeForInvocationTypeWithInvocationFailureModeDropContinues() { + InvocationResponse testResp = null; + RequestTooLargeException ex = null; + + InvocationClient client = newClientWithFailureMode(InvocationFailure.DROP); + + try { + testResp = client.checkPayloadSizeForInvocationType( + "testpayload".getBytes(), + InvocationType.RequestResponse, + Instant.now(), + new RequestTooLargeException("Request payload is too large!")); + } catch (RequestTooLargeException e) { + ex = e; + } + + assertNull(ex); + assertNotNull(testResp); + assertEquals(413, testResp.getStatusCode().intValue()); + } + + InvocationClient newClientWithFailureMode(InvocationFailure failureMode) { + return new InvocationClient.Builder() + .setFunctionArn("test-function") + .setRegion("test-region-1") + .setFailureMode(failureMode) + .build(); + } +} diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfigTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfigTest.java index 35bc3fc..1d953ef 100644 --- a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfigTest.java +++ b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfigTest.java @@ -1,47 +1,31 @@ package com.nordstrom.kafka.connect.lambda; -import com.nordstrom.kafka.connect.formatters.JsonPayloadFormatter; import org.apache.kafka.common.config.ConfigException; -import org.junit.Test; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.nordstrom.kafka.connect.formatters.PlainPayloadFormatter; +import java.util.HashMap; +import org.junit.Test; import static org.junit.Assert.*; -import java.util.HashMap; - public class LambdaSinkConnectorConfigTest { @Test public void minimalConfig() { LambdaSinkConnectorConfig config = new LambdaSinkConnectorConfig( new HashMap() { { - put("aws.lambda.function.arn", "my-function"); + put("aws.lambda.function.arn", "test-function"); } }); assertTrue("Expected auto-generated connector name", config.getConnectorName().contains("LambdaSinkConnector-Unnamed")); - assertEquals("my-function", config.getAwsFunctionArn()); - - assertNotNull(config.getAwsRegion()); - assertNotNull(config.getAwsClientConfiguration()); - assertNotNull(config.getInvocationTimeout()); - assertNotNull(config.getFailureMode()); - assertNotNull(config.getInvocationMode()); - assertNotNull(config.getRetriableErrorCodes()); - assertNotNull(config.getIamRoleArn()); - assertNotNull(config.getIamExternalId()); - assertNotNull(config.getIamSessionName()); - - // Check defaults - assertEquals(DefaultAWSCredentialsProviderChain.class, config.getAwsCredentialsProvider().getClass()); - assertEquals(PlainPayloadFormatter.class, config.getPayloadFormatter().getClass()); + assertNotNull(config.getInvocationClientConfig()); + assertNotNull(config.getPayloadFormatterConfig()); assertTrue(config.isBatchingEnabled()); - assertEquals(config.getPayloadFormatterKeySchemaVisibility(), LambdaSinkConnectorConfig.PAYLOAD_FORMATTER_SCHEMA_VISIBILITY_DEFAULT); - assertEquals(config.getPayloadFormatterValueSchemaVisibility(), LambdaSinkConnectorConfig.PAYLOAD_FORMATTER_SCHEMA_VISIBILITY_DEFAULT); + assertNotNull(config.getRetriableErrorCodes()); + assertEquals(500L, config.getRetryBackoffTimeMillis()); + assertEquals(5, config.getRetries()); } @Test @@ -50,7 +34,7 @@ public void sampleConfig() { new HashMap() { { put("name", "test-connector"); - put("aws.region", "test-region"); + put("aws.region", "us-test-region"); put("aws.lambda.function.arn", "test-function"); put("aws.lambda.invocation.timeout.ms", "123"); put("aws.lambda.invocation.mode", "SYNC"); @@ -58,81 +42,16 @@ public void sampleConfig() { put("aws.lambda.batch.enabled", "true"); put("retriable.error.codes", "1,2,3"); put("retry.backoff.millis", "123"); - put("retries.max", "123"); + put("retries.max", "456"); } }); assertEquals("test-connector", config.getConnectorName()); - assertEquals("test-region", config.getAwsRegion()); - assertEquals("PT0.123S", config.getInvocationTimeout().toString()); - assertEquals(InvocationMode.SYNC, config.getInvocationMode()); - assertEquals(InvocationFailure.DROP, config.getFailureMode()); + assertNotNull(config.getInvocationClientConfig()); + assertNotNull(config.getPayloadFormatterConfig()); assertTrue(config.isBatchingEnabled()); assertEquals(3, config.getRetriableErrorCodes().size()); assertEquals(123, config.getRetryBackoffTimeMillis()); - assertEquals(123, config.getRetries()); - } - - @Test - public void jsonPayloadFormatterConfig() { - LambdaSinkConnectorConfig config = new LambdaSinkConnectorConfig( - new HashMap() { - { - put("name", "test-connector"); - put("aws.lambda.function.arn", "test-function"); - put("payload.formatter.class", JsonPayloadFormatter.class.getCanonicalName()); - } - }); - - assertEquals("test-connector", config.getConnectorName()); - assertEquals(JsonPayloadFormatter.class, config.getPayloadFormatter().getClass()); - assertEquals(config.getPayloadFormatterKeySchemaVisibility(), LambdaSinkConnectorConfig.PAYLOAD_FORMATTER_SCHEMA_VISIBILITY_DEFAULT); - assertEquals(config.getPayloadFormatterValueSchemaVisibility(), LambdaSinkConnectorConfig.PAYLOAD_FORMATTER_SCHEMA_VISIBILITY_DEFAULT); + assertEquals(456, config.getRetries()); } - - @Test - public void jsonPayloadFormatterKeySchemaVisibilityConfig() { - LambdaSinkConnectorConfig config = new LambdaSinkConnectorConfig( - new HashMap() { - { - put("name", "test-connector"); - put("aws.lambda.function.arn", "test-function"); - put("payload.formatter.class", JsonPayloadFormatter.class.getCanonicalName()); - put("payload.formatter.key.schema.visibility", "none"); - } - }); - - assertEquals(config.getPayloadFormatterKeySchemaVisibility(), "none"); - assertEquals(config.getPayloadFormatterValueSchemaVisibility(), LambdaSinkConnectorConfig.PAYLOAD_FORMATTER_SCHEMA_VISIBILITY_DEFAULT); - } - - @Test - public void jsonPayloadFormatterValueSchemaVisibilityConfig() { - LambdaSinkConnectorConfig config = new LambdaSinkConnectorConfig( - new HashMap() { - { - put("name", "test-connector"); - put("aws.lambda.function.arn", "test-function"); - put("payload.formatter.class", JsonPayloadFormatter.class.getCanonicalName()); - put("payload.formatter.value.schema.visibility", "none"); - } - }); - - assertEquals(config.getPayloadFormatterKeySchemaVisibility(), LambdaSinkConnectorConfig.PAYLOAD_FORMATTER_SCHEMA_VISIBILITY_DEFAULT); - assertEquals(config.getPayloadFormatterValueSchemaVisibility(), "none"); - } - - @Test(expected = ConfigException.class) - public void jsonPayloadFormatterSchemaVisibilityConfigValidatorThrowsException() { - LambdaSinkConnectorConfig config = new LambdaSinkConnectorConfig( - new HashMap() { - { - put("name", "test-connector"); - put("aws.lambda.function.arn", "test-function"); - put("payload.formatter.class", JsonPayloadFormatter.class.getCanonicalName()); - put("payload.formatter.key.schema.visibility", "x-none"); - } - }); - } - } diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java index df58120..ceee909 100644 --- a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java +++ b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java @@ -45,16 +45,10 @@ public void testPutWhenBatchingIsNotEnabled() { task.start(props); - assertFalse(task.configuration.isBatchingEnabled()); + InvocationClient mockedClient = mock(InvocationClient.class); - AwsLambdaUtil mockedLambdaClient = mock(AwsLambdaUtil.class); - - when(mockedLambdaClient.invoke(anyString(), anyObject(), anyObject(), eq(InvocationType.RequestResponse))) - .thenReturn(new AwsLambdaUtil( - new ClientConfiguration(), - new DefaultAWSCredentialsProviderChain(), - InvocationFailure.STOP) - .new InvocationResponse(200, "test log", "", Instant.now(), Instant.now())); + when(mockedClient.invoke(any())) + .thenReturn(new InvocationResponse(200, "test log", "", Instant.now(), Instant.now())); Schema testSchema = SchemaBuilder.struct().name("com.nordstrom.kafka.connect.lambda.foo").field("bar", STRING_SCHEMA).build(); diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/PayloadFormatterConfigTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/PayloadFormatterConfigTest.java new file mode 100644 index 0000000..8409c38 --- /dev/null +++ b/src/test/java/com/nordstrom/kafka/connect/lambda/PayloadFormatterConfigTest.java @@ -0,0 +1,79 @@ +package com.nordstrom.kafka.connect.lambda; + +import com.nordstrom.kafka.connect.formatters.*; + +import org.apache.kafka.common.config.ConfigException; + +import java.util.HashMap; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class PayloadFormatterConfigTest { + @Test + public void defaultPayloadFormatterConfig() { + PayloadFormatterConfig config = new PayloadFormatterConfig( + new HashMap() { + { + } + }); + + assertEquals(PlainPayloadFormatter.class, config.getPayloadFormatter().getClass()); + } + + @Test + public void jsonPayloadFormatterConfig() { + PayloadFormatterConfig config = new PayloadFormatterConfig( + new HashMap() { + { + put("payload.formatter.class", JsonPayloadFormatter.class.getCanonicalName()); + } + }); + + PayloadFormatter formatter = config.getPayloadFormatter(); + assertEquals(JsonPayloadFormatter.class, formatter.getClass()); + assertEquals(SchemaVisibility.MIN, ((JsonPayloadFormatter)formatter).getKeySchemaVisibility()); + assertEquals(SchemaVisibility.MIN, ((JsonPayloadFormatter)formatter).getValueSchemaVisibility()); + } + + @Test + public void jsonPayloadFormatterKeySchemaVisibilityConfig() { + PayloadFormatterConfig config = new PayloadFormatterConfig( + new HashMap() { + { + put("payload.formatter.class", JsonPayloadFormatter.class.getCanonicalName()); + put("payload.formatter.key.schema.visibility", "none"); + } + }); + + PayloadFormatter formatter = config.getPayloadFormatter(); + assertEquals(SchemaVisibility.NONE, ((JsonPayloadFormatter)formatter).getKeySchemaVisibility()); + assertEquals(SchemaVisibility.MIN, ((JsonPayloadFormatter)formatter).getValueSchemaVisibility()); + } + + @Test + public void jsonPayloadFormatterValueSchemaVisibilityConfig() { + PayloadFormatterConfig config = new PayloadFormatterConfig( + new HashMap() { + { + put("payload.formatter.class", JsonPayloadFormatter.class.getCanonicalName()); + put("payload.formatter.value.schema.visibility", "none"); + } + }); + + PayloadFormatter formatter = config.getPayloadFormatter(); + assertEquals(SchemaVisibility.MIN, ((JsonPayloadFormatter)formatter).getKeySchemaVisibility()); + assertEquals(SchemaVisibility.NONE, ((JsonPayloadFormatter)formatter).getValueSchemaVisibility()); + } + + @Test(expected = ConfigException.class) + public void jsonPayloadFormatterSchemaVisibilityConfigValidatorThrowsException() { + PayloadFormatterConfig config = new PayloadFormatterConfig( + new HashMap() { + { + put("payload.formatter.class", JsonPayloadFormatter.class.getCanonicalName()); + put("payload.formatter.key.schema.visibility", "x-none"); + } + }); + } +}