Introduce plain invocation payload formatter class
dylanmei committed Aug 7, 2019
1 parent 1f773cf commit 292c297
Showing 16 changed files with 771 additions and 601 deletions.
9 changes: 9 additions & 0 deletions src/main/java/com/nordstrom/kafka/connect/formatters/PayloadFormatter.java
@@ -0,0 +1,9 @@
package com.nordstrom.kafka.connect.formatters;

import java.util.Collection;
import org.apache.kafka.connect.sink.SinkRecord;

public interface PayloadFormatter {
    String format(final SinkRecord record) throws PayloadFormattingException;
    String formatBatch(final Collection<SinkRecord> records) throws PayloadFormattingException;
}
7 changes: 7 additions & 0 deletions src/main/java/com/nordstrom/kafka/connect/formatters/PayloadFormattingException.java
@@ -0,0 +1,7 @@
package com.nordstrom.kafka.connect.formatters;

public class PayloadFormattingException extends RuntimeException {
    public PayloadFormattingException(final Throwable e) {
        super(e);
    }
}
64 changes: 64 additions & 0 deletions src/main/java/com/nordstrom/kafka/connect/formatters/PlainPayload.java
@@ -0,0 +1,64 @@
package com.nordstrom.kafka.connect.formatters;

import org.apache.kafka.connect.sink.SinkRecord;

public class PlainPayload {
    private String key;
    private String keySchemaName;
    private String value;
    private String valueSchemaName;
    private String topic;
    private int partition;
    private long offset;
    private long timestamp;
    private String timestampTypeName;

    protected PlainPayload() {
    }

    public PlainPayload(final SinkRecord record) {
        this.key = record.key() == null ? "" : record.key().toString();
        if (record.keySchema() != null)
            this.keySchemaName = record.keySchema().name();

        this.value = record.value() == null ? "" : record.value().toString();
        if (record.valueSchema() != null)
            this.valueSchemaName = record.valueSchema().name();

        this.topic = record.topic();
        this.partition = record.kafkaPartition();
        this.offset = record.kafkaOffset();

        if (record.timestamp() != null)
            this.timestamp = record.timestamp();
        if (record.timestampType() != null)
            this.timestampTypeName = record.timestampType().name;
    }

    public String getValue() { return this.value; }
    public void setValue(final String value) { this.value = value; }

    public long getOffset() { return this.offset; }
    public void setOffset(final long offset) { this.offset = offset; }

    public Long getTimestamp() { return this.timestamp; }
    public void setTimestamp(final long timestamp) { this.timestamp = timestamp; }

    public String getTimestampTypeName() { return this.timestampTypeName; }
    public void setTimestampTypeName(final String timestampTypeName) { this.timestampTypeName = timestampTypeName; }

    public int getPartition() { return this.partition; }
    public void setPartition(final int partition) { this.partition = partition; }

    public String getKey() { return this.key; }
    public void setKey(final String key) { this.key = key; }

    public String getKeySchemaName() { return this.keySchemaName; }
    public void setKeySchemaName(final String keySchemaName) { this.keySchemaName = keySchemaName; }

    public String getValueSchemaName() { return this.valueSchemaName; }
    public void setValueSchemaName(final String valueSchemaName) { this.valueSchemaName = valueSchemaName; }

    public String getTopic() { return this.topic; }
    public void setTopic(final String topic) { this.topic = topic; }
}
43 changes: 43 additions & 0 deletions src/main/java/com/nordstrom/kafka/connect/formatters/PlainPayloadFormatter.java
@@ -0,0 +1,43 @@
package com.nordstrom.kafka.connect.formatters;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;

import org.apache.kafka.connect.sink.SinkRecord;

import java.util.Collection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PlainPayloadFormatter implements PayloadFormatter {
    private static final Logger LOGGER = LoggerFactory.getLogger(PlainPayloadFormatter.class);
    private final ObjectWriter recordWriter = new ObjectMapper().writerFor(PlainPayload.class);
    private final ObjectWriter recordsWriter = new ObjectMapper().writerFor(PlainPayload[].class);

    public String format(final SinkRecord record) {
        PlainPayload payload = new PlainPayload(record);

        try {
            return this.recordWriter.writeValueAsString(payload);
        } catch (final JsonProcessingException e) {
            LOGGER.error(e.getLocalizedMessage(), e);
            throw new PayloadFormattingException(e);
        }
    }

    public String formatBatch(final Collection<SinkRecord> records) {
        final PlainPayload[] payloads = records
            .stream()
            .map(record -> new PlainPayload(record))
            .toArray(PlainPayload[]::new);

        try {
            return this.recordsWriter.writeValueAsString(payloads);
        } catch (final JsonProcessingException e) {
            LOGGER.error(e.getLocalizedMessage(), e);
            throw new PayloadFormattingException(e);
        }
    }
}
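
For reference, a minimal usage sketch (not part of this commit; the example class name, topic, and record values are hypothetical) showing how the formatter turns a SinkRecord into a JSON payload:

package com.nordstrom.kafka.connect.formatters;

import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical example, not part of this commit: exercise the formatter directly.
public class PlainPayloadFormatterExample {
    public static void main(final String[] args) {
        final PayloadFormatter formatter = new PlainPayloadFormatter();

        final SinkRecord record = new SinkRecord(
                "example-topic", 0,                     // topic, partition
                Schema.STRING_SCHEMA, "example-key",    // key schema, key
                Schema.STRING_SCHEMA, "example-value",  // value schema, value
                42L,                                    // offset
                1565136000000L, TimestampType.CREATE_TIME);

        // Prints the PlainPayload serialized as JSON, roughly:
        // {"key":"example-key","value":"example-value","topic":"example-topic","partition":0,"offset":42,...}
        System.out.println(formatter.format(record));
    }
}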
84 changes: 11 additions & 73 deletions src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java
@@ -9,22 +9,18 @@
import com.amazonaws.services.lambda.model.InvokeResult;
import com.amazonaws.services.lambda.model.RequestTooLargeException;
import com.nordstrom.kafka.connect.utils.Guard;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AwsLambdaUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(AwsLambdaUtil.class);

private static final int MEGABYTE_SIZE = 1024 * 1024;
@@ -36,44 +32,21 @@ public class AwsLambdaUtil {
private final AWSLambdaAsync lambdaClient;
private final InvocationFailure failureMode;

public AwsLambdaUtil(final Configuration optConfigs, final Map<String, ?> bareAssumeRoleConfigs) {
LOGGER.debug("AwsLambdaUtil.ctor:bareAssumeRoleConfigs={}", bareAssumeRoleConfigs);
Guard.verifyNotNull(optConfigs, "optConfigs");

final AWSLambdaAsyncClientBuilder builder = AWSLambdaAsyncClientBuilder.standard();

// Will check if there's proxy configuration in the environment; if
// there's any will construct the client with it.
if (optConfigs.getHttpProxyHost().isPresent()) {
final ClientConfiguration clientConfiguration = new ClientConfiguration()
.withProxyHost(optConfigs.getHttpProxyHost().get());
if (optConfigs.getHttpProxyPort().isPresent()) {
clientConfiguration.setProxyPort(optConfigs.getHttpProxyPort().get());
}
builder.setClientConfiguration(clientConfiguration);
LOGGER.info("Setting proxy configuration for AWS Lambda Async client host: {} port {}",
optConfigs.getHttpProxyHost().get(), optConfigs.getHttpProxyPort().orElse(-1));
}
public AwsLambdaUtil(ClientConfiguration clientConfiguration,
AWSCredentialsProvider credentialsProvider,
InvocationFailure failureMode) {

if (optConfigs.getAwsRegion().isPresent()) {
builder.setRegion(optConfigs.getAwsRegion().get());
LOGGER.info("Using aws region: {}", optConfigs.getAwsRegion().toString());
}
Guard.verifyNotNull(clientConfiguration, "clientConfiguration");
Guard.verifyNotNull(credentialsProvider, "credentialsProvider");

failureMode = optConfigs.getFailureMode().orElse(InvocationFailure.STOP);

AWSCredentialsProvider provider = null;
try {
provider = getCredentialsProvider(bareAssumeRoleConfigs);
} catch (Exception e) {
LOGGER.error("Problem initializing provider", e);
}
if (provider != null) {
builder.setCredentials(provider);
}
final AWSLambdaAsyncClientBuilder builder = AWSLambdaAsyncClientBuilder.standard()
.withClientConfiguration(clientConfiguration)
.withCredentials(credentialsProvider);

this.failureMode = failureMode;
this.lambdaClient = builder.build();
LOGGER.info("AWS Lambda client initialized");

LOGGER.debug("AWS Lambda client initialized");
}

public InvocationResponse invokeSync(
@@ -155,41 +128,6 @@ InvocationResponse checkPayloadSizeForInvocationType(final byte[] payload, final
return new InvocationResponse(413, e.getLocalizedMessage(), e.getLocalizedMessage(), start, Instant.now());
}

@SuppressWarnings("unchecked")
public AWSCredentialsProvider getCredentialsProvider(Map<String, ?> roleConfigs) {
LOGGER.info(".get-credentials-provider:assumeRoleConfigs={}", roleConfigs);

try {
Object providerField = roleConfigs.get("class");
String providerClass = LambdaSinkConnectorConfig.ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_DEFAULT.getValue();
if (providerField != null) {
providerClass = providerField.toString();
}
LOGGER.debug(".get-credentials-provider:field={}, class={}", providerField, providerClass);
AWSCredentialsProvider provider = ((Class<? extends AWSCredentialsProvider>)
getClass(providerClass)).newInstance();

if (provider instanceof Configurable) {
((Configurable) provider).configure(roleConfigs);
}

LOGGER.debug(".get-credentials-provider:provider={}", provider);
return provider;
} catch (IllegalAccessException | InstantiationException e) {
throw new ConnectException("Invalid class for: " + LambdaSinkConnectorConfig.ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG, e);
}
}

public Class<?> getClass(String className) {
LOGGER.debug(".get-class:class={}",className);
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
LOGGER.error("Provider class not found: {}", e);
}
return null;
}

private class LambdaInvocationException extends RuntimeException {
public LambdaInvocationException(final Throwable e) {
super(e);
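
With this refactor, proxy settings, credentials, and the failure mode are resolved by the caller and passed into the constructor. A rough caller-side sketch (not part of this commit; the proxy values and credentials provider choice are assumptions):

package com.nordstrom.kafka.connect.lambda;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;

// Hypothetical wiring example, not part of this commit.
public class AwsLambdaUtilWiringExample {
    static AwsLambdaUtil buildLambdaUtil() {
        // Proxy configuration now lives outside AwsLambdaUtil (values assumed).
        final ClientConfiguration clientConfiguration = new ClientConfiguration()
                .withProxyHost("proxy.example.com")
                .withProxyPort(8080);

        // Any AWSCredentialsProvider works; the default chain is used here for illustration.
        final AWSCredentialsProvider credentialsProvider =
                DefaultAWSCredentialsProviderChain.getInstance();

        return new AwsLambdaUtil(clientConfiguration, credentialsProvider, InvocationFailure.STOP);
    }
}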

This file was deleted.

This file was deleted.

src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnector.java
@@ -20,32 +20,27 @@ public class LambdaSinkConnector extends SinkConnector {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return IntStream.range(0, maxTasks)
.mapToObj(i -> {
final Map<String, String> taskProperties = new HashMap<>(
this.configuration.getProperties());
return taskProperties;
})
.collect(Collectors.toList());
.mapToObj(i -> {
return new HashMap<>(this.configuration.originalsStrings());
})
.collect(Collectors.toList());
}

@Override
public void start(Map<String, String> settings) {
LOGGER.info("starting connector {}",
settings.getOrDefault(LambdaSinkConnectorConfig.ConfigurationKeys.NAME_CONFIG.getValue(), ""));

this.configuration = new LambdaSinkConnectorConfig(settings);

LOGGER.info("connector.start:OK");
LOGGER.info("Starting connector {}", this.configuration.getConnectorName());
}

@Override
public void stop() {
LOGGER.info("connector.stop:OK");
LOGGER.info("Stopping connector {}", this.configuration.getConnectorName());
}

@Override
public ConfigDef config() {
return LambdaSinkConnectorConfig.config();
return LambdaSinkConnectorConfig.configDef();
}

@Override