From cbe82d269f3af1cacb9360784e0772586dae7b5e Mon Sep 17 00:00:00 2001 From: tlrx Date: Tue, 4 Nov 2014 18:33:42 +0100 Subject: [PATCH] Add retry logic for S3 connection errors when restoring snapshots This commit adds retry logic when reading blobs from S3. It also adds retry logic when initializing a multipart upload and sets the internal "max retries" parameter of the Amazon S3 client with the same value as the "max_retries" parameter set for the snapshot repository (so in worst cases with the default value set to 3, 3x3=9 attempts will be made). The internal S3 client uses an exponential backoff strategy between each connection exception (mainly IOException). Closes elasticsearch/elasticsearch#8280 --- .../elasticsearch/cloud/aws/AwsS3Service.java | 2 + .../cloud/aws/InternalAwsS3Service.java | 18 ++++++-- .../aws/blobstore/DefaultS3OutputStream.java | 42 +++++++++++-------- .../cloud/aws/blobstore/S3BlobContainer.java | 27 ++++++++---- .../cloud/aws/blobstore/S3BlobStore.java | 22 ++++++---- .../repositories/s3/S3Repository.java | 8 +++- 6 files changed, 80 insertions(+), 39 deletions(-) diff --git a/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java b/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java index af014767..fb01a0b9 100644 --- a/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java +++ b/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java @@ -29,4 +29,6 @@ public interface AwsS3Service extends LifecycleComponent { AmazonS3 client(); AmazonS3 client(String region, String account, String key); + + AmazonS3 client(String region, String account, String key, Integer maxRetries); } diff --git a/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java b/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java index 75efa160..a5828e40 100644 --- a/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java +++ b/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java @@ -60,11 
+60,16 @@ public synchronized AmazonS3 client() { String account = componentSettings.get("access_key", settings.get("cloud.account")); String key = componentSettings.get("secret_key", settings.get("cloud.key")); - return getClient(endpoint, account, key); + return getClient(endpoint, account, key, null); } @Override - public synchronized AmazonS3 client(String region, String account, String key) { + public AmazonS3 client(String region, String account, String key) { + return client(region, account, key, null); + } + + @Override + public synchronized AmazonS3 client(String region, String account, String key, Integer maxRetries) { String endpoint; if (region == null) { endpoint = getDefaultEndpoint(); @@ -77,11 +82,11 @@ public synchronized AmazonS3 client(String region, String account, String key) { key = componentSettings.get("secret_key", settings.get("cloud.key")); } - return getClient(endpoint, account, key); + return getClient(endpoint, account, key, maxRetries); } - private synchronized AmazonS3 getClient(String endpoint, String account, String key) { + private synchronized AmazonS3 getClient(String endpoint, String account, String key, Integer maxRetries) { Tuple clientDescriptor = new Tuple(endpoint, account); AmazonS3Client client = clients.get(clientDescriptor); if (client != null) { @@ -111,6 +116,11 @@ private synchronized AmazonS3 getClient(String endpoint, String account, String clientConfiguration.withProxyHost(proxyHost).setProxyPort(proxyPort); } + if (maxRetries != null) { + // If not explicitly set, default to 3 with exponential backoff policy + clientConfiguration.setMaxErrorRetry(maxRetries); + } + AWSCredentialsProvider credentials; if (account == null && key == null) { diff --git a/src/main/java/org/elasticsearch/cloud/aws/blobstore/DefaultS3OutputStream.java b/src/main/java/org/elasticsearch/cloud/aws/blobstore/DefaultS3OutputStream.java index e9c73f78..04d30f2b 100644 --- 
a/src/main/java/org/elasticsearch/cloud/aws/blobstore/DefaultS3OutputStream.java +++ b/src/main/java/org/elasticsearch/cloud/aws/blobstore/DefaultS3OutputStream.java @@ -19,6 +19,7 @@ package org.elasticsearch.cloud.aws.blobstore; +import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.model.*; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -96,12 +97,12 @@ public void flush(byte[] bytes, int off, int len, boolean closing) throws IOExce private void upload(byte[] bytes, int off, int len) throws IOException { try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) { int retry = 0; - while (retry < getNumberOfRetries()) { + while (retry <= getNumberOfRetries()) { try { doUpload(getBlobStore(), getBucketName(), getBlobName(), is, len, isServerSideEncryption()); break; - } catch (AmazonS3Exception e) { - if (shouldRetry(e)) { + } catch (AmazonClientException e) { + if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) { is.reset(); retry++; } else { @@ -123,11 +124,20 @@ protected void doUpload(S3BlobStore blobStore, String bucketName, String blobNam } private void initializeMultipart() { - if (multipartId == null) { - multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption()); - if (multipartId != null) { - multipartChunks = 1; - multiparts = new ArrayList<>(); + int retry = 0; + while ((retry <= getNumberOfRetries()) && (multipartId == null)) { + try { + multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption()); + if (multipartId != null) { + multipartChunks = 1; + multiparts = new ArrayList<>(); + } + } catch (AmazonClientException e) { + if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) { + retry++; + } else { + throw e; + } } } } @@ -145,14 +155,14 @@ protected String doInitialize(S3BlobStore blobStore, String bucketName, String b private void 
uploadMultipart(byte[] bytes, int off, int len, boolean lastPart) throws IOException { try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) { int retry = 0; - while (retry < getNumberOfRetries()) { + while (retry <= getNumberOfRetries()) { try { PartETag partETag = doUploadMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, is, len, lastPart); multiparts.add(partETag); multipartChunks++; return; - } catch (AmazonS3Exception e) { - if (shouldRetry(e) && retry < getNumberOfRetries()) { + } catch (AmazonClientException e) { + if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) { is.reset(); retry++; } else { @@ -182,13 +192,13 @@ protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, S private void completeMultipart() { int retry = 0; - while (retry < getNumberOfRetries()) { + while (retry <= getNumberOfRetries()) { try { doCompleteMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, multiparts); multipartId = null; return; - } catch (AmazonS3Exception e) { - if (shouldRetry(e) && retry < getNumberOfRetries()) { + } catch (AmazonClientException e) { + if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) { retry++; } else { abortMultipart(); @@ -218,8 +228,4 @@ protected void doAbortMultipart(S3BlobStore blobStore, String bucketName, String throws AmazonS3Exception { blobStore.client().abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, blobName, uploadId)); } - - protected boolean shouldRetry(AmazonS3Exception e) { - return e.getStatusCode() == 400 && "RequestTimeout".equals(e.getErrorCode()); - } } diff --git a/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java b/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java index d5a10079..070a35bb 100644 --- a/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java +++ b/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java @@ 
-19,6 +19,7 @@ package org.elasticsearch.cloud.aws.blobstore; +import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.S3Object; @@ -75,20 +76,30 @@ public boolean deleteBlob(String blobName) throws IOException { @Override public InputStream openInput(String blobName) throws IOException { - try { - S3Object s3Object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName)); - return s3Object.getObjectContent(); - } catch (AmazonS3Exception e) { - if (e.getStatusCode() == 404) { - throw new FileNotFoundException(e.getMessage()); + int retry = 0; + while (retry <= blobStore.numberOfRetries()) { + try { + S3Object s3Object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName)); + return s3Object.getObjectContent(); + } catch (AmazonClientException e) { + if (blobStore.shouldRetry(e) && (retry < blobStore.numberOfRetries())) { + retry++; + } else { + if (e instanceof AmazonS3Exception) { + if (404 == ((AmazonS3Exception) e).getStatusCode()) { + throw new FileNotFoundException(e.getMessage()); + } + } + throw e; + } } - throw e; } + throw new BlobStoreException("retries exhausted while attempting to access blob object [name:" + blobName + ", bucket:" + blobStore.bucket() +"]"); } @Override public OutputStream createOutput(final String blobName) throws IOException { - // UploadS3OutputStream does buffering internally + // UploadS3OutputStream does buffering & retry logic internally return new DefaultS3OutputStream(blobStore, blobStore.bucket(), buildKey(blobName), blobStore.bufferSizeInBytes(), blobStore.numberOfRetries(), blobStore.serverSideEncryption()); } diff --git a/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java b/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java index 10ce6c73..fcb9e67f 100644 --- 
a/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java +++ b/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java @@ -19,7 +19,9 @@ package org.elasticsearch.cloud.aws.blobstore; +import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; import com.amazonaws.services.s3.model.ObjectListing; @@ -55,12 +57,8 @@ public class S3BlobStore extends AbstractComponent implements BlobStore { private final int numberOfRetries; - - public S3BlobStore(Settings settings, AmazonS3 client, String bucket, String region, boolean serverSideEncryption) { - this(settings, client, bucket, region, serverSideEncryption, null); - } - - public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, boolean serverSideEncryption, ByteSizeValue bufferSize) { + public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, boolean serverSideEncryption, + ByteSizeValue bufferSize, int maxRetries) { super(settings); this.client = client; this.bucket = bucket; @@ -72,7 +70,7 @@ public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable throw new BlobStoreException("\"Detected a buffer_size for the S3 storage lower than [" + MIN_BUFFER_SIZE + "]"); } - this.numberOfRetries = settings.getAsInt("max_retries", 3); + this.numberOfRetries = maxRetries; if (!client.doesBucketExist(bucket)) { if (region != null) { client.createBucket(bucket, region); @@ -152,6 +150,16 @@ public void delete(BlobPath path) { } } + protected boolean shouldRetry(AmazonClientException e) { + if (e instanceof AmazonS3Exception) { + AmazonS3Exception s3e = (AmazonS3Exception)e; + if (s3e.getStatusCode() == 400 && "RequestTimeout".equals(s3e.getErrorCode())) { + return true; + } + } + return 
e.isRetryable(); + } + @Override public void close() { } diff --git a/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 4948364e..f4bcd5bc 100644 --- a/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -120,10 +120,14 @@ public S3Repository(RepositoryName name, RepositorySettings repositorySettings, boolean serverSideEncryption = repositorySettings.settings().getAsBoolean("server_side_encryption", componentSettings.getAsBoolean("server_side_encryption", false)); ByteSizeValue bufferSize = repositorySettings.settings().getAsBytesSize("buffer_size", componentSettings.getAsBytesSize("buffer_size", null)); - logger.debug("using bucket [{}], region [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}]", bucket, region, chunkSize, serverSideEncryption, bufferSize); - blobStore = new S3BlobStore(settings, s3Service.client(region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key")), bucket, region, serverSideEncryption, bufferSize); + Integer maxRetries = repositorySettings.settings().getAsInt("max_retries", componentSettings.getAsInt("max_retries", 3)); this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB))); this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false)); + + logger.debug("using bucket [{}], region [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}]", + bucket, region, chunkSize, serverSideEncryption, bufferSize, maxRetries); + + blobStore = new S3BlobStore(settings, s3Service.client(region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key"), maxRetries), bucket, region, 
serverSideEncryption, bufferSize, maxRetries); String basePath = repositorySettings.settings().get("base_path", null); if (Strings.hasLength(basePath)) { BlobPath path = new BlobPath();