Skip to content

Commit

Permalink
Storage NIO: Add new retry options in CloudStorageConfiguration (#3869)
Browse files Browse the repository at this point in the history
* Put new retry options in CloudStorageConfiguration

Add retriableHttpCodes and reopenableExceptions so the user can, if they
choose, customize the conditions under which we retry an access or
reopen a channel after an exception closed it.

This can be handy if a cloud provider starts transiently throwing
exceptions that would normally indicate a fatal condition, but which in
practice will pass if retried.

The backoff logic in retries / reopens in unmodified.

* Clean up dead code line

* retriable -> retryable (in NIO Cloud Storage code)

Also add a few basic tests to make sure the retryable options do what we
expect.

* Update testListFilesInRootDirectory to match reality

In the built-in UNIX filesystem, the newDirectoryStream method
will return absolute paths if given absolute paths as input,
and relative paths if given relative paths as input.

We're now seeing this too for the NIO Cloud Storage provider
(this is a good thing), so I updated this test to reflect
this new reality.

* remove unused import

* Add boilerplate file header

* Keep old ctor, just @deprecated

* Add back the *other* ctor that needs to be deprecated

* add @deprecated outside of comment
  • Loading branch information
jean-philippe-martin authored and chingor13 committed Nov 27, 2018
1 parent b959329 commit 324085e
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;

import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
import java.io.EOFException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Map;

/**
Expand Down Expand Up @@ -89,6 +94,17 @@ public abstract class CloudStorageConfiguration {
*/
public abstract boolean useUserProjectOnlyForRequesterPaysBuckets();

/**
* Returns the set of HTTP error codes that will be retried, in addition to the normally
* retryable ones.
*/
public abstract ImmutableList<Integer> retryableHttpCodes();

/**
* Returns the set of exceptions for which we'll try a channel reopen if maxChannelReopens
* is positive.
*/
public abstract ImmutableList<Class<? extends Exception>> reopenableExceptions();

/**
* Creates a new builder, initialized with the following settings:
Expand Down Expand Up @@ -118,6 +134,10 @@ public static final class Builder {
private @Nullable String userProject = null;
// This of this as "clear userProject if not RequesterPays"
private boolean useUserProjectOnlyForRequesterPaysBuckets = false;
private ImmutableList<Integer> retryableHttpCodes = ImmutableList.of(500, 502, 503);
private ImmutableList<Class<? extends Exception>> reopenableExceptions =
ImmutableList.<Class<? extends Exception>>of(
SSLException.class, EOFException.class, SocketException.class, SocketTimeoutException.class);

/**
* Changes current working directory for new filesystem. This defaults to the root directory.
Expand Down Expand Up @@ -186,6 +206,16 @@ public Builder autoDetectRequesterPays(boolean value) {
return this;
}

public Builder retryableHttpCodes(ImmutableList<Integer> value) {
retryableHttpCodes = value;
return this;
}

public Builder reopenableExceptions(ImmutableList<Class<? extends Exception>> values) {
reopenableExceptions = values;
return this;
}

/**
* Creates new instance without destroying builder.
*/
Expand All @@ -198,7 +228,9 @@ public CloudStorageConfiguration build() {
blockSize,
maxChannelReopens,
userProject,
useUserProjectOnlyForRequesterPaysBuckets);
useUserProjectOnlyForRequesterPaysBuckets,
retryableHttpCodes,
reopenableExceptions);
}

