Skip to content

Commit

Permalink
refactor(s3stream): create S3Operator with credential providers (#898)
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <ningyu@automq.com>
  • Loading branch information
Chillax-0v0 authored Jan 18, 2024
1 parent cf07ae3 commit 4192785
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 93 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.1.3-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.17.0-SNAPSHOT</s3stream.version>
<s3stream.version>0.18.0-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.17.0-SNAPSHOT</version>
<version>0.18.0-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
20 changes: 0 additions & 20 deletions s3stream/src/main/java/com/automq/stream/s3/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ public class Config {
private String region;
private String bucket;
private boolean forcePathStyle = false;
private String accessKey;
private String secretKey;
private String walPath = "/tmp/s3stream_wal";
private long walCacheSize = 200 * 1024 * 1024;
private long walCapacity = 1024L * 1024 * 1024;
Expand Down Expand Up @@ -207,14 +205,6 @@ public boolean objectLogEnable() {
return objectLogEnable;
}

public String accessKey() {
return accessKey;
}

public String secretKey() {
return secretKey;
}

public long networkBaselineBandwidth() {
return networkBaselineBandwidth;
}
Expand Down Expand Up @@ -403,16 +393,6 @@ public Config objectLogEnable(boolean s3ObjectLogEnable) {
return this;
}

public Config accessKey(String s3AccessKey) {
this.accessKey = s3AccessKey;
return this;
}

public Config secretKey(String s3SecretKey) {
this.secretKey = s3SecretKey;
return this;
}

public Config networkBaselineBandwidth(long networkBaselineBandwidth) {
this.networkBaselineBandwidth = networkBaselineBandwidth;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
import software.amazon.awssdk.core.async.AsyncRequestBody;
Expand Down Expand Up @@ -113,20 +113,20 @@ public class DefaultS3Operator implements S3Operator {
private final HashedWheelTimer timeoutDetect = new HashedWheelTimer(
ThreadUtils.createThreadFactory("s3-timeout-detect", true), 1, TimeUnit.SECONDS, 100);

public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, String accessKey,
String secretKey) {
this(endpoint, region, bucket, forcePathStyle, accessKey, secretKey, null, null, false);
public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle,
List<AwsCredentialsProvider> credentialsProviders) {
this(endpoint, region, bucket, forcePathStyle, credentialsProviders, null, null, false);
}

public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, String accessKey,
String secretKey,
public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle,
List<AwsCredentialsProvider> credentialsProviders,
AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter,
AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter, boolean readWriteIsolate) {
this.maxMergeReadSparsityRate = Utils.getMaxMergeReadSparsityRate();
this.networkInboundBandwidthLimiter = networkInboundBandwidthLimiter;
this.networkOutboundBandwidthLimiter = networkOutboundBandwidthLimiter;
this.writeS3Client = newS3Client(endpoint, region, forcePathStyle, accessKey, secretKey);
this.readS3Client = readWriteIsolate ? newS3Client(endpoint, region, forcePathStyle, accessKey, secretKey) : writeS3Client;
this.writeS3Client = newS3Client(endpoint, region, forcePathStyle, credentialsProviders);
this.readS3Client = readWriteIsolate ? newS3Client(endpoint, region, forcePathStyle, credentialsProviders) : writeS3Client;
this.inflightWriteLimiter = new Semaphore(50);
this.inflightReadLimiter = readWriteIsolate ? new Semaphore(50) : inflightWriteLimiter;
this.bucket = bucket;
Expand All @@ -137,8 +137,7 @@ public DefaultS3Operator(String endpoint, String region, String bucket, boolean
.setRegion(region)
.setBucketName(bucket)
.setForcePathStyle(forcePathStyle)
.setAccessKey(accessKey)
.setSecretKey(secretKey)
.setCredentialsProviders(credentialsProviders)
.build();
LOGGER.info("You are using s3Context: {}", s3Context);
checkAvailable(s3Context);
Expand Down Expand Up @@ -649,26 +648,30 @@ private void checkAvailable(S3Utils.S3Context s3Context) {
}
}

public S3AsyncClient newS3Client(String endpoint, String region, boolean forcePathStyle, String accessKey,
String secretKey) {
public S3AsyncClient newS3Client(String endpoint, String region, boolean forcePathStyle,
List<AwsCredentialsProvider> credentialsProviders) {
S3AsyncClientBuilder builder = S3AsyncClient.builder().region(Region.of(region));
if (StringUtils.isNotBlank(endpoint)) {
builder.endpointOverride(URI.create(endpoint));
}
builder.serviceConfiguration(c -> c.pathStyleAccessEnabled(forcePathStyle));
builder.credentialsProvider(AwsCredentialsProviderChain.builder()
.reuseLastProviderEnabled(true)
.credentialsProviders(
() -> AwsBasicCredentials.create(accessKey, secretKey),
InstanceProfileCredentialsProvider.create(),
AnonymousCredentialsProvider.create()
).build()
);
builder.credentialsProvider(newCredentialsProviderChain(credentialsProviders));
builder.overrideConfiguration(b -> b.apiCallTimeout(Duration.ofMinutes(1))
.apiCallAttemptTimeout(Duration.ofSeconds(30)));
return builder.build();
}

private AwsCredentialsProvider newCredentialsProviderChain(List<AwsCredentialsProvider> credentialsProviders) {
List<AwsCredentialsProvider> providers = new ArrayList<>(credentialsProviders);
// Add default providers to the end of the chain
providers.add(InstanceProfileCredentialsProvider.create());
providers.add(AnonymousCredentialsProvider.create());
return AwsCredentialsProviderChain.builder()
.reuseLastProviderEnabled(true)
.credentialsProviders(providers)
.build();
}

/**
* Acquire read permit, permit will auto release when cf complete.
*
Expand Down Expand Up @@ -813,8 +816,7 @@ public static class Builder {
private String region;
private String bucket;
private boolean forcePathStyle;
private String accessKey;
private String secretKey;
private List<AwsCredentialsProvider> credentialsProviders;
private AsyncNetworkBandwidthLimiter inboundLimiter;
private AsyncNetworkBandwidthLimiter outboundLimiter;
private boolean readWriteIsolate;
Expand All @@ -839,13 +841,8 @@ public Builder forcePathStyle(boolean forcePathStyle) {
return this;
}

public Builder accessKey(String accessKey) {
this.accessKey = accessKey;
return this;
}

public Builder secretKey(String secretKey) {
this.secretKey = secretKey;
public Builder credentialsProviders(List<AwsCredentialsProvider> credentialsProviders) {
this.credentialsProviders = credentialsProviders;
return this;
}

Expand All @@ -865,7 +862,7 @@ public Builder readWriteIsolate(boolean readWriteIsolate) {
}

public DefaultS3Operator build() {
return new DefaultS3Operator(endpoint, region, bucket, forcePathStyle, accessKey, secretKey,
return new DefaultS3Operator(endpoint, region, bucket, forcePathStyle, credentialsProviders,
inboundLimiter, outboundLimiter, readWriteIsolate);
}
}
Expand Down
57 changes: 21 additions & 36 deletions s3stream/src/main/java/com/automq/stream/utils/S3Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
Expand Down Expand Up @@ -84,35 +85,25 @@ private static String range(long start, long end) {
}

private static S3AsyncClient newS3AsyncClient(String endpoint, String region, boolean forcePathStyle,
String accessKey, String secretKey) {
List<AwsCredentialsProvider> credentialsProviders) {
S3AsyncClientBuilder builder = S3AsyncClient.builder().region(Region.of(region));
if (StringUtils.isNotBlank(endpoint)) {
builder.endpointOverride(URI.create(endpoint));
}
builder.serviceConfiguration(c -> c.pathStyleAccessEnabled(forcePathStyle));
builder.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)));
builder.credentialsProvider(AwsCredentialsProviderChain.builder().credentialsProviders(credentialsProviders).build());
builder.overrideConfiguration(b -> b.apiCallTimeout(Duration.ofMinutes(1))
.apiCallAttemptTimeout(Duration.ofSeconds(30)));
return builder.build();
}

private static String hideSecret(String secret) {
if (secret == null) {
return null;
}
if (secret.length() < 6) {
return "*".repeat(secret.length());
}
return secret.substring(0, 3) + "*".repeat(secret.length() - 6) + secret.substring(secret.length() - 3);
}

private static abstract class S3CheckTask implements AutoCloseable {
protected final S3AsyncClient client;
protected final String bucketName;
private final String taskName;

public S3CheckTask(S3Context context, String taskName) {
this.client = newS3AsyncClient(context.endpoint, context.region, context.forcePathStyle, context.accessKey, context.secretKey);
this.client = newS3AsyncClient(context.endpoint, context.region, context.forcePathStyle, context.credentialsProviders);
this.bucketName = context.bucketName;
this.taskName = taskName;
}
Expand Down Expand Up @@ -363,17 +354,16 @@ public void close() {

public static class S3Context {
private final String endpoint;
private final String accessKey;
private final String secretKey;
private final List<AwsCredentialsProvider> credentialsProviders;
private final String bucketName;
private final String region;
private final boolean forcePathStyle;

public S3Context(String endpoint, String accessKey, String secretKey, String bucketName, String region,
public S3Context(String endpoint, List<AwsCredentialsProvider> credentialsProviders, String bucketName,
String region,
boolean forcePathStyle) {
this.endpoint = endpoint;
this.accessKey = accessKey;
this.secretKey = secretKey;
this.credentialsProviders = credentialsProviders;
this.bucketName = bucketName;
this.region = region;
this.forcePathStyle = forcePathStyle;
Expand Down Expand Up @@ -406,11 +396,13 @@ public List<String> advices() {
}
}
}
if (StringUtils.isBlank(accessKey)) {
advises.add("accessKey is blank. Please supply a valid accessKey.");
if (credentialsProviders == null || credentialsProviders.isEmpty()) {
advises.add("no credentials provider is supplied. Please supply a credentials provider.");
}
if (StringUtils.isBlank(secretKey)) {
advises.add("secretKey is blank. Please supply a valid secretKey.");
try (AwsCredentialsProviderChain chain = AwsCredentialsProviderChain.builder().credentialsProviders(credentialsProviders).build()) {
chain.resolveCredentials();
} catch (SdkClientException e) {
advises.add("all provided credentials providers are invalid. Please supply a valid credentials provider. Error msg: " + e.getMessage());
}
if (StringUtils.isBlank(region)) {
advises.add("region is blank. Please supply a valid region.");
Expand All @@ -425,8 +417,7 @@ public List<String> advices() {
public String toString() {
return "S3CheckContext{" +
"endpoint='" + endpoint + '\'' +
", accessKey='" + hideSecret(accessKey) + '\'' +
", secretKey='" + hideSecret(secretKey) + '\'' +
", credentialsProviders=" + credentialsProviders +
", bucketName='" + bucketName + '\'' +
", region='" + region + '\'' +
", forcePathStyle=" + forcePathStyle +
Expand All @@ -435,8 +426,7 @@ public String toString() {

public static class Builder {
private String endpoint;
private String accessKey;
private String secretKey;
private List<AwsCredentialsProvider> credentialsProviders;
private String bucketName;
private String region;
private boolean forcePathStyle;
Expand All @@ -446,13 +436,8 @@ public Builder setEndpoint(String endpoint) {
return this;
}

public Builder setAccessKey(String accessKey) {
this.accessKey = accessKey;
return this;
}

public Builder setSecretKey(String secretKey) {
this.secretKey = secretKey;
public Builder setCredentialsProviders(List<AwsCredentialsProvider> credentialsProviders) {
this.credentialsProviders = credentialsProviders;
return this;
}

Expand All @@ -472,7 +457,7 @@ public Builder setForcePathStyle(boolean forcePathStyle) {
}

public S3Context build() {
return new S3Context(endpoint, accessKey, secretKey, bucketName, region, forcePathStyle);
return new S3Context(endpoint, credentialsProviders, bucketName, region, forcePathStyle);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import com.automq.stream.s3.metadata.ObjectUtils;
import com.automq.stream.s3.operator.DefaultS3Operator;
import com.automq.stream.s3.operator.S3Operator;
import java.util.List;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;

import static com.automq.rocketmq.store.MessageStoreImpl.KV_NAMESPACE_CHECK_POINT;

Expand All @@ -64,7 +66,7 @@ public static MessageStoreImpl build(StoreConfig storeConfig, S3StreamConfig s3S

// S3 object manager, such as trim expired messages, etc.
S3Operator operator = new DefaultS3Operator(s3StreamConfig.s3Endpoint(), s3StreamConfig.s3Region(), s3StreamConfig.s3Bucket(),
s3StreamConfig.s3ForcePathStyle(), s3StreamConfig.s3AccessKey(), s3StreamConfig.s3SecretKey());
s3StreamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(s3StreamConfig.s3AccessKey(), s3StreamConfig.s3SecretKey())));
S3ObjectOperator objectOperator = new S3ObjectOperatorImpl(operator);

TransactionService transactionService = new TransactionService(storeConfig, timerService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;

public class S3StreamStore implements StreamStore {
private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamStore.class);
Expand Down Expand Up @@ -92,7 +93,8 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store
}

S3Operator defaultOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(),
streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey(), networkInboundLimiter, networkOutboundLimiter, true);
streamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(streamConfig.s3AccessKey(), streamConfig.s3SecretKey())),
networkInboundLimiter, networkOutboundLimiter, true);

WriteAheadLog writeAheadLog = BlockWALService.builder(s3Config.walPath(), s3Config.walCapacity()).config(s3Config).build();
S3BlockCache blockCache = new DefaultS3BlockCache(s3Config, objectManager, defaultOperator);
Expand All @@ -102,7 +104,8 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store

// Build the compaction manager
S3Operator compactionOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(),
streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey(), networkInboundLimiter, networkOutboundLimiter, true);
streamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(streamConfig.s3AccessKey(), streamConfig.s3SecretKey())),
networkInboundLimiter, networkOutboundLimiter, true);
this.compactionManager = new CompactionManager(s3Config, objectManager, streamManager, compactionOperator);

this.streamClient = new S3StreamClient(streamManager, storage, objectManager, defaultOperator, s3Config, networkInboundLimiter, networkOutboundLimiter);
Expand Down Expand Up @@ -255,8 +258,6 @@ private Config configFrom(S3StreamConfig streamConfig) {
config.bucket(streamConfig.s3Bucket());
config.forcePathStyle(streamConfig.s3ForcePathStyle());
config.walPath(streamConfig.s3WALPath());
config.accessKey(streamConfig.s3AccessKey());
config.secretKey(streamConfig.s3SecretKey());
config.networkBaselineBandwidth(streamConfig.networkBaselineBandwidth());
config.refillPeriodMs(streamConfig.refillPeriodMs());

Expand Down

0 comments on commit 4192785

Please sign in to comment.