Skip to content

Commit

Permalink
[feat] [tiered-storage] Add pure S3 provider for the offloader (#15710)
Browse files Browse the repository at this point in the history
* [improve] [tiered-storage] Add pure S3 provider for the offloader
---

*Motivation*

There have some cloud storages are compatible with S3
APIs, such as aliyun-oss. Some other storages also use
the S3 APIs and want to offload the data into them, but
we only support the AWS or the Aliyun.
The PR #8985 provides
the Aliyun offload provider, but it has a force limitation of
the `S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS`. That
is not a limitation on other storage service which compatible
with S3 APIs.
This PR provides  a more general offload provider `S3` which uses
pure JClouds S3 metadata and allows people to override the
default JClouds properties through system properties.

*Modifications*

- Add the pure S3 offload provider

(cherry picked from commit 047cb0e)
  • Loading branch information
zymap authored and hangc0276 committed Jun 7, 2022
1 parent d384076 commit 72629be
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,34 @@ public void buildCredentials(TieredStorageConfiguration config) {
ALIYUN_OSS("aliyun-oss", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) {
@Override
public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
ALIYUN_OSS_VALIDATION.validate(config);
S3_VALIDATION.validate(config);
}

@Override
public BlobStore getBlobStore(TieredStorageConfiguration config) {
return ALIYUN_OSS_BLOB_STORE_BUILDER.getBlobStore(config);
return S3_BLOB_STORE_BUILDER.getBlobStore(config);
}

@Override
public void buildCredentials(TieredStorageConfiguration config) {
ALIYUN_OSS_CREDENTIAL_BUILDER.buildCredentials(config);
S3_CREDENTIAL_BUILDER.buildCredentials(config);
}
},

S3("S3", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) {
@Override
public BlobStore getBlobStore(TieredStorageConfiguration config) {
return S3_BLOB_STORE_BUILDER.getBlobStore(config);
}

@Override
public void buildCredentials(TieredStorageConfiguration config) {
S3_CREDENTIAL_BUILDER.buildCredentials(config);
}

@Override
public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
S3_VALIDATION.validate(config);
}
},

Expand Down Expand Up @@ -374,12 +391,14 @@ public String getAWSSecretKey() {
}
};

static final BlobStoreBuilder ALIYUN_OSS_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
static final BlobStoreBuilder S3_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata());
contextBuilder.modules(Arrays.asList(new SLF4JLoggingModule()));
Properties overrides = config.getOverrides();
// For security reasons, OSS supports only virtual hosted style access.
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true");
if (ALIYUN_OSS.getDriver().equals(config.getDriver())) {
// For security reasons, OSS supports only virtual hosted style access.
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true");
}
contextBuilder.overrides(overrides);
contextBuilder.endpoint(config.getServiceEndpoint());

Expand All @@ -396,7 +415,7 @@ public String getAWSSecretKey() {
}
};

