From e342e4bf0d95b7cbf6f3ac52be39ae7574b19bd6 Mon Sep 17 00:00:00 2001 From: Mike Luu Date: Sat, 28 Oct 2023 16:13:25 -0700 Subject: [PATCH] Pass region into credentials provider This passes the `s3.region` configuration value into credential providers. It also updates the AwsAssumeRoleCredentialsProvider to specify a region when building a AWSSecurityTokenServiceClient. Fixes #366 --- .../connect/s3/S3SinkConnectorConfig.java | 1 + .../AwsAssumeRoleCredentialsProvider.java | 34 +++++++++---------- .../connect/s3/S3SinkConnectorConfigTest.java | 3 +- .../connect/s3/storage/S3StorageTest.java | 2 +- 4 files changed, 21 insertions(+), 19 deletions(-) diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java index a537e73f4..9e5d826ba 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java @@ -896,6 +896,7 @@ public AWSCredentialsProvider getCredentialsProvider() { configs.put(AWS_ACCESS_KEY_ID_CONFIG, awsAccessKeyId()); configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value()); + configs.put(REGION_CONFIG, getString(REGION_CONFIG)); ((Configurable) provider).configure(configs); } else { diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java index 880a1045b..ab6d85b01 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java @@ -31,6 +31,7 @@ import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.REGION_CONFIG; /** * AWS credentials provider that uses the AWS Security Token Service to assume a Role and create a @@ -79,26 +80,25 @@ public void configure(Map configs) { roleSessionName = config.getString(ROLE_SESSION_NAME_CONFIG); final String accessKeyId = (String) configs.get(AWS_ACCESS_KEY_ID_CONFIG); final String secretKey = (String) configs.get(AWS_SECRET_ACCESS_KEY_CONFIG); + final String region = (String) configs.get(REGION_CONFIG); + + // default sts client will internally use default credentials chain provider + AWSSecurityTokenServiceClientBuilder stsClientBuilder = AWSSecurityTokenServiceClientBuilder + .standard() + .withRegion(region); + + // Use explicit access key and secret if set if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretKey)) { basicCredentials = new BasicAWSCredentials(accessKeyId, secretKey); - stsCredentialProvider = new STSAssumeRoleSessionCredentialsProvider - .Builder(roleArn, roleSessionName) - .withStsClient(AWSSecurityTokenServiceClientBuilder - .standard() - .withCredentials(new AWSStaticCredentialsProvider(basicCredentials)).build() - ) - .withExternalId(roleExternalId) - .build(); - } else { - basicCredentials = null; - stsCredentialProvider = new STSAssumeRoleSessionCredentialsProvider - .Builder(roleArn, roleSessionName) - // default sts client will internally use default credentials chain provider - // https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default - .withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient()) - .withExternalId(roleExternalId) - .build(); + stsClientBuilder = stsClientBuilder + .withCredentials(new AWSStaticCredentialsProvider(basicCredentials)); } + + stsCredentialProvider = new STSAssumeRoleSessionCredentialsProvider + .Builder(roleArn, roleSessionName) + .withStsClient(stsClientBuilder.build()) + .withExternalId(roleExternalId) + .build(); } @Override diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java index acf1b8dbe..34f6f90d5 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java @@ -226,7 +226,7 @@ public void testConfigurableCredentialProvider() { ); properties.put( configPrefix.concat(DummyAssertiveCredentialsProvider.CONFIGS_NUM_KEY_NAME), - "5" + "6" ); connectorConfig = new S3SinkConnectorConfig(properties); @@ -255,6 +255,7 @@ public void testConfigurableAwsAssumeRoleCredentialsProvider() { configPrefix.concat(AwsAssumeRoleCredentialsProvider.ROLE_EXTERNAL_ID_CONFIG), "my-external-id" ); + properties.put(S3SinkConnectorConfig.REGION_CONFIG, "us-west-2"); connectorConfig = new S3SinkConnectorConfig(properties); AwsAssumeRoleCredentialsProvider credentialsProvider = diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/storage/S3StorageTest.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/storage/S3StorageTest.java index 31f947967..a9e871d4e 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/storage/S3StorageTest.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/storage/S3StorageTest.java @@ -142,7 +142,7 @@ public void testUserDefinedCredentialsProvider() throws Exception { String configPrefix = S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX; localProps.put(configPrefix.concat(DummyAssertiveCredentialsProvider.ACCESS_KEY_NAME), "foo_key"); localProps.put(configPrefix.concat(DummyAssertiveCredentialsProvider.SECRET_KEY_NAME), "bar_secret"); - localProps.put(configPrefix.concat(DummyAssertiveCredentialsProvider.CONFIGS_NUM_KEY_NAME), "5"); + localProps.put(configPrefix.concat(DummyAssertiveCredentialsProvider.CONFIGS_NUM_KEY_NAME), "6"); localProps.put( S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, DummyAssertiveCredentialsProvider.class.getName()