From d2c5c51efa928f80e7f5a32ef7e76abe4c6caa6f Mon Sep 17 00:00:00 2001 From: Addison Higham Date: Fri, 31 May 2019 15:59:42 -0600 Subject: [PATCH 1/3] Add support for AWS instance and role creds This commit makes changes to the tiered storage support for S3 to allow for support of ec2 metadata instance credentials as well as additional config options for assuming a role to get credentials. This works by changing the way we provide credentials to use the funtional `Supplier` interface and for using the AWS specific `SessionCredentials` object for when we detect that the `CredentialProvider` is providing credentials that have a session token. --- site2/docs/cookbooks-tiered-storage.md | 28 ++++++--- site2/docs/reference-configuration.md | 12 ++-- tiered-storage/jcloud/pom.xml | 4 ++ .../TieredStorageConfigurationData.java | 24 +++++++ .../impl/BlobStoreManagedLedgerOffloader.java | 63 ++++++++++++------- .../BlobStoreManagedLedgerOffloaderTest.java | 45 +++++++++++++ 6 files changed, 140 insertions(+), 36 deletions(-) diff --git a/site2/docs/cookbooks-tiered-storage.md b/site2/docs/cookbooks-tiered-storage.md index 5d75679a02026..7f8bc58154823 100644 --- a/site2/docs/cookbooks-tiered-storage.md +++ b/site2/docs/cookbooks-tiered-storage.md @@ -32,7 +32,7 @@ getting charged for incomplete uploads. ## Configuring the offload driver -Offloading is configured in ```broker.conf```. +Offloading is configured in ```broker.conf```. At a minimum, the administrator must configure the driver, the bucket and the authenticating credentials. There is also some other knobs to configure, like the bucket region, the max block size in backed storage, etc. @@ -82,7 +82,12 @@ but relies on the mechanisms supported by the Once you have created a set of credentials in the AWS IAM console, they can be configured in a number of ways. -1. Set the environment variables **AWS_ACCESS_KEY_ID** and **AWS_SECRET_ACCESS_KEY** in ```conf/pulsar_env.sh```. +1. Using ec2 instance metadata credentials + +If you are on AWS instance with an instance profile that provides credentials, Pulsar will use these credentials +if no other mechanism is provided + +2. Set the environment variables **AWS_ACCESS_KEY_ID** and **AWS_SECRET_ACCESS_KEY** in ```conf/pulsar_env.sh```. ```bash export AWS_ACCESS_KEY_ID=ABC123456789 @@ -92,13 +97,13 @@ export AWS_SECRET_ACCESS_KEY=ded7db27a4558e2ea8bbf0bf37ae0e8521618f366c > \"export\" is important so that the variables are made available in the environment of spawned processes. -2. Add the Java system properties *aws.accessKeyId* and *aws.secretKey* to **PULSAR_EXTRA_OPTS** in `conf/pulsar_env.sh`. +3. Add the Java system properties *aws.accessKeyId* and *aws.secretKey* to **PULSAR_EXTRA_OPTS** in `conf/pulsar_env.sh`. ```bash PULSAR_EXTRA_OPTS="${PULSAR_EXTRA_OPTS} ${PULSAR_MEM} ${PULSAR_GC} -Daws.accessKeyId=ABC123456789 -Daws.secretKey=ded7db27a4558e2ea8bbf0bf37ae0e8521618f366c -Dio.netty.leakDetectionLevel=disabled -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024" ``` -3. Set the access credentials in ```~/.aws/credentials```. +4. Set the access credentials in ```~/.aws/credentials```. ```conf [default] @@ -106,7 +111,16 @@ aws_access_key_id=ABC123456789 aws_secret_access_key=ded7db27a4558e2ea8bbf0bf37ae0e8521618f366c ``` -If you are running in EC2 you can also use instance profile credentials, provided through the EC2 metadata service, but that is out of scope for this cookbook. +5. Assuming an IAM role + +If you want to assume an IAM role, this can be done via specifying the following: + +```conf +s3ManagedLedgerOffloadRole= +s3ManagedLedgerOffloadRoleSessionName=pulsar-s3-offload +``` + +This will use the `DefaultAWSCredentialsProviderChain` for assuming this role. > The broker must be rebooted for credentials specified in pulsar_env to take effect. @@ -134,7 +148,7 @@ gcsManagedLedgerOffloadBucket=pulsar-topic-offload Bucket Region is the region where bucket located. Bucket Region is not a required but a recommended configuration. If it is not configured, It will use the default region. -Regarding GCS, buckets are default created in the `us multi-regional location`, +Regarding GCS, buckets are default created in the `us multi-regional location`, page [Bucket Locations](https://cloud.google.com/storage/docs/bucket-locations) contains more information. ```conf @@ -211,7 +225,7 @@ Offload was a success If there is an error offloading, the error will be propagated to the offload-status command. ```bash -$ bin/pulsar-admin topics offload-status persistent://public/default/topic1 +$ bin/pulsar-admin topics offload-status persistent://public/default/topic1 Error in offload null diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md index 4f040f2af768e..4a859951549ba 100644 --- a/site2/docs/reference-configuration.md +++ b/site2/docs/reference-configuration.md @@ -159,7 +159,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di |athenzDomainNames| Supported Athenz provider domain names(comma separated) for authentication || |bookkeeperClientAuthenticationPlugin| Authentication plugin to use when connecting to bookies || |bookkeeperClientAuthenticationParametersName| BookKeeper auth plugin implementatation specifics parameters name and values || -|bookkeeperClientAuthenticationParameters||| +|bookkeeperClientAuthenticationParameters||| |bookkeeperClientTimeoutInSeconds| Timeout for BK add / read operations |30| |bookkeeperClientSpeculativeReadTimeoutInMillis| Speculative reads are initiated if a read request doesn’t complete within a certain time Using a value of 0, is disabling the speculative reads |0| |bookkeeperClientHealthCheckEnabled| Enable bookies health check. Bookies that have more than the configured number of failure within the interval will be quarantined for some time. During this period, new ledgers won’t be created on these bookies |true| @@ -225,6 +225,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di |s3ManagedLedgerOffloadServiceEndpoint| For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for testing) || |s3ManagedLedgerOffloadMaxBlockSizeInBytes| For Amazon S3 ledger offload, Max block size in bytes. (64MB by default, 5MB minimum) |67108864| |s3ManagedLedgerOffloadReadBufferSizeInBytes| For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default) |1048576| +|s3ManagedLedgerOffloadRole| For Amazon S3 ledger offload, provide a role to assume before writing to s3 || +|s3ManagedLedgerOffloadRoleSessionName| For Amazon S3 ledger offload, provide a role session name when using a role |pulsar-s3-offload| @@ -240,7 +242,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used |authPlugin| The authentication plugin. || |authParams| The authentication parameters for the cluster, as a comma-separated string. || |useTls| Whether or not TLS authentication will be enforced in the cluster. |false| -|tlsAllowInsecureConnection||| +|tlsAllowInsecureConnection||| |tlsTrustCertsFilePath||| @@ -335,7 +337,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used |authenticationEnabled| Enable authentication for the broker. |false| |authenticationProviders| A comma-separated list of class names for authentication providers. |false| |authorizationEnabled| Enforce authorization in brokers. |false| -|superUserRoles| Role names that are treated as “superusers.” Superusers are authorized to perform all admin tasks. || +|superUserRoles| Role names that are treated as “superusers.” Superusers are authorized to perform all admin tasks. || |brokerClientAuthenticationPlugin| The authentication settings of the broker itself. Used when the broker connects to other brokers either in the same cluster or from other clusters. || |brokerClientAuthenticationParameters| The parameters that go along with the plugin specified using brokerClientAuthenticationPlugin. || |athenzDomainNames| Supported Athenz authentication provider domain names as a comma-separated list. || @@ -351,7 +353,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used |bookkeeperClientRackawarePolicyEnabled| |true| |bookkeeperClientRegionawarePolicyEnabled| |false| |bookkeeperClientReorderReadSequenceEnabled| |false| -|bookkeeperClientIsolationGroups||| +|bookkeeperClientIsolationGroups||| |managedLedgerDefaultEnsembleSize| |1| |managedLedgerDefaultWriteQuorum| |1| |managedLedgerDefaultAckQuorum| |1| @@ -409,7 +411,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used |bindAddress||0.0.0.0| |clusterName ||| |authenticationEnabled||false| -|authenticationProviders||| +|authenticationProviders||| |authorizationEnabled||false| |superUserRoles ||| |brokerClientAuthenticationPlugin||| diff --git a/tiered-storage/jcloud/pom.xml b/tiered-storage/jcloud/pom.xml index 57fc36cddfa24..d635337041daf 100644 --- a/tiered-storage/jcloud/pom.xml +++ b/tiered-storage/jcloud/pom.xml @@ -73,6 +73,10 @@ com.amazonaws aws-java-sdk-core + + com.amazonaws + aws-java-sdk-sts + com.jamesmurty.utils java-xmlbuilder diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java index 52fedfd4344dd..a4c5cf4fa8b2e 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java @@ -20,6 +20,10 @@ import static org.apache.pulsar.common.util.FieldParser.value; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import com.google.common.base.Strings; import java.io.Serializable; import java.lang.reflect.Field; import java.util.Arrays; @@ -54,6 +58,12 @@ public class TieredStorageConfigurationData implements Serializable, Cloneable { // For Amazon S3 ledger offload, Read buffer size in bytes. private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB + // For Amazon S3 ledger offload, provide a role to assume before writing to s3 + private String s3ManagedLedgerOffloadRole = null; + + // For Amazon S3 ledger offload, provide a role session name when using a role + private String s3ManagedLedgerOffloadRoleSessionName = "pulsar-s3-offload"; + // For Google Cloud Storage ledger offload, region where offload bucket is located. // reference this page for more details: https://cloud.google.com/storage/docs/bucket-locations private String gcsManagedLedgerOffloadRegion = null; @@ -71,6 +81,20 @@ public class TieredStorageConfigurationData implements Serializable, Cloneable { // For more details, see the "Service Accounts" section of https://support.google.com/googleapi/answer/6158849 private String gcsManagedLedgerOffloadServiceAccountKeyFile = null; + /** + * Builds an AWS credential provider based on the offload options + * @return aws credential provider + */ + public AWSCredentialsProvider getAWSCredentialProvider() { + if (Strings.isNullOrEmpty(this.getS3ManagedLedgerOffloadRole())) { + return DefaultAWSCredentialsProviderChain.getInstance(); + } else { + String roleName = this.getS3ManagedLedgerOffloadRole(); + String roleSessionName = this.getS3ManagedLedgerOffloadRoleSessionName(); + return new STSAssumeRoleSessionCredentialsProvider.Builder(roleName, roleSessionName).build(); + } + } + /** * Create a tiered storage configuration from the provided properties. * diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java index 96409b29d791a..f4bc502b245c3 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java @@ -19,7 +19,8 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl; import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSSessionCredentials; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; @@ -47,8 +48,10 @@ import org.apache.bookkeeper.mledger.offload.jcloud.TieredStorageConfigurationData; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier; import org.jclouds.Constants; import org.jclouds.ContextBuilder; +import org.jclouds.aws.domain.SessionCredentials; import org.jclouds.aws.s3.AWSS3ProviderMetadata; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.BlobStoreContext; @@ -113,7 +116,7 @@ private static class BlobStoreLocation { private static Pair createBlobStore(String driver, String region, String endpoint, - Credentials credentials, + Supplier credentials, int maxBlockSize) { Properties overrides = new Properties(); // This property controls the number of parts being uploaded in parallel. @@ -127,7 +130,7 @@ private static Pair createBlobStore(String driver, ProviderRegistry.registerProvider(new GoogleCloudStorageProviderMetadata()); ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver); - contextBuilder.credentials(credentials.identity, credentials.credential); + contextBuilder.credentialsSupplier(credentials); if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) { contextBuilder.endpoint(endpoint); @@ -162,7 +165,7 @@ private static Pair createBlobStore(String driver, // the endpoint private final String writeEndpoint; // credentials - private final Credentials credentials; + private final Supplier credentials; // max block size for each data block. private int maxBlockSize; @@ -223,13 +226,13 @@ public static BlobStoreManagedLedgerOffloader create(TieredStorageConfigurationD "ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB for s3 and gcs offload"); } - Credentials credentials = getCredentials(driver, conf); + Supplier credentials = getCredentials(driver, conf); return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler, maxBlockSize, readBufferSize, endpoint, region, credentials, userMetadata); } - public static Credentials getCredentials(String driver, TieredStorageConfigurationData conf) throws IOException { + public static Supplier getCredentials(String driver, TieredStorageConfigurationData conf) throws IOException { // credentials: // for s3, get by DefaultAWSCredentialsProviderChain. // for gcs, use downloaded file 'google_creds.json', which contains service account key by @@ -243,28 +246,40 @@ public static Credentials getCredentials(String driver, TieredStorageConfigurati } try { String gcsKeyContent = Files.toString(new File(gcsKeyPath), Charset.defaultCharset()); - return new GoogleCredentialsFromJson(gcsKeyContent).get(); + return () -> new GoogleCredentialsFromJson(gcsKeyContent).get(); } catch (IOException ioe) { log.error("Cannot read GCS service account credentials file: {}", gcsKeyPath); throw new IOException(ioe); } } else if (isS3Driver(driver)) { - AWSCredentials credentials = null; - try { - DefaultAWSCredentialsProviderChain creds = DefaultAWSCredentialsProviderChain.getInstance(); - credentials = creds.getCredentials(); - } catch (Exception e) { - // allowed, some mock s3 service not need credential - log.warn("Exception when get credentials for s3 ", e); - } + AWSCredentialsProvider credsChain = conf.getAWSCredentialProvider(); - String id = "accesskey"; - String key = "secretkey"; - if (credentials != null) { - id = credentials.getAWSAccessKeyId(); - key = credentials.getAWSSecretKey(); - } - return new Credentials(id, key); + return () -> { + AWSCredentials creds = null; + try { + creds = credsChain.getCredentials(); + } catch (Exception e) { + // allowed, some mock s3 service not need credential + log.warn("Exception when get credentials for s3 ", e); + } + Credentials jcloudCred = null; + if (creds != null) { + // if we have session credentials, we need to send the session token + // this allows us to support EC2 metadata credentials + if (creds instanceof AWSSessionCredentials) { + jcloudCred = SessionCredentials.builder() + .accessKeyId(creds.getAWSAccessKeyId()) + .secretAccessKey(creds.getAWSSecretKey()) + .sessionToken(((AWSSessionCredentials) creds).getSessionToken()) + .build(); + } else { + jcloudCred = new Credentials(creds.getAWSAccessKeyId(), creds.getAWSSecretKey()); + } + } else { + jcloudCred = new Credentials("accesskey", "secretkey"); + } + return jcloudCred; + }; } else { throw new IOException( "Not support this kind of driver: " + driver); @@ -274,13 +289,13 @@ public static Credentials getCredentials(String driver, TieredStorageConfigurati // build context for jclouds BlobStoreContext BlobStoreManagedLedgerOffloader(String driver, String container, OrderedScheduler scheduler, - int maxBlockSize, int readBufferSize, String endpoint, String region, Credentials credentials) { + int maxBlockSize, int readBufferSize, String endpoint, String region, Supplier credentials) { this(driver, container, scheduler, maxBlockSize, readBufferSize, endpoint, region, credentials, Maps.newHashMap()); } BlobStoreManagedLedgerOffloader(String driver, String container, OrderedScheduler scheduler, int maxBlockSize, int readBufferSize, - String endpoint, String region, Credentials credentials, + String endpoint, String region, Supplier credentials, Map userMetadata) { this.offloadDriverName = driver; this.scheduler = scheduler; diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java index 7f80397613ec2..a1d50af9dfb65 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -25,6 +25,9 @@ import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSSessionCredentials; import com.google.common.util.concurrent.MoreExecutors; import java.io.File; import java.io.IOException; @@ -52,11 +55,14 @@ import org.apache.bookkeeper.mledger.offload.jcloud.BlobStoreTestBase; import org.apache.bookkeeper.mledger.offload.jcloud.TieredStorageConfigurationData; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.data.ACL; +import org.jclouds.aws.domain.SessionCredentials; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.options.CopyOptions; +import org.jclouds.domain.Credentials; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -605,5 +611,44 @@ public void testReadUnknownIndexVersion() throws Exception { Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version")); } } + + @Test + public void testSessionCredentialSupplier() throws Exception { + TieredStorageConfigurationData mock = mock(TieredStorageConfigurationData.class); + Mockito.when(mock.getAWSCredentialProvider()).thenReturn(new AWSCredentialsProvider() { + @Override + public AWSCredentials getCredentials() { + return new AWSSessionCredentials() { + @Override + public String getSessionToken() { + return "token"; + } + + @Override + public String getAWSAccessKeyId() { + return "access"; + } + + @Override + public String getAWSSecretKey() { + return "secret"; + } + }; + } + + @Override + public void refresh() { + + } + }); + + Supplier creds = BlobStoreManagedLedgerOffloader.getCredentials("aws-s3", mock); + + Assert.assertTrue(creds.get() instanceof SessionCredentials); + SessionCredentials sessCreds = (SessionCredentials) creds.get(); + Assert.assertEquals(sessCreds.getAccessKeyId(), "access"); + Assert.assertEquals(sessCreds.getSecretAccessKey(), "secret"); + Assert.assertEquals(sessCreds.getSessionToken(), "token"); + } } From cd11012d4300469d8f99ca54a19ecb9289fce98a Mon Sep 17 00:00:00 2001 From: Addison Higham Date: Sat, 1 Jun 2019 22:04:06 -0600 Subject: [PATCH 2/3] [tiered_storage] Tweak s3 credential handling to check on boot This changes the s3 handling slightly, instead of falling back to static credentials, we instead now fail if no s3 credentials can be found and change the unit tests to start a broker with s3 credentials. With the new Supplier API, we now fetch credentials on every request. Because of this, the failure and subsequent try/catch is costly and the integration tests were using this, which caused them to be significantly slower. Instead, we just check to see if we can fetch creds, and if we can't consider it an error condition to exit the app as it is unlikely in a production scenario to not have some credentials. --- .../integration/topologies/PulsarCluster.java | 3 ++ .../impl/BlobStoreManagedLedgerOffloader.java | 44 ++++++++++--------- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index 4a12346c767ea..d824880c1858a 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -157,6 +157,9 @@ private PulsarCluster(PulsarClusterSpec spec) { .withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT) .withEnv("clusterName", clusterName) .withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1") + // used in s3 tests + .withEnv("AWS_ACCESS_KEY_ID", "accesskey") + .withEnv("AWS_SECRET_KEY", "secretkey") ) ); diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java index f4bc502b245c3..38fe8809c5882 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java @@ -253,32 +253,34 @@ public static Supplier getCredentials(String driver, TieredStorageC } } else if (isS3Driver(driver)) { AWSCredentialsProvider credsChain = conf.getAWSCredentialProvider(); + // try and get creds before starting... if we can't fetch + // creds on boot, we want to fail + try { + credsChain.getCredentials(); + } catch (Exception e) { + // allowed, some mock s3 service not need credential + log.error("unable to fetch S3 credentials for offloading, failing", e); + throw e; + } return () -> { - AWSCredentials creds = null; - try { - creds = credsChain.getCredentials(); - } catch (Exception e) { - // allowed, some mock s3 service not need credential - log.warn("Exception when get credentials for s3 ", e); + AWSCredentials creds = credsChain.getCredentials(); + if (creds == null) { + // we don't expect this to happen, as we + // successfully fetched creds on boot + throw new RuntimeException("Unable to fetch S3 credentials after start, unexpected!"); } - Credentials jcloudCred = null; - if (creds != null) { - // if we have session credentials, we need to send the session token - // this allows us to support EC2 metadata credentials - if (creds instanceof AWSSessionCredentials) { - jcloudCred = SessionCredentials.builder() - .accessKeyId(creds.getAWSAccessKeyId()) - .secretAccessKey(creds.getAWSSecretKey()) - .sessionToken(((AWSSessionCredentials) creds).getSessionToken()) - .build(); - } else { - jcloudCred = new Credentials(creds.getAWSAccessKeyId(), creds.getAWSSecretKey()); - } + // if we have session credentials, we need to send the session token + // this allows us to support EC2 metadata credentials + if (creds instanceof AWSSessionCredentials) { + return SessionCredentials.builder() + .accessKeyId(creds.getAWSAccessKeyId()) + .secretAccessKey(creds.getAWSSecretKey()) + .sessionToken(((AWSSessionCredentials) creds).getSessionToken()) + .build(); } else { - jcloudCred = new Credentials("accesskey", "secretkey"); + return new Credentials(creds.getAWSAccessKeyId(), creds.getAWSSecretKey()); } - return jcloudCred; }; } else { throw new IOException( From 08c5f0094ae5a4428491750887e74c912385c9da Mon Sep 17 00:00:00 2001 From: Addison Higham Date: Sat, 1 Jun 2019 23:09:03 -0600 Subject: [PATCH 3/3] fix s3 test for missing creds --- .../BlobStoreManagedLedgerOffloaderTest.java | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java index a1d50af9dfb65..07308ff848946 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -247,7 +247,37 @@ public void testGcsSmallBlockSizeConfigured() throws Exception { @Test public void testS3DriverConfiguredWell() throws Exception { - TieredStorageConfigurationData conf = new TieredStorageConfigurationData(); + TieredStorageConfigurationData conf = new TieredStorageConfigurationData() { + @Override + public AWSCredentialsProvider getAWSCredentialProvider() { + return new AWSCredentialsProvider() { + @Override + public AWSCredentials getCredentials() { + return new AWSSessionCredentials() { + @Override + public String getSessionToken() { + return "token"; + } + + @Override + public String getAWSAccessKeyId() { + return "access"; + } + + @Override + public String getAWSSecretKey() { + return "secret"; + } + }; + } + + @Override + public void refresh() { + + } + }; + } + }; conf.setManagedLedgerOffloadDriver("s3"); conf.setS3ManagedLedgerOffloadBucket(BUCKET); conf.setS3ManagedLedgerOffloadServiceEndpoint("http://fake.s3.end.point");