Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removes the retry mechanism from the S3 blob store #23952

Merged
merged 4 commits into from
Apr 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.services.s3.AmazonS3;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Setting;
Expand Down Expand Up @@ -159,5 +160,8 @@ interface CLOUD_S3 {
Setting.timeSetting("cloud.aws.s3.read_timeout", AwsS3Service.READ_TIMEOUT, Property.NodeScope, Property.Deprecated);
}

AmazonS3 client(Settings repositorySettings, Integer maxRetries, boolean useThrottleRetries, Boolean pathStyleAccess);
/**
* Creates an {@code AmazonS3} client from the given repository metadata and node settings.
*/
AmazonS3 client(RepositoryMetaData metadata, Settings repositorySettings);
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ class DefaultS3OutputStream extends S3OutputStream {
private int multipartChunks;
private List<PartETag> multiparts;

DefaultS3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, int numberOfRetries, boolean serverSideEncryption) {
super(blobStore, bucketName, blobName, bufferSizeInBytes, numberOfRetries, serverSideEncryption);
DefaultS3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, boolean serverSideEncryption) {
super(blobStore, bucketName, blobName, bufferSizeInBytes, serverSideEncryption);
}

@Override
Expand Down Expand Up @@ -106,19 +106,10 @@ public void flush(byte[] bytes, int off, int len, boolean closing) throws IOExce
*/
private void upload(byte[] bytes, int off, int len) throws IOException {
try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
int retry = 0;
while (retry <= getNumberOfRetries()) {
try {
doUpload(getBlobStore(), getBucketName(), getBlobName(), is, len, isServerSideEncryption());
break;
} catch (AmazonClientException e) {
if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
is.reset();
retry++;
} else {
throw new IOException("Unable to upload object " + getBlobName(), e);
}
}
try {
doUpload(getBlobStore(), getBucketName(), getBlobName(), is, len, isServerSideEncryption());
} catch (AmazonClientException e) {
throw new IOException("Unable to upload object " + getBlobName(), e);
}
}
}
Expand All @@ -131,10 +122,9 @@ protected void doUpload(S3BlobStore blobStore, String bucketName, String blobNam
}
md.setContentLength(length);

InputStream inputStream = is;

// We try to compute a MD5 while reading it
MessageDigest messageDigest;
InputStream inputStream;
try {
messageDigest = MessageDigest.getInstance("MD5");
inputStream = new DigestInputStream(is, messageDigest);
Expand All @@ -159,20 +149,11 @@ protected void doUpload(S3BlobStore blobStore, String bucketName, String blobNam
}

private void initializeMultipart() {
int retry = 0;
while ((retry <= getNumberOfRetries()) && (multipartId == null)) {
try {
multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption());
if (multipartId != null) {
multipartChunks = 1;
multiparts = new ArrayList<>();
}
} catch (AmazonClientException e) {
if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
retry++;
} else {
throw e;
}
while (multipartId == null) {
multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption());
if (multipartId != null) {
multipartChunks = 1;
multiparts = new ArrayList<>();
}
}
}
Expand All @@ -193,22 +174,13 @@ protected String doInitialize(S3BlobStore blobStore, String bucketName, String b

private void uploadMultipart(byte[] bytes, int off, int len, boolean lastPart) throws IOException {
try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
int retry = 0;
while (retry <= getNumberOfRetries()) {
try {
PartETag partETag = doUploadMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, is, len, lastPart);
multiparts.add(partETag);
multipartChunks++;
return;
} catch (AmazonClientException e) {
if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
is.reset();
retry++;
} else {
abortMultipart();
throw e;
}
}
try {
PartETag partETag = doUploadMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, is, len, lastPart);
multiparts.add(partETag);
multipartChunks++;
} catch (AmazonClientException e) {
abortMultipart();
throw e;
}
}
}
Expand All @@ -230,20 +202,13 @@ protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, S
}

