From 4192785e32e00ac770389779dfe50927fdf3730e Mon Sep 17 00:00:00 2001 From: Yu Ning <78631860+Chillax-0v0@users.noreply.github.com> Date: Thu, 18 Jan 2024 16:55:58 +0800 Subject: [PATCH] refactor(s3stream): create `S3Operator` with credential providers (#898) Signed-off-by: Ning Yu --- pom.xml | 2 +- s3stream/pom.xml | 2 +- .../java/com/automq/stream/s3/Config.java | 20 ------- .../stream/s3/operator/DefaultS3Operator.java | 57 +++++++++---------- .../java/com/automq/stream/utils/S3Utils.java | 57 +++++++------------ .../rocketmq/store/MessageStoreBuilder.java | 4 +- .../automq/rocketmq/store/S3StreamStore.java | 9 +-- 7 files changed, 58 insertions(+), 93 deletions(-) diff --git a/pom.xml b/pom.xml index 735d0f1c3..977fc50d6 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ 32.1.3-jre 2.0.9 2.2 - 0.17.0-SNAPSHOT + 0.18.0-SNAPSHOT 23.5.26 diff --git a/s3stream/pom.xml b/s3stream/pom.xml index d43abdc41..b6518b2e4 100644 --- a/s3stream/pom.xml +++ b/s3stream/pom.xml @@ -22,7 +22,7 @@ 4.0.0 com.automq.elasticstream s3stream - 0.17.0-SNAPSHOT + 0.18.0-SNAPSHOT 5.5.0 5.10.0 diff --git a/s3stream/src/main/java/com/automq/stream/s3/Config.java b/s3stream/src/main/java/com/automq/stream/s3/Config.java index e0ec033d6..79750f404 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/Config.java +++ b/s3stream/src/main/java/com/automq/stream/s3/Config.java @@ -24,8 +24,6 @@ public class Config { private String region; private String bucket; private boolean forcePathStyle = false; - private String accessKey; - private String secretKey; private String walPath = "/tmp/s3stream_wal"; private long walCacheSize = 200 * 1024 * 1024; private long walCapacity = 1024L * 1024 * 1024; @@ -207,14 +205,6 @@ public boolean objectLogEnable() { return objectLogEnable; } - public String accessKey() { - return accessKey; - } - - public String secretKey() { - return secretKey; - } - public long networkBaselineBandwidth() { return networkBaselineBandwidth; } @@ -403,16 +393,6 @@ public Config objectLogEnable(boolean s3ObjectLogEnable) { return this; } - public Config accessKey(String s3AccessKey) { - this.accessKey = s3AccessKey; - return this; - } - - public Config secretKey(String s3SecretKey) { - this.secretKey = s3SecretKey; - return this; - } - public Config networkBaselineBandwidth(long networkBaselineBandwidth) { this.networkBaselineBandwidth = networkBaselineBandwidth; return this; diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index f0f2961a8..3c489fc2d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -60,7 +60,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain; import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider; import software.amazon.awssdk.core.async.AsyncRequestBody; @@ -113,20 +113,20 @@ public class DefaultS3Operator implements S3Operator { private final HashedWheelTimer timeoutDetect = new HashedWheelTimer( ThreadUtils.createThreadFactory("s3-timeout-detect", true), 1, TimeUnit.SECONDS, 100); - public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, String accessKey, - String secretKey) { - this(endpoint, region, bucket, forcePathStyle, accessKey, secretKey, null, null, false); + public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, + List credentialsProviders) { + this(endpoint, region, bucket, forcePathStyle, credentialsProviders, null, null, false); } - public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, String accessKey, - String secretKey, + public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, + List credentialsProviders, AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter, AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter, boolean readWriteIsolate) { this.maxMergeReadSparsityRate = Utils.getMaxMergeReadSparsityRate(); this.networkInboundBandwidthLimiter = networkInboundBandwidthLimiter; this.networkOutboundBandwidthLimiter = networkOutboundBandwidthLimiter; - this.writeS3Client = newS3Client(endpoint, region, forcePathStyle, accessKey, secretKey); - this.readS3Client = readWriteIsolate ? newS3Client(endpoint, region, forcePathStyle, accessKey, secretKey) : writeS3Client; + this.writeS3Client = newS3Client(endpoint, region, forcePathStyle, credentialsProviders); + this.readS3Client = readWriteIsolate ? newS3Client(endpoint, region, forcePathStyle, credentialsProviders) : writeS3Client; this.inflightWriteLimiter = new Semaphore(50); this.inflightReadLimiter = readWriteIsolate ? new Semaphore(50) : inflightWriteLimiter; this.bucket = bucket; @@ -137,8 +137,7 @@ public DefaultS3Operator(String endpoint, String region, String bucket, boolean .setRegion(region) .setBucketName(bucket) .setForcePathStyle(forcePathStyle) - .setAccessKey(accessKey) - .setSecretKey(secretKey) + .setCredentialsProviders(credentialsProviders) .build(); LOGGER.info("You are using s3Context: {}", s3Context); checkAvailable(s3Context); @@ -649,26 +648,30 @@ private void checkAvailable(S3Utils.S3Context s3Context) { } } - public S3AsyncClient newS3Client(String endpoint, String region, boolean forcePathStyle, String accessKey, - String secretKey) { + public S3AsyncClient newS3Client(String endpoint, String region, boolean forcePathStyle, + List credentialsProviders) { S3AsyncClientBuilder builder = S3AsyncClient.builder().region(Region.of(region)); if (StringUtils.isNotBlank(endpoint)) { builder.endpointOverride(URI.create(endpoint)); } builder.serviceConfiguration(c -> c.pathStyleAccessEnabled(forcePathStyle)); - builder.credentialsProvider(AwsCredentialsProviderChain.builder() - .reuseLastProviderEnabled(true) - .credentialsProviders( - () -> AwsBasicCredentials.create(accessKey, secretKey), - InstanceProfileCredentialsProvider.create(), - AnonymousCredentialsProvider.create() - ).build() - ); + builder.credentialsProvider(newCredentialsProviderChain(credentialsProviders)); builder.overrideConfiguration(b -> b.apiCallTimeout(Duration.ofMinutes(1)) .apiCallAttemptTimeout(Duration.ofSeconds(30))); return builder.build(); } + private AwsCredentialsProvider newCredentialsProviderChain(List credentialsProviders) { + List providers = new ArrayList<>(credentialsProviders); + // Add default providers to the end of the chain + providers.add(InstanceProfileCredentialsProvider.create()); + providers.add(AnonymousCredentialsProvider.create()); + return AwsCredentialsProviderChain.builder() + .reuseLastProviderEnabled(true) + .credentialsProviders(providers) + .build(); + } + /** * Acquire read permit, permit will auto release when cf complete. * @@ -813,8 +816,7 @@ public static class Builder { private String region; private String bucket; private boolean forcePathStyle; - private String accessKey; - private String secretKey; + private List credentialsProviders; private AsyncNetworkBandwidthLimiter inboundLimiter; private AsyncNetworkBandwidthLimiter outboundLimiter; private boolean readWriteIsolate; @@ -839,13 +841,8 @@ public Builder forcePathStyle(boolean forcePathStyle) { return this; } - public Builder accessKey(String accessKey) { - this.accessKey = accessKey; - return this; - } - - public Builder secretKey(String secretKey) { - this.secretKey = secretKey; + public Builder credentialsProviders(List credentialsProviders) { + this.credentialsProviders = credentialsProviders; return this; } @@ -865,7 +862,7 @@ public Builder readWriteIsolate(boolean readWriteIsolate) { } public DefaultS3Operator build() { - return new DefaultS3Operator(endpoint, region, bucket, forcePathStyle, accessKey, secretKey, + return new DefaultS3Operator(endpoint, region, bucket, forcePathStyle, credentialsProviders, inboundLimiter, outboundLimiter, readWriteIsolate); } } diff --git a/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java b/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java index 4004c4605..d835ea18b 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java +++ b/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java @@ -34,10 +34,11 @@ import java.util.concurrent.ExecutionException; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; @@ -84,35 +85,25 @@ private static String range(long start, long end) { } private static S3AsyncClient newS3AsyncClient(String endpoint, String region, boolean forcePathStyle, - String accessKey, String secretKey) { + List credentialsProviders) { S3AsyncClientBuilder builder = S3AsyncClient.builder().region(Region.of(region)); if (StringUtils.isNotBlank(endpoint)) { builder.endpointOverride(URI.create(endpoint)); } builder.serviceConfiguration(c -> c.pathStyleAccessEnabled(forcePathStyle)); - builder.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey))); + builder.credentialsProvider(AwsCredentialsProviderChain.builder().credentialsProviders(credentialsProviders).build()); builder.overrideConfiguration(b -> b.apiCallTimeout(Duration.ofMinutes(1)) .apiCallAttemptTimeout(Duration.ofSeconds(30))); return builder.build(); } - private static String hideSecret(String secret) { - if (secret == null) { - return null; - } - if (secret.length() < 6) { - return "*".repeat(secret.length()); - } - return secret.substring(0, 3) + "*".repeat(secret.length() - 6) + secret.substring(secret.length() - 3); - } - private static abstract class S3CheckTask implements AutoCloseable { protected final S3AsyncClient client; protected final String bucketName; private final String taskName; public S3CheckTask(S3Context context, String taskName) { - this.client = newS3AsyncClient(context.endpoint, context.region, context.forcePathStyle, context.accessKey, context.secretKey); + this.client = newS3AsyncClient(context.endpoint, context.region, context.forcePathStyle, context.credentialsProviders); this.bucketName = context.bucketName; this.taskName = taskName; } @@ -363,17 +354,16 @@ public void close() { public static class S3Context { private final String endpoint; - private final String accessKey; - private final String secretKey; + private final List credentialsProviders; private final String bucketName; private final String region; private final boolean forcePathStyle; - public S3Context(String endpoint, String accessKey, String secretKey, String bucketName, String region, + public S3Context(String endpoint, List credentialsProviders, String bucketName, + String region, boolean forcePathStyle) { this.endpoint = endpoint; - this.accessKey = accessKey; - this.secretKey = secretKey; + this.credentialsProviders = credentialsProviders; this.bucketName = bucketName; this.region = region; this.forcePathStyle = forcePathStyle; @@ -406,11 +396,13 @@ public List advices() { } } } - if (StringUtils.isBlank(accessKey)) { - advises.add("accessKey is blank. Please supply a valid accessKey."); + if (credentialsProviders == null || credentialsProviders.isEmpty()) { + advises.add("no credentials provider is supplied. Please supply a credentials provider."); } - if (StringUtils.isBlank(secretKey)) { - advises.add("secretKey is blank. Please supply a valid secretKey."); + try (AwsCredentialsProviderChain chain = AwsCredentialsProviderChain.builder().credentialsProviders(credentialsProviders).build()) { + chain.resolveCredentials(); + } catch (SdkClientException e) { + advises.add("all provided credentials providers are invalid. Please supply a valid credentials provider. Error msg: " + e.getMessage()); } if (StringUtils.isBlank(region)) { advises.add("region is blank. Please supply a valid region."); @@ -425,8 +417,7 @@ public List advices() { public String toString() { return "S3CheckContext{" + "endpoint='" + endpoint + '\'' + - ", accessKey='" + hideSecret(accessKey) + '\'' + - ", secretKey='" + hideSecret(secretKey) + '\'' + + ", credentialsProviders=" + credentialsProviders + ", bucketName='" + bucketName + '\'' + ", region='" + region + '\'' + ", forcePathStyle=" + forcePathStyle + @@ -435,8 +426,7 @@ public String toString() { public static class Builder { private String endpoint; - private String accessKey; - private String secretKey; + private List credentialsProviders; private String bucketName; private String region; private boolean forcePathStyle; @@ -446,13 +436,8 @@ public Builder setEndpoint(String endpoint) { return this; } - public Builder setAccessKey(String accessKey) { - this.accessKey = accessKey; - return this; - } - - public Builder setSecretKey(String secretKey) { - this.secretKey = secretKey; + public Builder setCredentialsProviders(List credentialsProviders) { + this.credentialsProviders = credentialsProviders; return this; } @@ -472,7 +457,7 @@ public Builder setForcePathStyle(boolean forcePathStyle) { } public S3Context build() { - return new S3Context(endpoint, accessKey, secretKey, bucketName, region, forcePathStyle); + return new S3Context(endpoint, credentialsProviders, bucketName, region, forcePathStyle); } } diff --git a/store/src/main/java/com/automq/rocketmq/store/MessageStoreBuilder.java b/store/src/main/java/com/automq/rocketmq/store/MessageStoreBuilder.java index 1e9833f2d..b97c17d6b 100644 --- a/store/src/main/java/com/automq/rocketmq/store/MessageStoreBuilder.java +++ b/store/src/main/java/com/automq/rocketmq/store/MessageStoreBuilder.java @@ -40,6 +40,8 @@ import com.automq.stream.s3.metadata.ObjectUtils; import com.automq.stream.s3.operator.DefaultS3Operator; import com.automq.stream.s3.operator.S3Operator; +import java.util.List; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import static com.automq.rocketmq.store.MessageStoreImpl.KV_NAMESPACE_CHECK_POINT; @@ -64,7 +66,7 @@ public static MessageStoreImpl build(StoreConfig storeConfig, S3StreamConfig s3S // S3 object manager, such as trim expired messages, etc. S3Operator operator = new DefaultS3Operator(s3StreamConfig.s3Endpoint(), s3StreamConfig.s3Region(), s3StreamConfig.s3Bucket(), - s3StreamConfig.s3ForcePathStyle(), s3StreamConfig.s3AccessKey(), s3StreamConfig.s3SecretKey()); + s3StreamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(s3StreamConfig.s3AccessKey(), s3StreamConfig.s3SecretKey()))); S3ObjectOperator objectOperator = new S3ObjectOperatorImpl(operator); TransactionService transactionService = new TransactionService(storeConfig, timerService); diff --git a/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java b/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java index fe29a751b..bb050ab33 100644 --- a/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java +++ b/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java @@ -58,6 +58,7 @@ import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; public class S3StreamStore implements StreamStore { private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamStore.class); @@ -92,7 +93,8 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store } S3Operator defaultOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(), - streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey(), networkInboundLimiter, networkOutboundLimiter, true); + streamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(streamConfig.s3AccessKey(), streamConfig.s3SecretKey())), + networkInboundLimiter, networkOutboundLimiter, true); WriteAheadLog writeAheadLog = BlockWALService.builder(s3Config.walPath(), s3Config.walCapacity()).config(s3Config).build(); S3BlockCache blockCache = new DefaultS3BlockCache(s3Config, objectManager, defaultOperator); @@ -102,7 +104,8 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store // Build the compaction manager S3Operator compactionOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(), - streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey(), networkInboundLimiter, networkOutboundLimiter, true); + streamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(streamConfig.s3AccessKey(), streamConfig.s3SecretKey())), + networkInboundLimiter, networkOutboundLimiter, true); this.compactionManager = new CompactionManager(s3Config, objectManager, streamManager, compactionOperator); this.streamClient = new S3StreamClient(streamManager, storage, objectManager, defaultOperator, s3Config, networkInboundLimiter, networkOutboundLimiter); @@ -255,8 +258,6 @@ private Config configFrom(S3StreamConfig streamConfig) { config.bucket(streamConfig.s3Bucket()); config.forcePathStyle(streamConfig.s3ForcePathStyle()); config.walPath(streamConfig.s3WALPath()); - config.accessKey(streamConfig.s3AccessKey()); - config.secretKey(streamConfig.s3SecretKey()); config.networkBaselineBandwidth(streamConfig.networkBaselineBandwidth()); config.refillPeriodMs(streamConfig.refillPeriodMs());