[tiered-storage] Add support for AWS instance and role creds #4433

Merged: 3 commits, Jun 3, 2019

Changes from all commits
28 changes: 21 additions & 7 deletions site2/docs/cookbooks-tiered-storage.md
@@ -32,7 +32,7 @@ getting charged for incomplete uploads.

## Configuring the offload driver

Offloading is configured in ```broker.conf```.

At a minimum, the administrator must configure the driver, the bucket, and the authenticating credentials.
There are also other knobs to configure, such as the bucket region and the max block size in backing storage.
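For example, a minimal S3 offload configuration might look like the following (a sketch; the bucket name and region are illustrative values):

```conf
managedLedgerOffloadDriver=aws-s3
s3ManagedLedgerOffloadBucket=pulsar-topic-offload
s3ManagedLedgerOffloadRegion=eu-west-3
```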
@@ -82,7 +82,12 @@ but relies on the mechanisms supported by the

Once you have created a set of credentials in the AWS IAM console, they can be configured in a number of ways.

1. Using EC2 instance metadata credentials

If you are on an AWS instance with an instance profile that provides credentials, Pulsar uses these credentials
if no other mechanism is provided.
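A quick way to verify that instance-profile credentials are present is to query the EC2 instance metadata service from the instance (an illustrative check, not a Pulsar command):

```bash
# lists the role name attached to this instance's profile; returns 404 if none
curl http://169.254.169.254/latest/meta-data/iam/security-credentials/
```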

2. Set the environment variables **AWS_ACCESS_KEY_ID** and **AWS_SECRET_ACCESS_KEY** in ```conf/pulsar_env.sh```.

```bash
export AWS_ACCESS_KEY_ID=ABC123456789
export AWS_SECRET_ACCESS_KEY=ded7db27a4558e2ea8bbf0bf37ae0e8521618f366c
```

> "export" is important so that the variables are made available in the environment of spawned processes.


3. Add the Java system properties *aws.accessKeyId* and *aws.secretKey* to **PULSAR_EXTRA_OPTS** in `conf/pulsar_env.sh`.

```bash
PULSAR_EXTRA_OPTS="${PULSAR_EXTRA_OPTS} ${PULSAR_MEM} ${PULSAR_GC} -Daws.accessKeyId=ABC123456789 -Daws.secretKey=ded7db27a4558e2ea8bbf0bf37ae0e8521618f366c -Dio.netty.leakDetectionLevel=disabled -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024"
```

4. Set the access credentials in ```~/.aws/credentials```.

```conf
[default]
aws_access_key_id=ABC123456789
aws_secret_access_key=ded7db27a4558e2ea8bbf0bf37ae0e8521618f366c
```

5. Assuming an IAM role

If you want to assume an IAM role, you can do so by specifying the following:

```conf
s3ManagedLedgerOffloadRole=<aws role arn>
s3ManagedLedgerOffloadRoleSessionName=pulsar-s3-offload
```

This uses the `DefaultAWSCredentialsProviderChain` to supply the base credentials for assuming this role.
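To sanity-check that your base credentials are permitted to assume the role, you can issue the equivalent STS call with the AWS CLI (a sketch; substitute your own role ARN):

```bash
# on success, prints temporary AccessKeyId/SecretAccessKey/SessionToken
aws sts assume-role --role-arn <aws role arn> --role-session-name pulsar-s3-offload
```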

> The broker must be restarted for credentials specified in `pulsar_env.sh` to take effect.

@@ -134,7 +148,7 @@ gcsManagedLedgerOffloadBucket=pulsar-topic-offload
The bucket region is the region where the bucket is located. It is not a required configuration, but it
is recommended. If it is not configured, the default region is used.

Regarding GCS, buckets are created in the `us multi-regional location` by default;
the [Bucket Locations](https://cloud.google.com/storage/docs/bucket-locations) page contains more information.
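For example (an illustrative value; the property name is the one defined in `TieredStorageConfigurationData` below):

```conf
gcsManagedLedgerOffloadRegion=europe-west3
```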

@@ -211,7 +225,7 @@ Offload was a success
If there is an error offloading, the error will be propagated to the offload-status command.

```bash
$ bin/pulsar-admin topics offload-status persistent://public/default/topic1
Error in offload
null
```
12 changes: 7 additions & 5 deletions site2/docs/reference-configuration.md
@@ -159,7 +159,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|athenzDomainNames| Supported Athenz provider domain names (comma-separated) for authentication ||
|bookkeeperClientAuthenticationPlugin| Authentication plugin to use when connecting to bookies ||
|bookkeeperClientAuthenticationParametersName| BookKeeper auth plugin implementation-specific parameter name and values ||
|bookkeeperClientAuthenticationParameters|||
|bookkeeperClientTimeoutInSeconds| Timeout for BK add / read operations |30|
|bookkeeperClientSpeculativeReadTimeoutInMillis| Speculative reads are initiated if a read request doesn’t complete within a certain time. A value of 0 disables speculative reads. |0|
|bookkeeperClientHealthCheckEnabled| Enable bookies health check. Bookies that have more than the configured number of failures within the interval will be quarantined for some time. During this period, new ledgers won’t be created on these bookies. |true|
@@ -225,6 +225,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|s3ManagedLedgerOffloadServiceEndpoint| For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for testing) ||
|s3ManagedLedgerOffloadMaxBlockSizeInBytes| For Amazon S3 ledger offload, Max block size in bytes. (64MB by default, 5MB minimum) |67108864|
|s3ManagedLedgerOffloadReadBufferSizeInBytes| For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default) |1048576|
|s3ManagedLedgerOffloadRole| For Amazon S3 ledger offload, provide a role to assume before writing to S3 ||
|s3ManagedLedgerOffloadRoleSessionName| For Amazon S3 ledger offload, provide a role session name when using a role |pulsar-s3-offload|



Expand All @@ -240,7 +242,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
|authPlugin| The authentication plugin. ||
|authParams| The authentication parameters for the cluster, as a comma-separated string. ||
|useTls| Whether or not TLS authentication will be enforced in the cluster. |false|
|tlsAllowInsecureConnection|||
|tlsTrustCertsFilePath|||


@@ -335,7 +337,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
|authenticationEnabled| Enable authentication for the broker. |false|
|authenticationProviders| A comma-separated list of class names for authentication providers. |false|
|authorizationEnabled| Enforce authorization in brokers. |false|
|superUserRoles| Role names that are treated as “superusers.” Superusers are authorized to perform all admin tasks. ||
|brokerClientAuthenticationPlugin| The authentication settings of the broker itself. Used when the broker connects to other brokers either in the same cluster or from other clusters. ||
|brokerClientAuthenticationParameters| The parameters that go along with the plugin specified using brokerClientAuthenticationPlugin. ||
|athenzDomainNames| Supported Athenz authentication provider domain names as a comma-separated list. ||
Expand All @@ -351,7 +353,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
|bookkeeperClientRackawarePolicyEnabled| |true|
|bookkeeperClientRegionawarePolicyEnabled| |false|
|bookkeeperClientReorderReadSequenceEnabled| |false|
|bookkeeperClientIsolationGroups|||
|managedLedgerDefaultEnsembleSize| |1|
|managedLedgerDefaultWriteQuorum| |1|
|managedLedgerDefaultAckQuorum| |1|
@@ -409,7 +411,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
|bindAddress||0.0.0.0|
|clusterName |||
|authenticationEnabled||false|
|authenticationProviders|||
|authorizationEnabled||false|
|superUserRoles |||
|brokerClientAuthenticationPlugin|||
@@ -157,6 +157,9 @@ private PulsarCluster(PulsarClusterSpec spec) {
.withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
.withEnv("clusterName", clusterName)
.withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
// used in s3 tests
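                        // note: the AWS SDK's environment variable provider accepts
                        // AWS_SECRET_KEY as an alternate name for AWS_SECRET_ACCESS_KEY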
.withEnv("AWS_ACCESS_KEY_ID", "accesskey")
.withEnv("AWS_SECRET_KEY", "secretkey")
)
);

4 changes: 4 additions & 0 deletions tiered-storage/jcloud/pom.xml
@@ -73,6 +73,10 @@
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>
<dependency>
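      <!-- STS is required for STSAssumeRoleSessionCredentialsProvider (assume-role support) -->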
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
</dependency>
<dependency>
<groupId>com.jamesmurty.utils</groupId>
<artifactId>java-xmlbuilder</artifactId>
@@ -20,6 +20,10 @@

import static org.apache.pulsar.common.util.FieldParser.value;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.google.common.base.Strings;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.Arrays;
@@ -54,6 +58,12 @@ public class TieredStorageConfigurationData implements Serializable, Cloneable {
// For Amazon S3 ledger offload, Read buffer size in bytes.
private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB

// For Amazon S3 ledger offload, provide a role to assume before writing to s3
private String s3ManagedLedgerOffloadRole = null;

// For Amazon S3 ledger offload, provide a role session name when using a role
private String s3ManagedLedgerOffloadRoleSessionName = "pulsar-s3-offload";

// For Google Cloud Storage ledger offload, region where offload bucket is located.
// reference this page for more details: https://cloud.google.com/storage/docs/bucket-locations
private String gcsManagedLedgerOffloadRegion = null;
Expand All @@ -71,6 +81,20 @@ public class TieredStorageConfigurationData implements Serializable, Cloneable {
// For more details, see the "Service Accounts" section of https://support.google.com/googleapi/answer/6158849
private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;

    /**
     * Builds an AWS credential provider based on the offload options.
     *
     * @return an AWS credential provider
     */
public AWSCredentialsProvider getAWSCredentialProvider() {
if (Strings.isNullOrEmpty(this.getS3ManagedLedgerOffloadRole())) {
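            // no role configured: fall back to the default provider chain
            // (env vars, system properties, profile file, EC2/ECS instance metadata)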
return DefaultAWSCredentialsProviderChain.getInstance();
} else {
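            // role configured: assume it via STS, with the default chain
            // supplying the base credentials for the AssumeRole call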
String roleName = this.getS3ManagedLedgerOffloadRole();
String roleSessionName = this.getS3ManagedLedgerOffloadRoleSessionName();
return new STSAssumeRoleSessionCredentialsProvider.Builder(roleName, roleSessionName).build();
}
}

/**
* Create a tiered storage configuration from the provided <tt>properties</tt>.
*
@@ -19,7 +19,8 @@
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSSessionCredentials;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
@@ -47,8 +48,10 @@
import org.apache.bookkeeper.mledger.offload.jcloud.TieredStorageConfigurationData;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
import org.jclouds.Constants;
import org.jclouds.ContextBuilder;
import org.jclouds.aws.domain.SessionCredentials;
import org.jclouds.aws.s3.AWSS3ProviderMetadata;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
@@ -113,7 +116,7 @@ private static class BlobStoreLocation {
private static Pair<BlobStoreLocation, BlobStore> createBlobStore(String driver,
String region,
String endpoint,
Supplier<Credentials> credentials,
int maxBlockSize) {
Properties overrides = new Properties();
// This property controls the number of parts being uploaded in parallel.
Expand All @@ -127,7 +130,7 @@ private static Pair<BlobStoreLocation, BlobStore> createBlobStore(String driver,
ProviderRegistry.registerProvider(new GoogleCloudStorageProviderMetadata());

ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver);
contextBuilder.credentialsSupplier(credentials);

if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) {
contextBuilder.endpoint(endpoint);
@@ -162,7 +165,7 @@ private static Pair<BlobStoreLocation, BlobStore> createBlobStore(String driver,
// the endpoint
private final String writeEndpoint;
// credentials
private final Supplier<Credentials> credentials;

// max block size for each data block.
private int maxBlockSize;
@@ -223,13 +226,13 @@ public static BlobStoreManagedLedgerOffloader create(TieredStorageConfigurationD
"ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB for s3 and gcs offload");
}

Supplier<Credentials> credentials = getCredentials(driver, conf);

return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler,
maxBlockSize, readBufferSize, endpoint, region, credentials, userMetadata);
}

public static Supplier<Credentials> getCredentials(String driver, TieredStorageConfigurationData conf) throws IOException {
// credentials:
// for s3, get by DefaultAWSCredentialsProviderChain.
// for gcs, use downloaded file 'google_creds.json', which contains service account key by
Expand All @@ -243,28 +246,42 @@ public static Credentials getCredentials(String driver, TieredStorageConfigurati
}
try {
String gcsKeyContent = Files.toString(new File(gcsKeyPath), Charset.defaultCharset());
return () -> new GoogleCredentialsFromJson(gcsKeyContent).get();
} catch (IOException ioe) {
log.error("Cannot read GCS service account credentials file: {}", gcsKeyPath);
throw new IOException(ioe);
}
} else if (isS3Driver(driver)) {
AWSCredentialsProvider credsChain = conf.getAWSCredentialProvider();
// try and get creds before starting... if we can't fetch
// creds on boot, we want to fail
try {
credsChain.getCredentials();
} catch (Exception e) {
log.error("unable to fetch S3 credentials for offloading, failing", e);
throw e;
}

return () -> {
AWSCredentials creds = credsChain.getCredentials();
if (creds == null) {
// we don't expect this to happen, as we
// successfully fetched creds on boot
throw new RuntimeException("Unable to fetch S3 credentials after start, unexpected!");
}
// if we have session credentials, we need to send the session token
// this allows us to support EC2 metadata credentials
if (creds instanceof AWSSessionCredentials) {
return SessionCredentials.builder()
.accessKeyId(creds.getAWSAccessKeyId())
.secretAccessKey(creds.getAWSSecretKey())
.sessionToken(((AWSSessionCredentials) creds).getSessionToken())
.build();
} else {
return new Credentials(creds.getAWSAccessKeyId(), creds.getAWSSecretKey());
}
};
} else {
throw new IOException(
"Not support this kind of driver: " + driver);
Expand All @@ -274,13 +291,13 @@ public static Credentials getCredentials(String driver, TieredStorageConfigurati

// build context for jclouds BlobStoreContext
BlobStoreManagedLedgerOffloader(String driver, String container, OrderedScheduler scheduler,
int maxBlockSize, int readBufferSize, String endpoint, String region, Supplier<Credentials> credentials) {
this(driver, container, scheduler, maxBlockSize, readBufferSize, endpoint, region, credentials, Maps.newHashMap());
}

BlobStoreManagedLedgerOffloader(String driver, String container, OrderedScheduler scheduler,
int maxBlockSize, int readBufferSize,
String endpoint, String region, Supplier<Credentials> credentials,
Map<String, String> userMetadata) {
this.offloadDriverName = driver;
this.scheduler = scheduler;