Skip to content

Commit

Permalink
When batching, optimize the number of items to send against the maxim…
Browse files Browse the repository at this point in the history
…um allowed by AWS Lambda. Whether batching or not, handle cases where the payload size is beyond the maximum allowed by AWS Lambda: Meaningful log message when max exceeded; Failure behavior should be configurable: stop or drop and continue.
  • Loading branch information
seananthonywilliams committed Apr 16, 2019
1 parent 2ef25c5 commit 585d2c1
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.amazonaws.services.lambda.model.InvocationType;
import com.amazonaws.services.lambda.model.InvokeRequest;
import com.amazonaws.services.lambda.model.InvokeResult;
import com.amazonaws.services.lambda.model.RequestTooLargeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -22,7 +23,14 @@ public class AwsLambdaUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(AwsLambdaUtil.class);

private static final int MEGABYTE_SIZE = 1024 * 1024;
private static final int KILOBYTE_SIZE = 1024;

private final int maxSyncPayloadSizeBytes = (6 * MEGABYTE_SIZE);
private final int maxAsyncPayloadSizeBytes = (256 * KILOBYTE_SIZE);

private final AWSLambdaAsync lambdaClient;
private final InvocationFailure failureMode;

public AwsLambdaUtil(final Configuration config) {
Guard.verifyNotNull(config, "config");
Expand Down Expand Up @@ -54,6 +62,8 @@ public AwsLambdaUtil(final Configuration config) {
LOGGER.info("Using aws region: {}", config.getAwsRegion().toString());
}

failureMode = config.getFailureMode().isPresent() ? config.getFailureMode().get() : InvocationFailure.DROP;

this.lambdaClient = builder.build();
LOGGER.info("AWS Lambda client initialized");
}
Expand All @@ -78,7 +88,6 @@ InvocationResponse invoke(
final Duration timeout,
final InvocationType event
) {

final InvokeRequest request = new InvokeRequest()
.withInvocationType(event)
.withFunctionName(functionName)
Expand All @@ -91,6 +100,8 @@ InvocationResponse invoke(
final InvokeResult result = futureResult.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
return new InvocationResponse(result.getStatusCode(), result.getLogResult(),
result.getFunctionError(), start, Instant.now());
} catch (RequestTooLargeException e) {
return checkPayloadSizeForInvocationType(payload, event, start, e);
} catch (final InterruptedException | ExecutionException e) {
LOGGER.error(e.getLocalizedMessage(), e);
throw new LambdaInvocationException(e);
Expand All @@ -100,6 +111,33 @@ InvocationResponse invoke(
}
}

InvocationResponse checkPayloadSizeForInvocationType(final byte[] payload, final InvocationType event, final Instant start, final RequestTooLargeException e) {
switch (event) {

case Event:
if (payload.length > maxAsyncPayloadSizeBytes) {
LOGGER.error("{} bytes payload exceeded {} bytes invocation limit for asynchronous Lambda call", payload.length, maxAsyncPayloadSizeBytes);
}
break;

case RequestResponse:
if (payload.length > maxSyncPayloadSizeBytes) {
LOGGER.error("{} bytes payload exceeded {} bytes invocation limit for synchronous Lambda call", payload.length, maxSyncPayloadSizeBytes);
}
break;

default:
LOGGER.info("Dry run call to Lambda with payload size {}", payload.length);
break;
}

if (failureMode.equals(InvocationFailure.STOP)) {
throw e;
}
// Drop message and continue
return new InvocationResponse(400, e.getLocalizedMessage(), e.getLocalizedMessage(), start, Instant.now());
}

private class LambdaInvocationException extends RuntimeException {
public LambdaInvocationException(final Throwable e) {
super(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@

import java.util.Optional;

import static com.nordstrom.kafka.connect.lambda.InvocationFailure.DROP;

public class Configuration {

private static final int MAX_HTTP_PORT_NUMBER = 65536;
private final Optional<String> credentialsProfile;
private final Optional<String> httpProxyHost;
private final Optional<Integer> httpProxyPort;
private final Optional<String> awsRegion;
private final Optional<InvocationFailure> failureMode;

public Configuration(final String credentialsProfile, final String httpProxyHost,
final Integer httpProxyPort, final String awsRegion) {
final Integer httpProxyPort, final String awsRegion,
final InvocationFailure failureMode) {
/*
* String awsCredentialsProfile =
* System.getenv(CREDENTIALS_PROFILE_CONFIG_ENV); String awsProxyHost =
Expand All @@ -28,21 +32,24 @@ public Configuration(final String credentialsProfile, final String httpProxyHost
? Optional.of(httpProxyPort) : Optional.empty();
this.awsRegion =
Facility.isNotNullNorEmpty(awsRegion) ? Optional.of(awsRegion) : Optional.empty();
this.failureMode = Facility.isNotNull(failureMode) ? Optional.of(failureMode): Optional.of(DROP);
}

public Configuration(final Optional<String> awsCredentialsProfile,
final Optional<String> httpProxyHost,
final Optional<Integer> httpProxyPort,
final Optional<String> awsRegion) {
final Optional<String> awsRegion,
final Optional<InvocationFailure> failureMode) {

this.credentialsProfile = awsCredentialsProfile;
this.httpProxyHost = httpProxyHost;
this.httpProxyPort = httpProxyPort;
this.awsRegion = awsRegion;
this.failureMode = failureMode;
}

public static Configuration empty() {
return new Configuration(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
return new Configuration(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
}

public Optional<String> getCredentialsProfile() {
Expand All @@ -59,4 +66,6 @@ public Optional<Integer> getHttpProxyPort() {

public Optional<String> getAwsRegion() { return this.awsRegion; }

public Optional<InvocationFailure> getFailureMode() { return this.failureMode; }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.nordstrom.kafka.connect.lambda;

public enum InvocationFailure {
STOP, DROP
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class LambdaSinkConnectorConfig extends AbstractConfig {
private static final int HTTP_PROXY_PORT_DEFAULT = -1;
private static final boolean AWS_LAMBDA_BATCH_ENABLED_DEFAULT = true;
private static final String AWS_LAMBDA_INVOCATION_MODE_DEFAULT = InvocationMode.SYNC.name();
private static final String AWS_LAMBDA_INVOCATION_FAILURE_DEFAULT = InvocationFailure.DROP.name();
private static final String RETRIABLE_ERROR_CODES_DEFAULT = "500,503,504";
private static final int RETRY_BACKOFF_MILLIS_DEFAULT = 500;
private static final int RETRIES_DEFAULT = 5;
Expand All @@ -50,6 +51,7 @@ public class LambdaSinkConnectorConfig extends AbstractConfig {
private final boolean isWithJsonWrapper = true;
private final int maxBatchSizeBytes = (6 * MEGABYTE_SIZE) - 1;
private final String awsRegion;
private final InvocationFailure failureMode;

public LambdaSinkConnectorConfig(final Map<String, String> properties) {
this(configDefinition, properties);
Expand Down Expand Up @@ -102,6 +104,10 @@ public LambdaSinkConnectorConfig(final Map<String, String> properties) {

this.awsRegion = this.getString(ConfigurationKeys.AWS_REGION.getValue());

this.failureMode = InvocationFailure.valueOf(
this.getString(ConfigurationKeys.AWS_LAMBDA_INVOCATION_FAILURE.getValue())
);

}

public Map<String, String> getProperties() {
Expand Down Expand Up @@ -140,6 +146,10 @@ public InvocationMode getInvocationMode() {
return this.invocationMode;
}

public InvocationFailure getFailureMode() {
return this.failureMode;
}

public boolean isBatchingEnabled() {
return this.isBatchingEnabled;
}
Expand Down Expand Up @@ -181,6 +191,10 @@ public static ConfigDef config() {
AWS_LAMBDA_INVOCATION_MODE_DEFAULT, Importance.MEDIUM,
ConfigurationKeys.AWS_LAMBDA_INVOCATION_MODE.getDocumentation())

.define(ConfigurationKeys.AWS_LAMBDA_INVOCATION_FAILURE.getValue(), Type.STRING,
AWS_LAMBDA_INVOCATION_FAILURE_DEFAULT, Importance.MEDIUM,
ConfigurationKeys.AWS_LAMBDA_INVOCATION_FAILURE.getDocumentation())

.define(ConfigurationKeys.AWS_LAMBDA_BATCH_ENABLED.getValue(), Type.BOOLEAN,
AWS_LAMBDA_BATCH_ENABLED_DEFAULT, Importance.MEDIUM,
ConfigurationKeys.AWS_LAMBDA_BATCH_ENABLED.getDocumentation())
Expand Down Expand Up @@ -221,6 +235,10 @@ public enum ConfigurationKeys {
"Determines whether the lambda would be called asynchronously (Event) or Synchronously (Request-Response), possible values are: "
+ Stream.of(InvocationMode.values()).map(InvocationMode::toString)
.collect(Collectors.joining(","))),
AWS_LAMBDA_INVOCATION_FAILURE("aws.lambda.invocation.failure", //TODO Maybe generalize for all failures
"Determines whether the lambda should stop or drop and continue on failure (specifically, payload limit exceeded), possible values are: "
+ Stream.of(InvocationFailure.values()).map(InvocationFailure::toString)
.collect(Collectors.joining(","))),

AWS_LAMBDA_BATCH_ENABLED("aws.lambda.batch.enabled",
"Boolean that determines if the messages will be batched together before sending them to aws lambda. By default is "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public void start(Map<String, String> props) {
this.configuration.getAwsCredentialsProfile(),
this.configuration.getHttpProxyHost(),
this.configuration.getHttpProxyPort(),
this.configuration.getAwsRegion())
this.configuration.getAwsRegion(),
this.configuration.getFailureMode())
);
LOGGER.info("Context for connector {} task {}, Assignments[{}], ",
this.configuration.getConnectorName(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.nordstrom.kafka.connect.lambda;

import com.amazonaws.services.lambda.model.InvocationType;
import com.amazonaws.services.lambda.model.RequestTooLargeException;
import org.junit.Test;

import java.time.Instant;

import static org.junit.Assert.*;

public class AwsLambdaUtilTest {

@Test(expected = RequestTooLargeException.class)
public void testCheckPayloadSizeForInvocationTypeWithInvocationFailureModeStopThrowsException() {

final Configuration testConfiguration = new Configuration("test-profile", "testhost", 123, "test-region", InvocationFailure.STOP);
final AwsLambdaUtil testUtil = new AwsLambdaUtil(testConfiguration);

testUtil.checkPayloadSizeForInvocationType("testpayload".getBytes(), InvocationType.RequestResponse, Instant.now(), new RequestTooLargeException("Request payload is too large!"));
}

@Test
public void testCheckPayloadSizeForInvocationTypeWithInvocationFailureModeDropContinues() {

AwsLambdaUtil.InvocationResponse testResp = null;
RequestTooLargeException ex = null;

final Configuration testConfiguration = new Configuration("test-profile", "testhost", 123, "test-region", InvocationFailure.DROP);
final AwsLambdaUtil testUtil = new AwsLambdaUtil(testConfiguration);

try {
testResp = testUtil.checkPayloadSizeForInvocationType("testpayload".getBytes(), InvocationType.RequestResponse, Instant.now(), new RequestTooLargeException("Request payload is too large!"));
} catch (RequestTooLargeException e) {
ex = e;
}

assertNull(ex);
assertNotNull(testResp);
assertEquals(400, testResp.getStatusCode().intValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public void testPutWhenBatchingIsNotEnabled() {
task.configuration.getAwsCredentialsProfile(),
task.configuration.getHttpProxyHost(),
task.configuration.getHttpProxyPort(),
task.configuration.getAwsRegion())).new InvocationResponse(200, "test log", "", Instant.now(), Instant.now()));
task.configuration.getAwsRegion(),
task.configuration.getFailureMode())).new InvocationResponse(200, "test log", "", Instant.now(), Instant.now()));

Schema testSchema = SchemaBuilder.struct().name("com.nordstrom.kafka.connect.lambda.foo").field("bar", STRING_SCHEMA).build();

Expand Down

0 comments on commit 585d2c1

Please sign in to comment.