static final ConfigValidation ALIYUN_OSS_VALIDATION = (TieredStorageConfiguration config) -> {
static final ConfigValidation S3_VALIDATION = (TieredStorageConfiguration config) -> {
if (Strings.isNullOrEmpty(config.getServiceEndpoint())) {
throw new IllegalArgumentException(
"ServiceEndpoint must specified for " + config.getDriver() + " offload");
Expand All @@ -414,14 +433,21 @@ public String getAWSSecretKey() {
}
};

static final CredentialBuilder ALIYUN_OSS_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
String accountName = System.getenv("ALIYUN_OSS_ACCESS_KEY_ID");
if (StringUtils.isEmpty(accountName)) {
throw new IllegalArgumentException("Couldn't get the aliyun oss access key id.");
static final CredentialBuilder S3_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
String accountName = System.getenv().getOrDefault("ACCESS_KEY_ID", "");
// For forward compatibility
if (StringUtils.isEmpty(accountName.trim())) {
accountName = System.getenv().getOrDefault("ALIYUN_OSS_ACCESS_KEY_ID", "");
}
if (StringUtils.isEmpty(accountName.trim())) {
throw new IllegalArgumentException("Couldn't get the access key id.");
}
String accountKey = System.getenv().getOrDefault("ACCESS_KEY_ID", "");
if (StringUtils.isEmpty(accountKey.trim())) {
accountKey = System.getenv().getOrDefault("ALIYUN_OSS_ACCESS_KEY_SECRET", "");
}
String accountKey = System.getenv("ALIYUN_OSS_ACCESS_KEY_SECRET");
if (StringUtils.isEmpty(accountKey)) {
throw new IllegalArgumentException("Couldn't get the aliyun oss access key secret.");
if (StringUtils.isEmpty(accountKey.trim())) {
throw new IllegalArgumentException("Couldn't get the access key secret.");
}
Credentials credentials = new Credentials(
accountName, accountKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,19 @@ protected Properties getOverrides() {
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
}

// load more jclouds properties into the overrides
System.getProperties().entrySet().stream()
.filter(p -> p.getKey().toString().startsWith("jclouds"))
.forEach(jcloudsProp -> {
overrides.setProperty(jcloudsProp.getKey().toString(), jcloudsProp.getValue().toString());
});

System.getenv().entrySet().stream()
.filter(p -> p.getKey().toString().startsWith("jclouds"))
.forEach(jcloudsProp -> {
overrides.setProperty(jcloudsProp.getKey().toString(), jcloudsProp.getValue().toString());
});

log.info("getOverrides: {}", overrides.toString());
return overrides;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.HashMap;
import java.util.Map;

import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
import org.testng.annotations.Test;

public class JCloudBlobStoreProviderTests {
Expand Down Expand Up @@ -105,4 +103,33 @@ public void transientValidationFailureTest() {
config = new TieredStorageConfiguration(map);
JCloudBlobStoreProvider.TRANSIENT.validate(config);
}

@Test()
public void s3ValidationTest() {
Map<String, String> map = new HashMap<>();
map.put("managedLedgerOffloadDriver", "S3");
map.put("managedLedgerOffloadServiceEndpoint", "http://s3.service");
map.put("managedLedgerOffloadBucket", "test-s3-bucket");
TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
configuration.getProvider().validate(configuration);
}

@Test(expectedExceptions = IllegalArgumentException.class,
expectedExceptionsMessageRegExp = "ServiceEndpoint must specified for S3 offload")
public void s3ValidationServiceEndpointMissed() {
Map<String, String> map = new HashMap<>();
map.put("managedLedgerOffloadDriver", "S3");
TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
configuration.getProvider().validate(configuration);
}

@Test(expectedExceptions = IllegalArgumentException.class,
expectedExceptionsMessageRegExp = "Bucket cannot be empty for S3 offload")
public void s3ValidationBucketMissed() {
Map<String, String> map = new HashMap<>();
map.put("managedLedgerOffloadDriver", "S3");
map.put("managedLedgerOffloadServiceEndpoint", "http://s3.service");
TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
configuration.getProvider().validate(configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.jclouds.domain.Credentials;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -205,4 +207,19 @@ public final void gcsBackwardCompatiblePropertiesTest() {
assertEquals(config.getMaxBlockSizeInBytes(), new Integer(12));
assertEquals(config.getReadBufferSizeInBytes(), new Integer(500));
}

@Test
public void overridePropertiesTest() {
Map<String, String> map = new HashMap<>();
map.put("s3ManagedLedgerOffloadServiceEndpoint", "http://localhost");
map.put("s3ManagedLedgerOffloadRegion", "my-region");
System.setProperty("jclouds.SystemPropertyA", "A");
System.setProperty("jclouds.region", "jclouds-region");
TieredStorageConfiguration config = new TieredStorageConfiguration(map);
Properties properties = config.getOverrides();
System.out.println(properties.toString());
assertEquals(properties.get("jclouds.region"), "jclouds-region");
assertEquals(config.getServiceEndpoint(), "http://localhost");
assertEquals(properties.get("jclouds.SystemPropertyA"), "A");
}
}

0 comments on commit 72629be

Please sign in to comment.