Skip to content

Commit

Permalink
Remove the aws.credentials.profile configuration property
Browse files Browse the repository at this point in the history
  • Loading branch information
dylanmei committed Aug 5, 2019
1 parent 3c5dd07 commit f930cf3
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
public class Configuration {

private static final int MAX_HTTP_PORT_NUMBER = 65536;
private final Optional<String> credentialsProfile;
private final Optional<String> httpProxyHost;
private final Optional<Integer> httpProxyPort;
private final Optional<String> awsRegion;
Expand All @@ -19,21 +18,13 @@ public class Configuration {
private final Optional<String> externalId;


public Configuration(final String credentialsProfile, final String httpProxyHost,
final Integer httpProxyPort, final String awsRegion,
public Configuration(final String httpProxyHost,
final Integer httpProxyPort,
final String awsRegion,
final InvocationFailure failureMode,
final String roleArn, final String sessionName,
final String roleArn,
final String sessionName,
final String externalId) {
/*
* String awsCredentialsProfile =
* System.getenv(CREDENTIALS_PROFILE_CONFIG_ENV); String awsProxyHost =
* System.getenv(_CONFIG_ENV); String awsProxyPort =
* System.getenv(HTTP_PROXY_PORT_CONFIG_ENV);
*
*/
this.credentialsProfile =
Facility.isNotNullNorEmpty(credentialsProfile) ? Optional.of(credentialsProfile)
: Optional.empty();
this.httpProxyHost =
Facility.isNotNullNorEmpty(httpProxyHost) ? Optional.of(httpProxyHost) : Optional.empty();
this.httpProxyPort = Facility.isNotNullAndInRange(httpProxyPort, 0, MAX_HTTP_PORT_NUMBER)
Expand All @@ -50,16 +41,14 @@ public Configuration(final String credentialsProfile, final String httpProxyHost

}

public Configuration(final Optional<String> awsCredentialsProfile,
final Optional<String> httpProxyHost,
public Configuration(final Optional<String> httpProxyHost,
final Optional<Integer> httpProxyPort,
final Optional<String> awsRegion,
final Optional<InvocationFailure> failureMode,
final Optional<String> roleArn,
final Optional<String> sessionName,
final Optional<String> externalId) {

this.credentialsProfile = awsCredentialsProfile;
this.httpProxyHost = httpProxyHost;
this.httpProxyPort = httpProxyPort;
this.awsRegion = awsRegion;
Expand All @@ -70,17 +59,12 @@ public Configuration(final Optional<String> awsCredentialsProfile,
}

public static Configuration empty() {
return new Configuration(
Optional.empty(), Optional.empty(),
return new Configuration(Optional.empty(),
Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty());
}

public Optional<String> getCredentialsProfile() {
return this.credentialsProfile;
}

public Optional<String> getHttpProxyHost() {
return this.httpProxyHost;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public class LambdaSinkConnectorConfig extends AbstractConfig {

private static final long AWS_LAMBDA_INVOCATION_TIMEOUT_MS_DEFAULT = 5 * 60 * 1000L;
private static final String AWS_REGION_DEFAULT = "us-west-2";
private static final String AWS_CREDENTIALS_PROFILE_DEFAULT = "";
private static final String HTTP_PROXY_HOST_DEFAULT = "";
private static final int HTTP_PROXY_PORT_DEFAULT = -1;
private static final boolean AWS_LAMBDA_BATCH_ENABLED_DEFAULT = true;
Expand All @@ -45,7 +44,6 @@ public class LambdaSinkConnectorConfig extends AbstractConfig {
private final String connectorName;
private final String httpProxyHost;
private final Integer httpProxyPort;
private final String awsCredentialsProfile;
private final String awsFunctionArn;
private final Duration invocationTimeout;
private final InvocationMode invocationMode;
Expand Down Expand Up @@ -80,8 +78,6 @@ public class LambdaSinkConnectorConfig extends AbstractConfig {
this.httpProxyHost = this.getString(ConfigurationKeys.HTTP_PROXY_HOST.getValue());
this.httpProxyPort = this.getInt(ConfigurationKeys.HTTP_PROXY_PORT.getValue());

this.awsCredentialsProfile = this.getString(ConfigurationKeys.AWS_CREDENTIALS_PROFILE.getValue());

this.awsFunctionArn = this.getString(ConfigurationKeys.AWS_LAMBDA_FUNCTION_ARN.getValue());
this.invocationTimeout = Duration.ofMillis(this.getLong(ConfigurationKeys.AWS_LAMBDA_INVOCATION_TIMEOUT_MS.getValue()));

Expand Down Expand Up @@ -131,10 +127,6 @@ public Duration getInvocationTimeout() {
return this.invocationTimeout;
}

public String getAwsCredentialsProfile() {
return this.awsCredentialsProfile;
}

public Integer getHttpProxyPort() {
return this.httpProxyPort;
}
Expand Down Expand Up @@ -219,10 +211,6 @@ public static ConfigDef config() {
.define(ConfigurationKeys.AWS_REGION.getValue(), Type.STRING, AWS_REGION_DEFAULT,
Importance.LOW, ConfigurationKeys.AWS_REGION.getDocumentation())

.define(ConfigurationKeys.AWS_CREDENTIALS_PROFILE.getValue(), Type.STRING,
AWS_CREDENTIALS_PROFILE_DEFAULT, Importance.LOW,
ConfigurationKeys.AWS_CREDENTIALS_PROFILE.getDocumentation())

.define(ConfigurationKeys.HTTP_PROXY_HOST.getValue(), Type.STRING, HTTP_PROXY_HOST_DEFAULT,
Importance.LOW, ConfigurationKeys.HTTP_PROXY_HOST.getDocumentation())

Expand Down Expand Up @@ -280,8 +268,6 @@ enum ConfigurationKeys {
"Boolean that determines if the messages will be batched together before sending them to aws lambda. By default is " + AWS_LAMBDA_BATCH_ENABLED_DEFAULT),
AWS_REGION("aws.region",
"AWS region to instantiate the Lambda client Default: " + AWS_REGION_DEFAULT),
AWS_CREDENTIALS_PROFILE("aws.credentials.profile",
" AWS credentials profile to use for the Lambda client, by default is empty and will use the DefaultAWSCredentialsProviderChain"),

HTTP_PROXY_HOST("http.proxy.host",
"Http proxy port to be configured for the Lambda client, by default is empty"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public void start(final Map<String, String> props) {
this.configuration.getTaskId());

Configuration optConfigs = new Configuration(
this.configuration.getAwsCredentialsProfile(),
this.configuration.getHttpProxyHost(),
this.configuration.getHttpProxyPort(),
this.configuration.getAwsRegion(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class AwsLambdaUtilTest {
@Test(expected = RequestTooLargeException.class)
public void testCheckPayloadSizeForInvocationTypeWithInvocationFailureModeStopThrowsException() {

final Configuration testOptConfigs = new Configuration("test-profile", "testhost", 123, "test-region", InvocationFailure.STOP, "test-arn", "test-session", "test-external-id");
final Configuration testOptConfigs = new Configuration("testhost", 123, "test-region", InvocationFailure.STOP, "test-arn", "test-session", "test-external-id");
final AwsLambdaUtil testUtil = new AwsLambdaUtil(testOptConfigs, new HashMap<>());

testUtil.checkPayloadSizeForInvocationType("testpayload".getBytes(), InvocationType.RequestResponse, Instant.now(), new RequestTooLargeException("Request payload is too large!"));
Expand All @@ -26,7 +26,7 @@ public void testCheckPayloadSizeForInvocationTypeWithInvocationFailureModeDropCo
AwsLambdaUtil.InvocationResponse testResp = null;
RequestTooLargeException ex = null;

final Configuration testOptConfigs = new Configuration("test-profile", "testhost", 123, "test-region", InvocationFailure.DROP, "test-arn", "test-session", "test-external-id");
final Configuration testOptConfigs = new Configuration("testhost", 123, "test-region", InvocationFailure.DROP, "test-arn", "test-session", "test-external-id");
final AwsLambdaUtil testUtil = new AwsLambdaUtil(testOptConfigs, new HashMap<>());

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,14 @@ public void testStartProperlyInitializesSinkTaskWithSampleConnectorConfiguration
new ImmutableMap.Builder<String, String>()
.put("connector.class", "com.nordstrom.kafka.connect.lambda.LambdaSinkConnector")
.put("tasks.max", "1")
.put("aws.region", "test-region")
.put("aws.lambda.function.arn", "arn:aws:lambda:us-west-2:123456789123:function:test-lambda")
.put("aws.lambda.invocation.timeout.ms", "300000")
.put("aws.lambda.invocation.mode", "SYNC")
.put("aws.lambda.batch.enabled", "true")
.put("aws.lambda.json.wrapper.enabled", "true")
.put("key.converter", "org.apache.kafka.connect.storage.StringConverter")
.put("value.converter", "org.apache.kafka.connect.storage.StringConverter")
.put("topics", "connect-lambda-test")
.put("aws.credentials.profile", "my-profile")
.build();

LambdaSinkTask task = new LambdaSinkTask();
Expand All @@ -51,12 +50,12 @@ public void testStartProperlyInitializesSinkTaskWithSampleConnectorConfiguration
assertNotNull(task.lambdaClient);

assertEquals("0", task.configuration.getTaskId());
assertEquals("test-region", task.configuration.getAwsRegion());
assertEquals("arn:aws:lambda:us-west-2:123456789123:function:test-lambda", task.configuration.getAwsFunctionArn());
assertEquals("PT5M", task.configuration.getInvocationTimeout().toString());
assertEquals("SYNC", task.configuration.getInvocationMode().toString());
assertEquals(6291455, task.configuration.getMaxBatchSizeBytes());
assertEquals("us-west-2", task.configuration.getAwsRegion());
assertEquals("my-profile", task.configuration.getAwsCredentialsProfile());
assertTrue(task.configuration.isBatchingEnabled());
assertEquals(6291455, task.configuration.getMaxBatchSizeBytes());
}

@Ignore("Test is ignored as a demonstration -- needs profile")
Expand All @@ -71,11 +70,9 @@ public void testPutWhenBatchingIsNotEnabled() {
.put("aws.lambda.invocation.timeout.ms", "300000")
.put("aws.lambda.invocation.mode", "SYNC")
.put("aws.lambda.batch.enabled", "false")
.put("aws.lambda.json.wrapper.enabled", "true")
.put("key.converter", "org.apache.kafka.connect.storage.StringConverter")
.put("value.converter", "org.apache.kafka.connect.storage.StringConverter")
.put("topics", "connect-lambda-test")
.put("aws.credentials.profile", "my-profile")
.build();

LambdaSinkTask task = new LambdaSinkTask();
Expand All @@ -88,7 +85,6 @@ public void testPutWhenBatchingIsNotEnabled() {
AwsLambdaUtil mockedLambdaClient = mock(AwsLambdaUtil.class);

when(mockedLambdaClient.invoke(anyString(), anyObject(), anyObject(), eq(InvocationType.RequestResponse))).thenReturn(new AwsLambdaUtil( new Configuration(
task.configuration.getAwsCredentialsProfile(),
task.configuration.getHttpProxyHost(),
task.configuration.getHttpProxyPort(),
task.configuration.getAwsRegion(),
Expand Down

0 comments on commit f930cf3

Please sign in to comment.