Skip to content

Commit

Permalink
Merge pull request #2 from Nordstrom/feature/297-lambda-connector-batching-improvements
Browse files Browse the repository at this point in the history

When batching, optimize the number of items to send against the maxim…
  • Loading branch information
seananthonywilliams authored Apr 25, 2019
2 parents 2ef25c5 + 6106254 commit d939427
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.amazonaws.services.lambda.model.InvocationType;
import com.amazonaws.services.lambda.model.InvokeRequest;
import com.amazonaws.services.lambda.model.InvokeResult;
import com.amazonaws.services.lambda.model.RequestTooLargeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -22,7 +23,14 @@ public class AwsLambdaUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(AwsLambdaUtil.class);

private static final int MEGABYTE_SIZE = 1024 * 1024;
private static final int KILOBYTE_SIZE = 1024;

private static final int maxSyncPayloadSizeBytes = (6 * MEGABYTE_SIZE);
private static final int maxAsyncPayloadSizeBytes = (256 * KILOBYTE_SIZE);

private final AWSLambdaAsync lambdaClient;
private final InvocationFailure failureMode;

public AwsLambdaUtil(final Configuration config) {
Guard.verifyNotNull(config, "config");
Expand Down Expand Up @@ -54,6 +62,8 @@ public AwsLambdaUtil(final Configuration config) {
LOGGER.info("Using aws region: {}", config.getAwsRegion().toString());
}

failureMode = config.getFailureMode().orElse(InvocationFailure.STOP);

this.lambdaClient = builder.build();
LOGGER.info("AWS Lambda client initialized");
}
Expand All @@ -78,7 +88,6 @@ InvocationResponse invoke(
final Duration timeout,
final InvocationType event
) {

final InvokeRequest request = new InvokeRequest()
.withInvocationType(event)
.withFunctionName(functionName)
Expand All @@ -91,6 +100,8 @@ InvocationResponse invoke(
final InvokeResult result = futureResult.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
return new InvocationResponse(result.getStatusCode(), result.getLogResult(),
result.getFunctionError(), start, Instant.now());
} catch (RequestTooLargeException e) {
return checkPayloadSizeForInvocationType(payload, event, start, e);
} catch (final InterruptedException | ExecutionException e) {
LOGGER.error(e.getLocalizedMessage(), e);
throw new LambdaInvocationException(e);
Expand All @@ -100,6 +111,42 @@ InvocationResponse invoke(
}
}

/**
 * Handles a {@link RequestTooLargeException} raised while invoking a Lambda function.
 *
 * <p>Logs an error when the payload is larger than the size limit tracked for the given
 * invocation type, then either rethrows the exception or reports the payload as dropped,
 * depending on the configured failure mode.
 *
 * @param payload bytes of the payload that was sent to the AWS Lambda service
 * @param event invocation type of the call: asynchronous ({@code Event}), synchronous
 *     ({@code RequestResponse}), or any other type, which is logged as a dry run
 * @param start instant at which the Lambda invocation was started
 * @param e exception signalling that the request exceeded the maximum allowed size
 * @return a response with status code 413 whose log result and function error both carry
 *     the exception's localized message; returned only when the failure mode is not STOP
 * @throws RequestTooLargeException the given {@code e}, rethrown when the failure mode is STOP
 */
InvocationResponse checkPayloadSizeForInvocationType(final byte[] payload, final InvocationType event, final Instant start, final RequestTooLargeException e) {
    final int payloadSize = payload.length;

    switch (event) {
        case Event:
            // Asynchronous invocations have the smaller limit.
            if (payloadSize > maxAsyncPayloadSizeBytes) {
                LOGGER.error("{} bytes payload exceeded {} bytes invocation limit for asynchronous Lambda call", payloadSize, maxAsyncPayloadSizeBytes);
            }
            break;

        case RequestResponse:
            if (payloadSize > maxSyncPayloadSizeBytes) {
                LOGGER.error("{} bytes payload exceeded {} bytes invocation limit for synchronous Lambda call", payloadSize, maxSyncPayloadSizeBytes);
            }
            break;

        default:
            LOGGER.info("Dry run call to Lambda with payload size {}", payloadSize);
            break;
    }

    if (!failureMode.equals(InvocationFailure.STOP)) {
        // Drop message and continue: report the failure as an HTTP 413 style response.
        final String message = e.getLocalizedMessage();
        return new InvocationResponse(413, message, message, start, Instant.now());
    }

    // STOP mode: propagate the original exception to the caller.
    throw e;
}

private class LambdaInvocationException extends RuntimeException {
public LambdaInvocationException(final Throwable e) {
super(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@

import java.util.Optional;

import static com.nordstrom.kafka.connect.lambda.InvocationFailure.DROP;

public class Configuration {

private static final int MAX_HTTP_PORT_NUMBER = 65536;
private final Optional<String> credentialsProfile;
private final Optional<String> httpProxyHost;
private final Optional<Integer> httpProxyPort;
private final Optional<String> awsRegion;
private final Optional<InvocationFailure> failureMode;

public Configuration(final String credentialsProfile, final String httpProxyHost,
final Integer httpProxyPort, final String awsRegion) {
final Integer httpProxyPort, final String awsRegion,
final InvocationFailure failureMode) {
/*
* String awsCredentialsProfile =
* System.getenv(CREDENTIALS_PROFILE_CONFIG_ENV); String awsProxyHost =
Expand All @@ -28,21 +32,24 @@ public Configuration(final String credentialsProfile, final String httpProxyHost
? Optional.of(httpProxyPort) : Optional.empty();
this.awsRegion =
Facility.isNotNullNorEmpty(awsRegion) ? Optional.of(awsRegion) : Optional.empty();
this.failureMode = Facility.isNotNull(failureMode) ? Optional.of(failureMode): Optional.of(DROP);
}

public Configuration(final Optional<String> awsCredentialsProfile,
final Optional<String> httpProxyHost,
final Optional<Integer> httpProxyPort,
final Optional<String> awsRegion) {
final Optional<String> awsRegion,
final Optional<InvocationFailure> failureMode) {

this.credentialsProfile = awsCredentialsProfile;
this.httpProxyHost = httpProxyHost;
this.httpProxyPort = httpProxyPort;
this.awsRegion = awsRegion;
this.failureMode = failureMode;
}

public static Configuration empty() {
return new Configuration(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
return new Configuration(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
}

public Optional<String> getCredentialsProfile() {
Expand All @@ -59,4 +66,6 @@ public Optional<Integer> getHttpProxyPort() {

/**
 * Returns the AWS region held by this configuration, if any.
 *
 * @return the {@link Optional} stored for the AWS region
 */
public Optional<String> getAwsRegion() {
    return awsRegion;
}

/**
 * Returns the invocation failure mode held by this configuration, if any.
 *
 * @return the {@link Optional} stored for the failure mode
 */
public Optional<InvocationFailure> getFailureMode() {
    return failureMode;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.nordstrom.kafka.connect.lambda;

/**
 * Selects how an invocation failure is handled: stop, or drop the message and continue.
 */
public enum InvocationFailure {

    /** Stop when an invocation fails. */
    STOP,

    /** Drop the failed message and continue. */
    DROP
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class LambdaSinkConnectorConfig extends AbstractConfig {
private static final int HTTP_PROXY_PORT_DEFAULT = -1;
private static final boolean AWS_LAMBDA_BATCH_ENABLED_DEFAULT = true;
private static final String AWS_LAMBDA_INVOCATION_MODE_DEFAULT = InvocationMode.SYNC.name();
private static final String AWS_LAMBDA_INVOCATION_FAILURE_MODE_DEFAULT = InvocationFailure.STOP.name();
private static final String RETRIABLE_ERROR_CODES_DEFAULT = "500,503,504";
private static final int RETRY_BACKOFF_MILLIS_DEFAULT = 500;
private static final int RETRIES_DEFAULT = 5;
Expand All @@ -50,6 +51,7 @@ public class LambdaSinkConnectorConfig extends AbstractConfig {
private final boolean isWithJsonWrapper = true;
private final int maxBatchSizeBytes = (6 * MEGABYTE_SIZE) - 1;
private final String awsRegion;
private final InvocationFailure failureMode;

public LambdaSinkConnectorConfig(final Map<String, String> properties) {
this(configDefinition, properties);
Expand Down Expand Up @@ -102,6 +104,10 @@ public LambdaSinkConnectorConfig(final Map<String, String> properties) {

this.awsRegion = this.getString(ConfigurationKeys.AWS_REGION.getValue());

this.failureMode = InvocationFailure.valueOf(
this.getString(ConfigurationKeys.AWS_LAMBDA_INVOCATION_FAILURE_MODE.getValue())
);

}

public Map<String, String> getProperties() {
Expand Down Expand Up @@ -140,6 +146,10 @@ public InvocationMode getInvocationMode() {
return this.invocationMode;
}

/**
 * Returns the invocation failure mode parsed from the connector properties.
 *
 * @return the configured {@link InvocationFailure}
 */
public InvocationFailure getFailureMode() {
    return failureMode;
}

/**
 * Returns whether batching is enabled in this connector configuration.
 *
 * @return the value of the batching-enabled flag
 */
public boolean isBatchingEnabled() {
    return isBatchingEnabled;
}
Expand Down Expand Up @@ -181,6 +191,10 @@ public static ConfigDef config() {
AWS_LAMBDA_INVOCATION_MODE_DEFAULT, Importance.MEDIUM,
ConfigurationKeys.AWS_LAMBDA_INVOCATION_MODE.getDocumentation())

.define(ConfigurationKeys.AWS_LAMBDA_INVOCATION_FAILURE_MODE.getValue(), Type.STRING,
AWS_LAMBDA_INVOCATION_FAILURE_MODE_DEFAULT, Importance.MEDIUM,
ConfigurationKeys.AWS_LAMBDA_INVOCATION_FAILURE_MODE.getDocumentation())

.define(ConfigurationKeys.AWS_LAMBDA_BATCH_ENABLED.getValue(), Type.BOOLEAN,
AWS_LAMBDA_BATCH_ENABLED_DEFAULT, Importance.MEDIUM,
ConfigurationKeys.AWS_LAMBDA_BATCH_ENABLED.getDocumentation())
Expand Down Expand Up @@ -221,6 +235,10 @@ public enum ConfigurationKeys {
"Determines whether the lambda would be called asynchronously (Event) or Synchronously (Request-Response), possible values are: "
+ Stream.of(InvocationMode.values()).map(InvocationMode::toString)
.collect(Collectors.joining(","))),
AWS_LAMBDA_INVOCATION_FAILURE_MODE("aws.lambda.invocation.failure.mode", // TODO Maybe generalize for other types of failures
"Determines whether the lambda should stop or drop and continue on failure (specifically, payload limit exceeded), possible values are: "
+ Stream.of(InvocationFailure.values()).map(InvocationFailure::toString)
.collect(Collectors.joining(","))),

AWS_LAMBDA_BATCH_ENABLED("aws.lambda.batch.enabled",
"Boolean that determines if the messages will be batched together before sending them to aws lambda. By default is "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public void start(Map<String, String> props) {
this.configuration.getAwsCredentialsProfile(),
this.configuration.getHttpProxyHost(),
this.configuration.getHttpProxyPort(),
this.configuration.getAwsRegion())
this.configuration.getAwsRegion(),
this.configuration.getFailureMode())
);
LOGGER.info("Context for connector {} task {}, Assignments[{}], ",
this.configuration.getConnectorName(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.nordstrom.kafka.connect.lambda;

import com.amazonaws.services.lambda.model.InvocationType;
import com.amazonaws.services.lambda.model.RequestTooLargeException;
import org.junit.Test;

import java.time.Instant;

import static org.junit.Assert.*;

/**
 * Unit tests for {@code AwsLambdaUtil.checkPayloadSizeForInvocationType}, covering both
 * {@link InvocationFailure} modes.
 */
public class AwsLambdaUtilTest {

    private static final String TOO_LARGE_MESSAGE = "Request payload is too large!";

    // Explicit charset so the payload bytes never depend on the platform default encoding.
    private static final byte[] TEST_PAYLOAD =
            "testpayload".getBytes(java.nio.charset.StandardCharsets.UTF_8);

    /** Builds a utility whose configuration differs only in the given failure mode. */
    private static AwsLambdaUtil newUtil(final InvocationFailure failureMode) {
        final Configuration testConfiguration =
                new Configuration("test-profile", "testhost", 123, "test-region", failureMode);
        return new AwsLambdaUtil(testConfiguration);
    }

    /** In STOP mode the supplied exception must be rethrown to the caller. */
    @Test(expected = RequestTooLargeException.class)
    public void testCheckPayloadSizeForInvocationTypeWithInvocationFailureModeStopThrowsException() {
        newUtil(InvocationFailure.STOP).checkPayloadSizeForInvocationType(
                TEST_PAYLOAD,
                InvocationType.RequestResponse,
                Instant.now(),
                new RequestTooLargeException(TOO_LARGE_MESSAGE));
    }

    /** In DROP mode no exception escapes and a 413 response is returned instead. */
    @Test
    public void testCheckPayloadSizeForInvocationTypeWithInvocationFailureModeDropContinues() {
        // Any exception thrown here fails the test on its own, so no try/catch is needed.
        final AwsLambdaUtil.InvocationResponse testResp =
                newUtil(InvocationFailure.DROP).checkPayloadSizeForInvocationType(
                        TEST_PAYLOAD,
                        InvocationType.RequestResponse,
                        Instant.now(),
                        new RequestTooLargeException(TOO_LARGE_MESSAGE));

        assertNotNull(testResp);
        assertEquals(413, testResp.getStatusCode().intValue());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public void testPutWhenBatchingIsNotEnabled() {
task.configuration.getAwsCredentialsProfile(),
task.configuration.getHttpProxyHost(),
task.configuration.getHttpProxyPort(),
task.configuration.getAwsRegion())).new InvocationResponse(200, "test log", "", Instant.now(), Instant.now()));
task.configuration.getAwsRegion(),
task.configuration.getFailureMode())).new InvocationResponse(200, "test log", "", Instant.now(), Instant.now()));

Schema testSchema = SchemaBuilder.struct().name("com.nordstrom.kafka.connect.lambda.foo").field("bar", STRING_SCHEMA).build();

Expand Down

0 comments on commit d939427

Please sign in to comment.