Skip to content

Commit

Permalink
Add S3 async upload utilities and models
Browse files Browse the repository at this point in the history
Signed-off-by: Raghuvansh Raj <raghraaj@amazon.com>
  • Loading branch information
raghuvanshraj committed May 25, 2023
1 parent 9ceae51 commit 6c12571
Show file tree
Hide file tree
Showing 12 changed files with 1,415 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.s3;

import org.opensearch.common.concurrent.RefCountedReleasable;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

import java.io.Closeable;
import java.io.IOException;

/**
* Handles the shutdown of the wrapped {@link software.amazon.awssdk.services.s3.S3AsyncClient} using reference
* counting.
*/
public class AmazonAsyncS3Reference extends RefCountedReleasable<AmazonAsyncS3WithCredentials> {

AmazonAsyncS3Reference(AmazonAsyncS3WithCredentials client) {
super("AWS_S3_CLIENT", client, () -> {
client.client().close();
client.priorityClient().close();
AwsCredentialsProvider credentials = client.credentials();
if (credentials instanceof Closeable) {
try {
((Closeable) credentials).close();
} catch (IOException e) {
/* Do nothing here */
}
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.s3;

import org.opensearch.common.Nullable;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.s3.S3AsyncClient;

/**
* The holder of the AmazonS3 and AWSCredentialsProvider
*/
final class AmazonAsyncS3WithCredentials {
private final S3AsyncClient client;
private final S3AsyncClient priorityClient;
private final AwsCredentialsProvider credentials;

private AmazonAsyncS3WithCredentials(
final S3AsyncClient client,
final S3AsyncClient priorityClient,
@Nullable final AwsCredentialsProvider credentials
) {
this.client = client;
this.credentials = credentials;
this.priorityClient = priorityClient;
}

S3AsyncClient client() {
return client;
}

S3AsyncClient priorityClient() {
return priorityClient;
}

AwsCredentialsProvider credentials() {
return credentials;
}

static AmazonAsyncS3WithCredentials create(
final S3AsyncClient client,
final S3AsyncClient priorityClient,
@Nullable final AwsCredentialsProvider credentials
) {
return new AmazonAsyncS3WithCredentials(client, priorityClient, credentials);
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,48 @@ final class S3ClientSettings {
key -> Setting.timeSetting(key, TimeValue.timeValueMillis(ClientConfiguration.DEFAULT_SOCKET_TIMEOUT), Property.NodeScope)
);

/** The request timeout for connecting to s3. */
static final Setting.AffixSetting<TimeValue> REQUEST_TIMEOUT_SETTING = Setting.affixKeySetting(
PREFIX,
"request_timeout",
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(2), Property.NodeScope)
);

/** The connection timeout for connecting to s3. */
static final Setting.AffixSetting<TimeValue> CONNECTION_TIMEOUT_SETTING = Setting.affixKeySetting(
PREFIX,
"connection_timeout",
key -> Setting.timeSetting(key, TimeValue.timeValueSeconds(10), Property.NodeScope)
);

/** The connection TTL for connecting to s3. */
static final Setting.AffixSetting<TimeValue> CONNECTION_TTL_SETTING = Setting.affixKeySetting(
PREFIX,
"connection_ttl",
key -> Setting.timeSetting(key, TimeValue.timeValueMillis(5000), Property.NodeScope)
);

/** The maximum connections to s3. */
static final Setting.AffixSetting<Integer> MAX_CONNECTIONS_SETTING = Setting.affixKeySetting(
PREFIX,
"max_connections",
key -> Setting.intSetting(key, 100, Property.NodeScope)
);

/** Connection acquisition timeout for new connections to S3. */
static final Setting.AffixSetting<TimeValue> CONNECTION_ACQUISITION_TIMEOUT = Setting.affixKeySetting(
PREFIX,
"connection_acquisition_timeout",
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(2), Property.NodeScope)
);

/** The maximum pending connections to S3. */
static final Setting.AffixSetting<Integer> MAX_PENDING_CONNECTION_ACQUIRES = Setting.affixKeySetting(
PREFIX,
"max_pending_connection_acquires",
key -> Setting.intSetting(key, 10_000, Property.NodeScope)
);

/** The number of retries to use when an s3 request fails. */
static final Setting.AffixSetting<Integer> MAX_RETRIES_SETTING = Setting.affixKeySetting(
PREFIX,
Expand Down Expand Up @@ -230,6 +272,24 @@ final class S3ClientSettings {
/** The read timeout for the s3 client. */
final int readTimeoutMillis;

/** The request timeout for the s3 client */
final int requestTimeoutMillis;

/** The connection timeout for the s3 client */
final int connectionTimeoutMillis;

/** The connection TTL for the s3 client */
final int connectionTTLMillis;

/** The max number of connections for the s3 client */
final int maxConnections;

/** The connnection acquisition timeout for the s3 async client */
final int connectionAcquisitionTimeoutMillis;

/** The max number of requests pending to acquire connection for the s3 async client */
final int maxPendingConnectionAcquires;

/** The number of retries to use for the s3 client. */
final int maxRetries;

Expand All @@ -254,6 +314,12 @@ private S3ClientSettings(
String endpoint,
Protocol protocol,
int readTimeoutMillis,
int requestTimeoutMillis,
int connectionTimeoutMillis,
int connectionTTLMillis,
int maxConnections,
int connectionAcquisitionTimeoutMillis,
int maxPendingConnectionAcquires,
int maxRetries,
boolean throttleRetries,
boolean pathStyleAccess,
Expand All @@ -267,6 +333,12 @@ private S3ClientSettings(
this.endpoint = endpoint;
this.protocol = protocol;
this.readTimeoutMillis = readTimeoutMillis;
this.requestTimeoutMillis = requestTimeoutMillis;
this.connectionTimeoutMillis = connectionTimeoutMillis;
this.connectionTTLMillis = connectionTTLMillis;
this.maxConnections = maxConnections;
this.connectionAcquisitionTimeoutMillis = connectionAcquisitionTimeoutMillis;
this.maxPendingConnectionAcquires = maxPendingConnectionAcquires;
this.maxRetries = maxRetries;
this.throttleRetries = throttleRetries;
this.pathStyleAccess = pathStyleAccess;
Expand Down Expand Up @@ -298,6 +370,27 @@ S3ClientSettings refine(Settings repositorySettings) {
final int newReadTimeoutMillis = Math.toIntExact(
getRepoSettingOrDefault(READ_TIMEOUT_SETTING, normalizedSettings, TimeValue.timeValueMillis(readTimeoutMillis)).millis()
);
final int newRequestTimeoutMillis = Math.toIntExact(
getRepoSettingOrDefault(REQUEST_TIMEOUT_SETTING, normalizedSettings, TimeValue.timeValueMillis(requestTimeoutMillis)).millis()
);
final int newConnectionTimeoutMillis = Math.toIntExact(
getRepoSettingOrDefault(CONNECTION_TIMEOUT_SETTING, normalizedSettings, TimeValue.timeValueMillis(connectionTimeoutMillis))
.millis()
);
final int newConnectionTTLMillis = Math.toIntExact(
getRepoSettingOrDefault(CONNECTION_TTL_SETTING, normalizedSettings, TimeValue.timeValueMillis(connectionTTLMillis)).millis()
);
final int newConnectionAcquisitionTimeoutMillis = Math.toIntExact(
getRepoSettingOrDefault(
CONNECTION_ACQUISITION_TIMEOUT,
normalizedSettings,
TimeValue.timeValueMillis(connectionAcquisitionTimeoutMillis)
).millis()
);
final int newMaxConnections = Math.toIntExact(getRepoSettingOrDefault(MAX_CONNECTIONS_SETTING, normalizedSettings, maxConnections));
final int newMaxPendingConnectionAcquires = Math.toIntExact(
getRepoSettingOrDefault(MAX_PENDING_CONNECTION_ACQUIRES, normalizedSettings, maxPendingConnectionAcquires)
);
final int newMaxRetries = getRepoSettingOrDefault(MAX_RETRIES_SETTING, normalizedSettings, maxRetries);
final boolean newThrottleRetries = getRepoSettingOrDefault(USE_THROTTLE_RETRIES_SETTING, normalizedSettings, throttleRetries);
final boolean newPathStyleAccess = getRepoSettingOrDefault(USE_PATH_STYLE_ACCESS, normalizedSettings, pathStyleAccess);
Expand All @@ -319,6 +412,12 @@ S3ClientSettings refine(Settings repositorySettings) {
&& Objects.equals(proxySettings.getHostName(), newProxyHost)
&& proxySettings.getPort() == newProxyPort
&& newReadTimeoutMillis == readTimeoutMillis
&& newRequestTimeoutMillis == requestTimeoutMillis
&& newConnectionTimeoutMillis == connectionTimeoutMillis
&& newConnectionTTLMillis == connectionTTLMillis
&& newMaxConnections == maxConnections
&& newConnectionAcquisitionTimeoutMillis == connectionAcquisitionTimeoutMillis
&& newMaxPendingConnectionAcquires == maxPendingConnectionAcquires
&& maxRetries == newMaxRetries
&& newThrottleRetries == throttleRetries
&& Objects.equals(credentials, newCredentials)
Expand All @@ -336,6 +435,12 @@ S3ClientSettings refine(Settings repositorySettings) {
newEndpoint,
newProtocol,
newReadTimeoutMillis,
newRequestTimeoutMillis,
newConnectionTimeoutMillis,
newConnectionTTLMillis,
newMaxConnections,
newConnectionAcquisitionTimeoutMillis,
newMaxPendingConnectionAcquires,
newMaxRetries,
newThrottleRetries,
newPathStyleAccess,
Expand Down Expand Up @@ -461,6 +566,12 @@ static S3ClientSettings getClientSettings(final Settings settings, final String
getConfigValue(settings, clientName, ENDPOINT_SETTING),
awsProtocol,
Math.toIntExact(getConfigValue(settings, clientName, READ_TIMEOUT_SETTING).millis()),
Math.toIntExact(getConfigValue(settings, clientName, REQUEST_TIMEOUT_SETTING).millis()),
Math.toIntExact(getConfigValue(settings, clientName, CONNECTION_TIMEOUT_SETTING).millis()),
Math.toIntExact(getConfigValue(settings, clientName, CONNECTION_TTL_SETTING).millis()),
Math.toIntExact(getConfigValue(settings, clientName, MAX_CONNECTIONS_SETTING)),
Math.toIntExact(getConfigValue(settings, clientName, CONNECTION_ACQUISITION_TIMEOUT).millis()),
Math.toIntExact(getConfigValue(settings, clientName, MAX_PENDING_CONNECTION_ACQUIRES)),
getConfigValue(settings, clientName, MAX_RETRIES_SETTING),
getConfigValue(settings, clientName, USE_THROTTLE_RETRIES_SETTING),
getConfigValue(settings, clientName, USE_PATH_STYLE_ACCESS),
Expand Down Expand Up @@ -530,6 +641,12 @@ public boolean equals(final Object o) {
}
final S3ClientSettings that = (S3ClientSettings) o;
return readTimeoutMillis == that.readTimeoutMillis
&& requestTimeoutMillis == that.requestTimeoutMillis
&& connectionTimeoutMillis == that.connectionTimeoutMillis
&& connectionTTLMillis == that.connectionTTLMillis
&& maxConnections == that.maxConnections
&& connectionAcquisitionTimeoutMillis == that.connectionAcquisitionTimeoutMillis
&& maxPendingConnectionAcquires == that.maxPendingConnectionAcquires
&& maxRetries == that.maxRetries
&& throttleRetries == that.throttleRetries
&& Objects.equals(credentials, that.credentials)
Expand All @@ -550,6 +667,12 @@ public int hashCode() {
protocol,
proxySettings,
readTimeoutMillis,
requestTimeoutMillis,
connectionTimeoutMillis,
connectionTTLMillis,
maxConnections,
connectionAcquisitionTimeoutMillis,
maxPendingConnectionAcquires,
maxRetries,
throttleRetries,
disableChunkedEncoding,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
* {@link SocketPermission} 'connect' to establish connections. This class wraps the operations requiring access in
* {@link AccessController#doPrivileged(PrivilegedAction)} blocks.
*/
final class SocketAccess {
public final class SocketAccess {

private SocketAccess() {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.s3.async;

import java.util.concurrent.ExecutorService;

/**
* An encapsulation for the {@link TransferNIOGroup}, and the stream reader and future completion executor services
*/
public class AsyncExecutorBuilder {

private final ExecutorService futureCompletionExecutor;
private final ExecutorService streamReader;
private final TransferNIOGroup transferNIOGroup;

/**
* Construct a new AsyncExecutorBuilder object
*
* @param futureCompletionExecutor An {@link ExecutorService} to pass to {@link software.amazon.awssdk.services.s3.S3AsyncClient} for future completion
* @param streamReader An {@link ExecutorService} to read streams for upload
* @param transferNIOGroup A {@link TransferNIOGroup} which encapsulates the netty {@link io.netty.channel.EventLoopGroup} for async uploads
*/
public AsyncExecutorBuilder(ExecutorService futureCompletionExecutor, ExecutorService streamReader, TransferNIOGroup transferNIOGroup) {
this.transferNIOGroup = transferNIOGroup;
this.streamReader = streamReader;
this.futureCompletionExecutor = futureCompletionExecutor;
}

public ExecutorService getFutureCompletionExecutor() {
return futureCompletionExecutor;
}

public TransferNIOGroup getTransferNIOGroup() {
return transferNIOGroup;
}

public ExecutorService getStreamReader() {
return streamReader;
}
}
Loading

0 comments on commit 6c12571

Please sign in to comment.