Builder(CloudStorageConfiguration toModify) {
Expand All @@ -210,6 +242,8 @@ public CloudStorageConfiguration build() {
maxChannelReopens = toModify.maxChannelReopens();
userProject = toModify.userProject();
useUserProjectOnlyForRequesterPaysBuckets = toModify.useUserProjectOnlyForRequesterPaysBuckets();
retryableHttpCodes = toModify.retryableHttpCodes();
reopenableExceptions = toModify.reopenableExceptions();
}

Builder() {}
Expand Down Expand Up @@ -250,6 +284,12 @@ static private CloudStorageConfiguration fromMap(Builder builder, Map<String, ?>
case "useUserProjectOnlyForRequesterPaysBuckets":
builder.autoDetectRequesterPays((Boolean) entry.getValue());
break;
case "retryableHttpCodes":
builder.retryableHttpCodes((ImmutableList<Integer>) entry.getValue());
break;
case "reopenableExceptions":
builder.reopenableExceptions((ImmutableList<Class<? extends Exception>>) entry.getValue());
break;
default:
throw new IllegalArgumentException(entry.getKey());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ private SeekableByteChannel newReadChannel(Path path, Set<? extends OpenOption>
cloudPath.getBlobId(),
0,
maxChannelReopens,
cloudPath.getFileSystem().config(),
userProject,
blobSourceOptions.toArray(new BlobSourceOption[blobSourceOptions.size()]));
}
Expand Down Expand Up @@ -461,7 +462,8 @@ public boolean deleteIfExists(Path path) throws IOException {
throw new CloudStoragePseudoDirectoryException(cloudPath);
}

final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(CloudStorageUtil.getMaxChannelReopensFromPath(path));
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(
cloudPath.getFileSystem().config());
// Loop will terminate via an exception if all retries are exhausted
while (true) {
try {
Expand Down Expand Up @@ -586,7 +588,8 @@ public void copy(Path source, Path target, CopyOption... options) throws IOExcep
throw new CloudStoragePseudoDirectoryException(toPath);
}

final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(CloudStorageUtil.getMaxChannelReopensFromPath(source));
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(
fromPath.getFileSystem().config());
// Loop will terminate via an exception if all retries are exhausted
while (true) {
try {
Expand Down Expand Up @@ -672,11 +675,12 @@ public void checkAccess(Path path, AccessMode... modes) throws IOException {
}
}

final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(CloudStorageUtil.getMaxChannelReopensFromPath(path));
final CloudStoragePath cloudPath = CloudStorageUtil.checkPath(path);
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(
cloudPath.getFileSystem().config());
// Loop will terminate via an exception if all retries are exhausted
while (true) {
try {
CloudStoragePath cloudPath = CloudStorageUtil.checkPath(path);
boolean nullId;
if (isNullOrEmpty(userProject)) {
nullId = storage.get(
Expand Down Expand Up @@ -725,11 +729,12 @@ public <A extends BasicFileAttributes> A readAttributes(
}
initStorage();

final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(CloudStorageUtil.getMaxChannelReopensFromPath(path));
final CloudStoragePath cloudPath = CloudStorageUtil.checkPath(path);
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(
cloudPath.getFileSystem().config());
// Loop will terminate via an exception if all retries are exhausted
while (true) {
try {
CloudStoragePath cloudPath = CloudStorageUtil.checkPath(path);
BlobInfo blobInfo = null;
try {
BlobId blobId = cloudPath.getBlobId();
Expand Down Expand Up @@ -811,7 +816,8 @@ public DirectoryStream<Path> newDirectoryStream(final Path dir, final Filter<? s
checkNotNull(filter);
initStorage();

final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(CloudStorageUtil.getMaxChannelReopensFromPath(dir));
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(
cloudPath.getFileSystem().config());
// Loop will terminate via an exception if all retries are exhausted
while (true) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ final class CloudStorageReadChannel implements SeekableByteChannel {
final int maxRetries;
// open options, we keep them around for reopens.
final BlobSourceOption[] blobSourceOptions;
final CloudStorageConfiguration config;
private ReadChannel channel;
private long position;
private long size;
Expand All @@ -67,6 +68,7 @@ final class CloudStorageReadChannel implements SeekableByteChannel {
/**
* @param maxChannelReopens max number of times to try re-opening the channel if it closes on us
* unexpectedly.
* @param config configuration for what to retry on.
* @param blobSourceOptions BlobSourceOption.userProject if you want to pay the charges (required
* for requester-pays buckets). Note:
* Buckets that have Requester Pays disabled still accept requests that include a billing
Expand All @@ -79,17 +81,18 @@ final class CloudStorageReadChannel implements SeekableByteChannel {
*/
@CheckReturnValue
@SuppressWarnings("resource")
static CloudStorageReadChannel create(Storage gcsStorage, BlobId file, long position, int maxChannelReopens, @Nullable String userProject, BlobSourceOption... blobSourceOptions)
static CloudStorageReadChannel create(Storage gcsStorage, BlobId file, long position, int maxChannelReopens, final CloudStorageConfiguration config, @Nullable String userProject, BlobSourceOption... blobSourceOptions)
throws IOException {
return new CloudStorageReadChannel(gcsStorage, file, position, maxChannelReopens, userProject, blobSourceOptions);
return new CloudStorageReadChannel(gcsStorage, file, position, maxChannelReopens, config, userProject, blobSourceOptions);
}

private CloudStorageReadChannel(Storage gcsStorage, BlobId file, long position, int maxChannelReopens, @Nullable String userProject, BlobSourceOption... blobSourceOptions) throws IOException {
private CloudStorageReadChannel(Storage gcsStorage, BlobId file, long position, int maxChannelReopens, final CloudStorageConfiguration config, @Nullable String userProject, BlobSourceOption... blobSourceOptions) throws IOException {
this.gcsStorage = gcsStorage;
this.file = file;
this.position = position;
this.maxChannelReopens = maxChannelReopens;
this.maxRetries = Math.max(3, maxChannelReopens);
this.config = config;
// get the generation, enshrine that in our options
fetchSize(gcsStorage, userProject, file);
List options = Lists.newArrayList(blobSourceOptions);
Expand Down Expand Up @@ -133,7 +136,7 @@ public int read(ByteBuffer dst) throws IOException {
synchronized (this) {
checkOpen();
int amt;
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(maxRetries, maxChannelReopens);
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(maxRetries, maxChannelReopens, config);
dst.mark();
while (true) {
try {
Expand Down Expand Up @@ -203,7 +206,7 @@ private void checkOpen() throws ClosedChannelException {
}

private long fetchSize(Storage gcsStorage, @Nullable String userProject, BlobId file) throws IOException {
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(maxRetries, maxChannelReopens);
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(maxRetries, maxChannelReopens, config);

while (true) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@
package com.google.cloud.storage.contrib.nio;

import com.google.cloud.storage.StorageException;

import java.io.EOFException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import javax.net.ssl.SSLException;
import com.google.common.annotations.VisibleForTesting;

/**
* Simple counter class to keep track of retry and reopen attempts when StorageExceptions are
Expand All @@ -34,26 +30,62 @@ public class CloudStorageRetryHandler {
private long totalWaitTime; // in milliseconds
private final int maxRetries;
private final int maxReopens;
private final CloudStorageConfiguration config;


/**
* Create a CloudStorageRetryHandler with the maximum retries and reopens set to the same value.
*
* @param maxRetriesAndReopens value for both maxRetries and maxReopens
*
* @deprecated use CloudStorageRetryHandler(CloudStorageConfiguration) instead.
*/
@java.lang.Deprecated
public CloudStorageRetryHandler(final int maxRetriesAndReopens) {
this.maxRetries = maxRetriesAndReopens;
this.maxReopens = maxRetriesAndReopens;
// we're just using the retry parameters from the config, so it's OK to have a default.
this.config = CloudStorageConfiguration.DEFAULT;
}

/**
/**
* Create a CloudStorageRetryHandler with the maximum retries and reopens set to different values.
*
* @param maxRetries maximum number of retries
* @param maxReopens maximum number of reopens
*
* @deprecated use CloudStorageRetryHandler(CloudStorageConfiguration) instead.
*/
@java.lang.Deprecated
public CloudStorageRetryHandler(final int maxRetries, final int maxReopens) {
this.maxRetries = maxRetries;
this.maxReopens = maxReopens;
// we're just using the retry parameters from the config, so it's OK to have a default.
this.config = CloudStorageConfiguration.DEFAULT;
}

/**
* Create a CloudStorageRetryHandler with the maximum retries and reopens set to the same value.
*
* @param config - configuration for reopens and retryable codes.
*/
public CloudStorageRetryHandler(final CloudStorageConfiguration config) {
this.maxRetries = config.maxChannelReopens();
this.maxReopens = config.maxChannelReopens();
this.config = config;
}

/**
* Create a CloudStorageRetryHandler with the maximum retries and reopens set to different values.
*
* @param maxRetries maximum number of retries
* @param maxReopens maximum number of reopens (overrides what's in the config)
* @param config http codes we'll retry on, and exceptions we'll reopen on.
*/
public CloudStorageRetryHandler(final int maxRetries, final int maxReopens, final CloudStorageConfiguration config) {
this.maxRetries = maxRetries;
this.maxReopens = maxReopens;
this.config = config;
}

/**
Expand Down Expand Up @@ -143,25 +175,36 @@ void sleepForAttempt(int attempt) {
* @param exs StorageException to test
* @return true if exs is a retryable error, otherwise false
*/
private static boolean isRetryable(final StorageException exs) {
return exs.isRetryable() || exs.getCode() == 500 || exs.getCode() == 502 || exs.getCode() == 503;
@VisibleForTesting
boolean isRetryable(final StorageException exs) {
if (exs.isRetryable()) {
return true;
}
for (int code : config.retryableHttpCodes()) {
if (exs.getCode() == code) {
return true;
}
}
return false;
}

/**
* @param exs StorageException to test
* @return true if exs is an error that can be resolved via a channel reopen, otherwise false
*/
private static boolean isReopenable(final StorageException exs) {
@VisibleForTesting
boolean isReopenable(final StorageException exs) {
Throwable throwable = exs;
// ensures finite iteration
int maxDepth = 20;
while (throwable != null && maxDepth-- > 0) {
if ((throwable.getMessage() != null
&& throwable.getMessage().contains("Connection closed prematurely"))
|| throwable instanceof SSLException
|| throwable instanceof EOFException
|| throwable instanceof SocketException
|| throwable instanceof SocketTimeoutException) {
for (Class<? extends Exception> reopenableException : config.reopenableExceptions()) {
if (reopenableException.isInstance(throwable)) {
return true;
}
}
if (throwable.getMessage() != null
&& throwable.getMessage().contains("Connection closed prematurely")) {
return true;
}
throwable = throwable.getCause();
Expand Down
Loading

0 comments on commit 324085e

Please sign in to comment.