From 5e89c7cf20ecf434c019ecbb394828ed120564d9 Mon Sep 17 00:00:00 2001 From: "rickard.cardell" Date: Fri, 29 May 2020 09:35:24 +0200 Subject: [PATCH 1/6] Handle case when invocation succeeds but execution of function fails. --- .../kafka/connect/lambda/LambdaSinkTask.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java index 0a60daa..c2ab613 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java @@ -207,7 +207,14 @@ private void handleResponse( final Collection retriableErrorCodes, final int maxRetries, final long backoffTimeMs) { - if (response.getStatusCode() < 300 && response.getStatusCode() >= 200) { + String functionError = response.getErrorString(); + // When the function execution fails the Lambda responds by setting either of these values + if (functionError.equals("Unhandled") || functionError.equals("Handled")) { + throw new FunctionExecutionException(MessageFormat + .format("Lambda function execution failed. Msg: {0}", + functionError + )); + } else if (response.getStatusCode() < 300 && response.getStatusCode() >= 200) { //success retryCount.set(0); } else { @@ -260,4 +267,9 @@ private class OutOfRetriesException extends RuntimeException { super(message); } } + protected static class FunctionExecutionException extends RuntimeException { + FunctionExecutionException(final String message) { + super(message); + } + } } From edfa2d9c219720e4b0de3f17e823410092ad6803 Mon Sep 17 00:00:00 2001 From: "rickard.cardell" Date: Fri, 29 May 2020 09:55:56 +0200 Subject: [PATCH 2/6] Print additional information when the Lambda fails. The response payload can contain the exception with exception type, message and stacktrace. --- .../kafka/connect/lambda/InvocationClient.java | 8 ++++---- .../kafka/connect/lambda/InvocationResponse.java | 11 +++++++++++ .../kafka/connect/lambda/LambdaSinkTask.java | 7 +++++-- .../kafka/connect/lambda/LambdaSinkTaskTest.java | 2 +- 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java index 14e0944..b20fab8 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java @@ -60,15 +60,15 @@ public InvocationResponse invoke(final byte[] payload) { try { final InvokeResult result = futureResult.get(invocationTimeout.toMillis(), TimeUnit.MILLISECONDS); return new InvocationResponse(result.getStatusCode(), result.getLogResult(), - result.getFunctionError(), start, Instant.now()); + result.getFunctionError(), result.getPayload(), start, Instant.now()); } catch (RequestTooLargeException e) { return checkPayloadSizeForInvocationType(payload, type, start, e); } catch (final InterruptedException | ExecutionException e) { LOGGER.error(e.getLocalizedMessage(), e); throw new InvocationException(e); } catch (final TimeoutException e) { - return new InvocationResponse(504, e.getLocalizedMessage(), e.getLocalizedMessage(), start, - Instant.now()); + return new InvocationResponse(504, e.getLocalizedMessage(), e.getLocalizedMessage(), + null, start, Instant.now()); } } @@ -105,7 +105,7 @@ InvocationResponse checkPayloadSizeForInvocationType(final byte[] payload, final throw e; } // Drop message and continue - return new InvocationResponse(413, e.getLocalizedMessage(), e.getLocalizedMessage(), start, Instant.now()); + return new InvocationResponse(413, e.getLocalizedMessage(), e.getLocalizedMessage(), null, start, Instant.now()); } private class InvocationException extends RuntimeException { diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationResponse.java b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationResponse.java index d4cb7d1..95f75a2 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationResponse.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationResponse.java @@ -1,9 +1,13 @@ package com.nordstrom.kafka.connect.lambda; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.time.Instant; public class InvocationResponse { private final String errorString; + private final ByteBuffer errorDescription; private final String responseString; private final Integer statusCode; private final Instant start; @@ -13,12 +17,14 @@ public InvocationResponse( final Integer statusCode, final String logResult, final String functionError, + final ByteBuffer errorDescription, final Instant start, final Instant end) { this.statusCode = statusCode; this.responseString = logResult; this.errorString = functionError; + this.errorDescription = errorDescription; this.start = start; this.end = end; @@ -36,6 +42,11 @@ public String getErrorString() { return this.errorString; } + public String getErrorDescription() { + Charset charset = StandardCharsets.UTF_8; + return charset.decode(this.errorDescription).toString(); + } + public String getResponseString() { return this.responseString; } diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java index c2ab613..09e34d2 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java @@ -207,12 +207,14 @@ private void handleResponse( final Collection retriableErrorCodes, final int maxRetries, final long backoffTimeMs) { + String functionError = response.getErrorString(); // When the function execution fails the Lambda responds by setting either of these values if (functionError.equals("Unhandled") || functionError.equals("Handled")) { throw new FunctionExecutionException(MessageFormat - .format("Lambda function execution failed. Msg: {0}", - functionError + .format("Lambda function execution failed. Reason: {0}: {1}", + response.getErrorString(), + response.getErrorDescription() )); } else if (response.getStatusCode() < 300 && response.getStatusCode() >= 200) { //success @@ -267,6 +269,7 @@ private class OutOfRetriesException extends RuntimeException { super(message); } } + protected static class FunctionExecutionException extends RuntimeException { FunctionExecutionException(final String message) { super(message); diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java index ceee909..d3c9c0d 100644 --- a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java +++ b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java @@ -48,7 +48,7 @@ public void testPutWhenBatchingIsNotEnabled() { InvocationClient mockedClient = mock(InvocationClient.class); when(mockedClient.invoke(any())) - .thenReturn(new InvocationResponse(200, "test log", "", Instant.now(), Instant.now())); + .thenReturn(new InvocationResponse(200, "test log", "", null, Instant.now(), Instant.now())); Schema testSchema = SchemaBuilder.struct().name("com.nordstrom.kafka.connect.lambda.foo").field("bar", STRING_SCHEMA).build(); From b4c8f7890ba38b815dbdc3d882117da82148ed84 Mon Sep 17 00:00:00 2001 From: "rickard.cardell" Date: Fri, 29 May 2020 10:00:11 +0200 Subject: [PATCH 3/6] Added two simple testcases related to function execution failure. --- .../kafka/connect/lambda/LambdaSinkTask.java | 2 +- .../connect/lambda/LambdaSinkTaskTest.java | 32 ++++++++++++++++++- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java index 09e34d2..6d06bf3 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java @@ -201,7 +201,7 @@ private InvocationResponse invoke(final String payload) { return response; } - private void handleResponse( + void handleResponse( final InvocationResponse response, final AtomicInteger retryCount, final Collection retriableErrorCodes, diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java index d3c9c0d..1cfb2da 100644 --- a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java +++ b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java @@ -9,15 +9,19 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTaskContext; +import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA; -import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -58,4 +62,30 @@ public void testPutWhenBatchingIsNotEnabled() { task.put(testList); } + + @Test(expected = LambdaSinkTask.FunctionExecutionException.class) + public void testHandleResponseWhenFunctionExecutionFails() { + + LambdaSinkTask task = new LambdaSinkTask(); + + String msg = "{\"errorMessage\": \"foo\", \"errorType\": \"ValueError\", \"stackTrace\": [\" File \\\"/var/task/lambda_function.py\\\", line 5, in lambda_handler\\n raise ValueError(\\\"foo\\\")\\n\"]}"; + ByteBuffer errorDescription = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)); + InvocationResponse invocationResponse = new InvocationResponse(200, "test log", "Unhandled", errorDescription, Instant.now(), Instant.now()); + + task.handleResponse(invocationResponse, new AtomicInteger(0), Arrays.asList(501,504), 3, 1L); + + } + + @Test + public void testHandleResponseWhenFunctionInvocationAndExecutionSucceeds() { + + LambdaSinkTask task = new LambdaSinkTask(); + InvocationResponse invocationResponse = new InvocationResponse(200, "test log", "", null, Instant.now(), Instant.now()); + AtomicInteger retryCounter = new AtomicInteger(2); + + task.handleResponse(invocationResponse, retryCounter, Arrays.asList(501,504), 3, 1L); + + Assert.assertEquals(0, retryCounter.intValue()); + } + } From 0e238b5ebdbe12cb6b777de7fc9398f0aceb36cc Mon Sep 17 00:00:00 2001 From: "rickard.cardell" Date: Fri, 29 May 2020 11:00:12 +0200 Subject: [PATCH 4/6] Simplifying. Upon function error the 'FunctionError' field is non-empty according to https://docs.aws.amazon.com/lambda/latest/dg/API_Invoke.html#API_Invoke_ResponseSyntax --- .../java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java index 6d06bf3..a21da80 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java @@ -210,7 +210,7 @@ void handleResponse( String functionError = response.getErrorString(); // When the function execution fails the Lambda responds by setting either of these values - if (functionError.equals("Unhandled") || functionError.equals("Handled")) { + if (! functionError.isEmpty()) { throw new FunctionExecutionException(MessageFormat .format("Lambda function execution failed. Reason: {0}: {1}", response.getErrorString(), From 705ad3f44797f867a7270ac5e2fb9c63e0a2feee Mon Sep 17 00:00:00 2001 From: "rickard.cardell" Date: Fri, 29 May 2020 11:03:37 +0200 Subject: [PATCH 5/6] Fixed comment --- .../java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java index a21da80..02758f3 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java @@ -209,8 +209,8 @@ void handleResponse( final long backoffTimeMs) { String functionError = response.getErrorString(); - // When the function execution fails the Lambda responds by setting either of these values if (! functionError.isEmpty()) { + //function-error throw new FunctionExecutionException(MessageFormat .format("Lambda function execution failed. Reason: {0}: {1}", response.getErrorString(), From 9d7cb68ecd45e986194571a663e34c439269fd49 Mon Sep 17 00:00:00 2001 From: "rickard.cardell" Date: Fri, 29 May 2020 11:36:08 +0200 Subject: [PATCH 6/6] 'FunctionError' field is not set in the response, i.e it is either null or empty string --- .../java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java | 2 +- .../com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java index 02758f3..08d3a89 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java @@ -209,7 +209,7 @@ void handleResponse( final long backoffTimeMs) { String functionError = response.getErrorString(); - if (! functionError.isEmpty()) { + if (functionError != null && !functionError.isEmpty()) { //function-error throw new FunctionExecutionException(MessageFormat .format("Lambda function execution failed. Reason: {0}: {1}", diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java index 1cfb2da..304ccfa 100644 --- a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java +++ b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java @@ -80,7 +80,7 @@ public void testHandleResponseWhenFunctionExecutionFails() { public void testHandleResponseWhenFunctionInvocationAndExecutionSucceeds() { LambdaSinkTask task = new LambdaSinkTask(); - InvocationResponse invocationResponse = new InvocationResponse(200, "test log", "", null, Instant.now(), Instant.now()); + InvocationResponse invocationResponse = new InvocationResponse(200, "test log", null, null, Instant.now(), Instant.now()); AtomicInteger retryCounter = new AtomicInteger(2); task.handleResponse(invocationResponse, retryCounter, Arrays.asList(501,504), 3, 1L);