From c3c11a0f03200e89aeaae23ac80502de329219cc Mon Sep 17 00:00:00 2001 From: Mike Luu Date: Sat, 28 Oct 2023 16:13:25 -0700 Subject: [PATCH] add region option to AwsAssumeRoleCredentialsProvider This adds a `region` configuration item for the AwsAssumeRoleCredentialsProvider. It is used when building a AWSSecurityTokenServiceClient. It not specified, uses the default region selector. Fixes #366 --- .../AwsAssumeRoleCredentialsProvider.java | 43 +++++++++++-------- .../connect/s3/S3SinkConnectorConfigTest.java | 4 ++ 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java index 880a1045b..0ade1dbf5 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java @@ -42,6 +42,7 @@ public class AwsAssumeRoleCredentialsProvider implements AWSCredentialsProvider, public static final String ROLE_EXTERNAL_ID_CONFIG = "sts.role.external.id"; public static final String ROLE_ARN_CONFIG = "sts.role.arn"; public static final String ROLE_SESSION_NAME_CONFIG = "sts.role.session.name"; + public static final String STS_REGION_CONFIG = "sts.region"; private static final ConfigDef STS_CONFIG_DEF = new ConfigDef() .define( @@ -59,11 +60,18 @@ public class AwsAssumeRoleCredentialsProvider implements AWSCredentialsProvider, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Role session name to use when starting a session" + ).define( + STS_REGION_CONFIG, + ConfigDef.Type.STRING, + "", + ConfigDef.Importance.MEDIUM, + "Region of STS service. If not specified, uses a default region selector." ); private String roleArn; private String roleExternalId; private String roleSessionName; + private String region; private BasicAWSCredentials basicCredentials; @@ -77,28 +85,29 @@ public void configure(Map configs) { roleArn = config.getString(ROLE_ARN_CONFIG); roleExternalId = config.getString(ROLE_EXTERNAL_ID_CONFIG); roleSessionName = config.getString(ROLE_SESSION_NAME_CONFIG); + region = config.getString(STS_REGION_CONFIG); final String accessKeyId = (String) configs.get(AWS_ACCESS_KEY_ID_CONFIG); final String secretKey = (String) configs.get(AWS_SECRET_ACCESS_KEY_CONFIG); + + // default sts client will internally use default credentials chain provider + AWSSecurityTokenServiceClientBuilder stsClientBuilder = AWSSecurityTokenServiceClientBuilder + .standard(); + if (StringUtils.isNotBlank(region)) { + stsClientBuilder = stsClientBuilder.withRegion(region); + } + + // Use explicit access key and secret if set if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretKey)) { basicCredentials = new BasicAWSCredentials(accessKeyId, secretKey); - stsCredentialProvider = new STSAssumeRoleSessionCredentialsProvider - .Builder(roleArn, roleSessionName) - .withStsClient(AWSSecurityTokenServiceClientBuilder - .standard() - .withCredentials(new AWSStaticCredentialsProvider(basicCredentials)).build() - ) - .withExternalId(roleExternalId) - .build(); - } else { - basicCredentials = null; - stsCredentialProvider = new STSAssumeRoleSessionCredentialsProvider - .Builder(roleArn, roleSessionName) - // default sts client will internally use default credentials chain provider - // https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default - .withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient()) - .withExternalId(roleExternalId) - .build(); + stsClientBuilder = stsClientBuilder + .withCredentials(new AWSStaticCredentialsProvider(basicCredentials)); } + + stsCredentialProvider = new STSAssumeRoleSessionCredentialsProvider + .Builder(roleArn, roleSessionName) + .withStsClient(stsClientBuilder.build()) + .withExternalId(roleExternalId) + .build(); } @Override diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java index acf1b8dbe..939beee5c 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java @@ -255,6 +255,10 @@ public void testConfigurableAwsAssumeRoleCredentialsProvider() { configPrefix.concat(AwsAssumeRoleCredentialsProvider.ROLE_EXTERNAL_ID_CONFIG), "my-external-id" ); + properties.put( + configPrefix.concat(AwsAssumeRoleCredentialsProvider.STS_REGION_CONFIG), + "us-west-2" + ); connectorConfig = new S3SinkConnectorConfig(properties); AwsAssumeRoleCredentialsProvider credentialsProvider =