private void completeMultipart() {
int retry = 0;
while (retry <= getNumberOfRetries()) {
try {
doCompleteMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, multiparts);
multipartId = null;
return;
} catch (AmazonClientException e) {
if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
retry++;
} else {
abortMultipart();
throw e;
}
}
try {
doCompleteMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, multiparts);
multipartId = null;
return;
} catch (AmazonClientException e) {
abortMultipart();
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.amazonaws.services.s3.S3ClientOptions;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
Expand All @@ -45,6 +46,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

import static org.elasticsearch.repositories.s3.S3Repository.getValue;

class InternalAwsS3Service extends AbstractLifecycleComponent implements AwsS3Service {

// pkg private for tests
Expand All @@ -60,8 +63,7 @@ class InternalAwsS3Service extends AbstractLifecycleComponent implements AwsS3Se
}

@Override
public synchronized AmazonS3 client(Settings repositorySettings, Integer maxRetries,
boolean useThrottleRetries, Boolean pathStyleAccess) {
public synchronized AmazonS3 client(RepositoryMetaData metadata, Settings repositorySettings) {
String clientName = CLIENT_NAME.get(repositorySettings);
String foundEndpoint = findEndpoint(logger, repositorySettings, settings, clientName);

Expand All @@ -73,6 +75,26 @@ public synchronized AmazonS3 client(Settings repositorySettings, Integer maxRetr
return client;
}

Integer maxRetries = getValue(metadata.settings(), settings,
S3Repository.Repository.MAX_RETRIES_SETTING,
S3Repository.Repositories.MAX_RETRIES_SETTING);
boolean useThrottleRetries = getValue(metadata.settings(), settings,
S3Repository.Repository.USE_THROTTLE_RETRIES_SETTING,
S3Repository.Repositories.USE_THROTTLE_RETRIES_SETTING);
// If the user defined a path style access setting, we rely on it,
// otherwise we use the default value set by the SDK
Boolean pathStyleAccess = null;
if (S3Repository.Repository.PATH_STYLE_ACCESS_SETTING.exists(metadata.settings()) ||
S3Repository.Repositories.PATH_STYLE_ACCESS_SETTING.exists(settings)) {
pathStyleAccess = getValue(metadata.settings(), settings,
S3Repository.Repository.PATH_STYLE_ACCESS_SETTING,
S3Repository.Repositories.PATH_STYLE_ACCESS_SETTING);
}

logger.debug("creating S3 client with client_name [{}], endpoint [{}], max_retries [{}], " +
"use_throttle_retries [{}], path_style_access [{}]",
clientName, foundEndpoint, maxRetries, useThrottleRetries, pathStyleAccess);

client = new AmazonS3Client(
credentials,
buildConfiguration(logger, repositorySettings, settings, clientName, maxRetries, foundEndpoint, useThrottleRetries));
Expand Down Expand Up @@ -187,7 +209,7 @@ private static <T> T getConfigValue(Settings repositorySettings, Settings global
// no repository setting, just use global setting
return globalSetting.get(globalSettings);
} else {
return S3Repository.getValue(repositorySettings, globalSettings, repositorySetting, globalSetting);
return getValue(repositorySettings, globalSettings, repositorySetting, globalSetting);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,17 @@ public boolean blobExists(String blobName) {

@Override
public InputStream readBlob(String blobName) throws IOException {
int retry = 0;
while (retry <= blobStore.numberOfRetries()) {
try {
S3Object s3Object = SocketAccess.doPrivileged(() -> blobStore.client().getObject(blobStore.bucket(), buildKey(blobName)));
return s3Object.getObjectContent();
} catch (AmazonClientException e) {
if (blobStore.shouldRetry(e) && (retry < blobStore.numberOfRetries())) {
retry++;
} else {
if (e instanceof AmazonS3Exception) {
if (404 == ((AmazonS3Exception) e).getStatusCode()) {
throw new NoSuchFileException("Blob object [" + blobName + "] not found: " + e.getMessage());
}
}
throw e;
try {
S3Object s3Object = SocketAccess.doPrivileged(() -> blobStore.client().getObject(blobStore.bucket(), buildKey(blobName)));
return s3Object.getObjectContent();
} catch (AmazonClientException e) {
if (e instanceof AmazonS3Exception) {
if (404 == ((AmazonS3Exception) e).getStatusCode()) {
throw new NoSuchFileException("Blob object [" + blobName + "] not found: " + e.getMessage());
}
}
throw e;
}
throw new BlobStoreException("retries exhausted while attempting to access blob object [name:" + blobName + ", bucket:" + blobStore.bucket() + "]");
}

@Override
Expand Down Expand Up @@ -120,7 +112,7 @@ public void deleteBlob(String blobName) throws IOException {
private OutputStream createOutput(final String blobName) throws IOException {
// UploadS3OutputStream does buffering & retry logic internally
return new DefaultS3OutputStream(blobStore, blobStore.bucket(), buildKey(blobName),
blobStore.bufferSizeInBytes(), blobStore.numberOfRetries(), blobStore.serverSideEncryption());
blobStore.bufferSizeInBytes(), blobStore.serverSideEncryption());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,18 @@ class S3BlobStore extends AbstractComponent implements BlobStore {

private final boolean serverSideEncryption;

private final int numberOfRetries;

private final CannedAccessControlList cannedACL;

private final StorageClass storageClass;

S3BlobStore(Settings settings, AmazonS3 client, String bucket, boolean serverSideEncryption,
ByteSizeValue bufferSize, int maxRetries, String cannedACL, String storageClass) {
ByteSizeValue bufferSize, String cannedACL, String storageClass) {
super(settings);
this.client = client;
this.bucket = bucket;
this.serverSideEncryption = serverSideEncryption;
this.bufferSize = bufferSize;
this.cannedACL = initCannedACL(cannedACL);
this.numberOfRetries = maxRetries;
this.storageClass = initStorageClass(storageClass);

// Note: the method client.doesBucketExist() may return 'true' is the bucket exists
Expand Down Expand Up @@ -102,10 +99,6 @@ public int bufferSizeInBytes() {
return bufferSize.bytesAsInt();
}

public int numberOfRetries() {
return numberOfRetries;
}

@Override
public BlobContainer blobContainer(BlobPath path) {
return new S3BlobContainer(path, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ abstract class S3OutputStream extends OutputStream {
private S3BlobStore blobStore;
private String bucketName;
private String blobName;
private int numberOfRetries;
private boolean serverSideEncryption;

private byte[] buffer;
Expand All @@ -48,11 +47,10 @@ abstract class S3OutputStream extends OutputStream {

private int flushCount = 0;

S3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, int numberOfRetries, boolean serverSideEncryption) {
S3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, boolean serverSideEncryption) {
this.blobStore = blobStore;
this.bucketName = bucketName;
this.blobName = blobName;
this.numberOfRetries = numberOfRetries;
this.serverSideEncryption = serverSideEncryption;

if (bufferSizeInBytes < MULTIPART_MIN_SIZE.getBytes()) {
Expand Down Expand Up @@ -107,10 +105,6 @@ public int getBufferSize() {
return buffer.length;
}

public int getNumberOfRetries() {
return numberOfRetries;
}

public boolean isServerSideEncryption() {
return serverSideEncryption;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,6 @@ public interface Repository {

boolean serverSideEncryption = getValue(metadata.settings(), settings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING);
ByteSizeValue bufferSize = getValue(metadata.settings(), settings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING);
Integer maxRetries = getValue(metadata.settings(), settings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING);
boolean useThrottleRetries = getValue(metadata.settings(), settings, Repository.USE_THROTTLE_RETRIES_SETTING, Repositories.USE_THROTTLE_RETRIES_SETTING);
this.chunkSize = getValue(metadata.settings(), settings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING);
this.compress = getValue(metadata.settings(), settings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING);

Expand All @@ -326,21 +324,12 @@ public interface Repository {
String storageClass = getValue(metadata.settings(), settings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING);
String cannedACL = getValue(metadata.settings(), settings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING);

// If the user defined a path style access setting, we rely on it otherwise we use the default
// value set by the SDK
Boolean pathStyleAccess = null;
if (Repository.PATH_STYLE_ACCESS_SETTING.exists(metadata.settings()) ||
Repositories.PATH_STYLE_ACCESS_SETTING.exists(settings)) {
pathStyleAccess = getValue(metadata.settings(), settings, Repository.PATH_STYLE_ACCESS_SETTING, Repositories.PATH_STYLE_ACCESS_SETTING);
}

logger.debug("using bucket [{}], chunk_size [{}], server_side_encryption [{}], " +
"buffer_size [{}], max_retries [{}], use_throttle_retries [{}], cannedACL [{}], storageClass [{}], path_style_access [{}]",
bucket, chunkSize, serverSideEncryption, bufferSize, maxRetries, useThrottleRetries, cannedACL,
storageClass, pathStyleAccess);
"buffer_size [{}], cannedACL [{}], storageClass [{}]",
bucket, chunkSize, serverSideEncryption, bufferSize, cannedACL, storageClass);

AmazonS3 client = s3Service.client(metadata.settings(), maxRetries, useThrottleRetries, pathStyleAccess);
blobStore = new S3BlobStore(settings, client, bucket, serverSideEncryption, bufferSize, maxRetries, cannedACL, storageClass);
AmazonS3 client = s3Service.client(metadata, metadata.settings());
blobStore = new S3BlobStore(settings, client, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass);

String basePath = getValue(metadata.settings(), settings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING);
if (Strings.hasLength(basePath)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.RepositoryVerificationException;
Expand Down Expand Up @@ -165,6 +166,7 @@ public void testEncryption() {
.put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
.put(S3Repository.Repository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(1000, 10000))
.put(S3Repository.Repository.SERVER_SIDE_ENCRYPTION_SETTING.getKey(), true)
.put(S3Repository.Repository.USE_THROTTLE_RETRIES_SETTING.getKey(), randomBoolean())
.build();

PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
Expand Down Expand Up @@ -194,7 +196,8 @@ public void testEncryption() {

Settings settings = internalCluster().getInstance(Settings.class);
Settings bucket = settings.getByPrefix("repositories.s3.");
AmazonS3 s3Client = internalCluster().getInstance(AwsS3Service.class).client(repositorySettings, null, randomBoolean(), null);
RepositoryMetaData metadata = new RepositoryMetaData("test-repo", "fs", Settings.EMPTY);
AmazonS3 s3Client = internalCluster().getInstance(AwsS3Service.class).client(metadata, repositorySettings);

String bucketName = bucket.get("bucket");
logger.info("--> verify encryption for bucket [{}], prefix [{}]", bucketName, basePath);
Expand Down Expand Up @@ -462,7 +465,9 @@ public void cleanRepositoryFiles(String basePath) {
// We check that settings has been set in elasticsearch.yml integration test file
// as described in README
assertThat("Your settings in elasticsearch.yml are incorrects. Check README file.", bucketName, notNullValue());
AmazonS3 client = internalCluster().getInstance(AwsS3Service.class).client(Settings.EMPTY, null, randomBoolean(), null);
RepositoryMetaData metadata = new RepositoryMetaData("test-repo", "fs", Settings.EMPTY);
AmazonS3 client = internalCluster().getInstance(AwsS3Service.class).client(metadata,
Settings.builder().put(S3Repository.Repository.USE_THROTTLE_RETRIES_SETTING.getKey(), randomBoolean()).build());
try {
ObjectListing prevListing = null;
//From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html
Expand Down
Loading