diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java index 14e0944..b20fab8 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java @@ -60,15 +60,15 @@ public InvocationResponse invoke(final byte[] payload) { try { final InvokeResult result = futureResult.get(invocationTimeout.toMillis(), TimeUnit.MILLISECONDS); return new InvocationResponse(result.getStatusCode(), result.getLogResult(), - result.getFunctionError(), start, Instant.now()); + result.getFunctionError(), result.getPayload(), start, Instant.now()); } catch (RequestTooLargeException e) { return checkPayloadSizeForInvocationType(payload, type, start, e); } catch (final InterruptedException | ExecutionException e) { LOGGER.error(e.getLocalizedMessage(), e); throw new InvocationException(e); } catch (final TimeoutException e) { - return new InvocationResponse(504, e.getLocalizedMessage(), e.getLocalizedMessage(), start, - Instant.now()); + return new InvocationResponse(504, e.getLocalizedMessage(), e.getLocalizedMessage(), + null, start, Instant.now()); } } @@ -105,7 +105,7 @@ InvocationResponse checkPayloadSizeForInvocationType(final byte[] payload, final throw e; } // Drop message and continue - return new InvocationResponse(413, e.getLocalizedMessage(), e.getLocalizedMessage(), start, Instant.now()); + return new InvocationResponse(413, e.getLocalizedMessage(), e.getLocalizedMessage(), null, start, Instant.now()); } private class InvocationException extends RuntimeException { diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationResponse.java b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationResponse.java index d4cb7d1..95f75a2 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationResponse.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationResponse.java @@ -1,9 +1,13 @@ package com.nordstrom.kafka.connect.lambda; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.time.Instant; public class InvocationResponse { private final String errorString; + private final ByteBuffer errorDescription; private final String responseString; private final Integer statusCode; private final Instant start; @@ -13,12 +17,14 @@ public InvocationResponse( final Integer statusCode, final String logResult, final String functionError, + final ByteBuffer errorDescription, final Instant start, final Instant end) { this.statusCode = statusCode; this.responseString = logResult; this.errorString = functionError; + this.errorDescription = errorDescription; this.start = start; this.end = end; @@ -36,6 +42,11 @@ public String getErrorString() { return this.errorString; } + public String getErrorDescription() { + Charset charset = StandardCharsets.UTF_8; + return charset.decode(this.errorDescription).toString(); + } + public String getResponseString() { return this.responseString; } diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java index 0a60daa..08d3a89 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java @@ -201,13 +201,22 @@ private InvocationResponse invoke(final String payload) { return response; } - private void handleResponse( + void handleResponse( final InvocationResponse response, final AtomicInteger retryCount, final Collection retriableErrorCodes, final int maxRetries, final long backoffTimeMs) { - if (response.getStatusCode() < 300 && response.getStatusCode() >= 200) { + + String functionError = response.getErrorString(); + if (functionError != null && !functionError.isEmpty()) { + //function-error + throw new FunctionExecutionException(MessageFormat + .format("Lambda function execution failed. Reason: {0}: {1}", + response.getErrorString(), + response.getErrorDescription() + )); + } else if (response.getStatusCode() < 300 && response.getStatusCode() >= 200) { //success retryCount.set(0); } else { @@ -260,4 +269,10 @@ private class OutOfRetriesException extends RuntimeException { super(message); } } + + protected static class FunctionExecutionException extends RuntimeException { + FunctionExecutionException(final String message) { + super(message); + } + } } diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java index ceee909..304ccfa 100644 --- a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java +++ b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java @@ -9,15 +9,19 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTaskContext; +import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA; -import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -48,7 +52,7 @@ public void testPutWhenBatchingIsNotEnabled() { InvocationClient mockedClient = mock(InvocationClient.class); when(mockedClient.invoke(any())) - .thenReturn(new InvocationResponse(200, "test log", "", Instant.now(), Instant.now())); + .thenReturn(new InvocationResponse(200, "test log", "", null, Instant.now(), Instant.now())); Schema testSchema = SchemaBuilder.struct().name("com.nordstrom.kafka.connect.lambda.foo").field("bar", STRING_SCHEMA).build(); @@ -58,4 +62,30 @@ public void testPutWhenBatchingIsNotEnabled() { task.put(testList); } + + @Test(expected = LambdaSinkTask.FunctionExecutionException.class) + public void testHandleResponseWhenFunctionExecutionFails() { + + LambdaSinkTask task = new LambdaSinkTask(); + + String msg = "{\"errorMessage\": \"foo\", \"errorType\": \"ValueError\", \"stackTrace\": [\" File \\\"/var/task/lambda_function.py\\\", line 5, in lambda_handler\\n raise ValueError(\\\"foo\\\")\\n\"]}"; + ByteBuffer errorDescription = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)); + InvocationResponse invocationResponse = new InvocationResponse(200, "test log", "Unhandled", errorDescription, Instant.now(), Instant.now()); + + task.handleResponse(invocationResponse, new AtomicInteger(0), Arrays.asList(501,504), 3, 1L); + + } + + @Test + public void testHandleResponseWhenFunctionInvocationAndExecutionSucceeds() { + + LambdaSinkTask task = new LambdaSinkTask(); + InvocationResponse invocationResponse = new InvocationResponse(200, "test log", null, null, Instant.now(), Instant.now()); + AtomicInteger retryCounter = new AtomicInteger(2); + + task.handleResponse(invocationResponse, retryCounter, Arrays.asList(501,504), 3, 1L); + + Assert.assertEquals(0, retryCounter.intValue()); + } + }