Skip to content

Commit

Permalink
Merge pull request googleapis#5 from pongad/retrycodes
Browse files Browse the repository at this point in the history
Programmatic Retry Configuration
  • Loading branch information
pongad committed Feb 10, 2016
2 parents 765879e + 6679c95 commit 24faafe
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 119 deletions.
15 changes: 14 additions & 1 deletion src/main/java/io/gapi/gax/grpc/ApiCallable.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
import io.grpc.Channel;
import io.grpc.ExperimentalApi;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;

import com.google.common.util.concurrent.Futures;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

/**
Expand Down Expand Up @@ -167,6 +169,17 @@ public ApiCallable<RequestT, ResponseT> bind(Channel boundChannel) {
new ChannelBindingCallable<RequestT, ResponseT>(callable, boundChannel));
}

/**
* Creates a callable whose calls raise {@link ApiException}
* instead of the usual {@link io.grpc.StatusRuntimeException}.
* The {@link ApiException} will consider failures with any of the given status codes
* retryable.
*/
public ApiCallable<RequestT, ResponseT> retryableOn(ImmutableSet<Status.Code> retryableCodes) {
return new ApiCallable<RequestT, ResponseT>(
new ExceptionTransformingCallable<>(callable, retryableCodes));
}

/**
* Creates a callable which retries using exponential back-off. Back-off parameters are defined
* by the given {@code retryParams}.
Expand Down
52 changes: 52 additions & 0 deletions src/main/java/io/gapi/gax/grpc/ApiException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.gapi.gax.grpc;

/**
* {@code ApiException} represents an exception thrown during an RPC call.
* It stores information useful for functionalities in {@link ApiCallable}.
*/
public class ApiException extends RuntimeException {
private final boolean retryable;

ApiException(Throwable cause, boolean retryable) {
super(cause);
this.retryable = retryable;
}

/**
* Returns whether the failed request can be retried.
*/
public boolean isRetryable() {
return retryable;
}
}
86 changes: 86 additions & 0 deletions src/main/java/io/gapi/gax/grpc/ExceptionTransformingCallable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.gapi.gax.grpc;

import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

/**
* {@code ExceptionTransformingCallable} transforms all {@code Throwable}s
* thrown during a call into an instance of {@link ApiException}.
*/
class ExceptionTransformingCallable<RequestT, ResponseT>
implements FutureCallable<RequestT, ResponseT> {
private final FutureCallable<RequestT, ResponseT> callable;
private final ImmutableSet<Status.Code> retryableCodes;

ExceptionTransformingCallable(
FutureCallable<RequestT, ResponseT> callable, ImmutableSet<Status.Code> retryableCodes) {
this.callable = Preconditions.checkNotNull(callable);
this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
}

public ListenableFuture<ResponseT> futureCall(CallContext<RequestT> context) {
SettableFuture<ResponseT> result = SettableFuture.<ResponseT>create();
ListenableFuture<ResponseT> innerCall = callable.futureCall(context);
Futures.addCallback(
innerCall,
new FutureCallback<ResponseT>() {
@Override
public void onSuccess(ResponseT r) {
result.set(r);
}

@Override
public void onFailure(Throwable throwable) {
boolean canRetry = false;
if (throwable instanceof StatusException) {
StatusException e = (StatusException) throwable;
canRetry = retryableCodes.contains(e.getStatus().getCode());
} else if (throwable instanceof StatusRuntimeException) {
StatusRuntimeException e = (StatusRuntimeException) throwable;
canRetry = retryableCodes.contains(e.getStatus().getCode());
}
result.setException(new ApiException(throwable, canRetry));
}
});
return result;
}
}
21 changes: 8 additions & 13 deletions src/main/java/io/gapi/gax/grpc/RetryingCallable.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@

package io.gapi.gax.grpc;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

Expand All @@ -56,15 +57,13 @@ class RetryingCallable<RequestT, ResponseT> implements FutureCallable<RequestT,
private static final int THREAD_POOL_SIZE = 10;
private static final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(THREAD_POOL_SIZE);
private static final Set<Status> retryableStatuses =
ImmutableSet.of(Status.DEADLINE_EXCEEDED, Status.UNAVAILABLE);

private final FutureCallable<RequestT, ResponseT> callable;
private final RetryParams retryParams;

RetryingCallable(FutureCallable<RequestT, ResponseT> callable, RetryParams retryParams) {
this.callable = callable;
this.retryParams = retryParams;
this.callable = Preconditions.checkNotNull(callable);
this.retryParams = Preconditions.checkNotNull(retryParams);
}

public ListenableFuture<ResponseT> futureCall(CallContext<RequestT> context) {
Expand Down Expand Up @@ -171,14 +170,10 @@ private static <T> CallContext<T> getCallContextWithDeadlineAfter(
}

private static boolean canRetry(Throwable throwable) {
if (throwable instanceof StatusException) {
StatusException e = (StatusException) throwable;
return retryableStatuses.contains(e.getStatus());
}
if (throwable instanceof StatusRuntimeException) {
StatusRuntimeException e = (StatusRuntimeException) throwable;
return retryableStatuses.contains(e.getStatus());
if (!(throwable instanceof ApiException)) {
return false;
}
return false;
ApiException apiException = (ApiException) throwable;
return apiException.isRetryable();
}
}
116 changes: 51 additions & 65 deletions src/main/java/io/gapi/gax/grpc/ServiceApiSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,94 +32,80 @@
package io.gapi.gax.grpc;

import com.google.auth.Credentials;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;

import io.grpc.ManagedChannel;
import io.grpc.Status;

import javax.annotation.Nullable;

// TODO(pongad): Don't close the channel if the user gives one to us
/**
* A settings class to configure a service api class.
*
* A note on channels: whichever service API class that this instance of ServiceApiSettings
* is passed to will call shutdown() on the channel provided by {@link getChannel}.
* Setting a channel is intended for use by unit tests to override the channel,
* and should not be used in production.
*/
public class ServiceApiSettings {
private boolean isIdempotentRetrying;

private Credentials credentials;

private String serviceAddress;
private int port;

private ManagedChannel channel;

public ServiceApiSettings() {
isIdempotentRetrying = true;
credentials = null;
serviceAddress = null;
port = 0;
}

@AutoValue
public abstract class ServiceApiSettings<MethodId> {
/**
* Set to true in order to have the service retry all idempotent methods,
* set to false otherwise. The default is true. This setting generally translates to
* doing retries for calls which perform gets, deletes, and updates, but not calls which
* perform creates.
* Status codes that are considered to be retryable by the given methods
*/
public ServiceApiSettings setIsIdempotentRetrying(boolean isIdempotentRetrying) {
this.isIdempotentRetrying = isIdempotentRetrying;
return this;
}

public boolean getIsIdempotentRetrying() {
return isIdempotentRetrying;
}
public abstract ImmutableMap<MethodId, ImmutableSet<Status.Code>> getRetryableCodes();

/**
* Sets the credentials to use in order to call the service. The default is to acquire
* the credentials using GoogleCredentials.getApplicationDefault(). These credentials
* will not be used if the channel is set.
* Credentials to use in order to call the service.
* The default is to acquire credentials using GoogleCredentials.getApplicationDefault().
* These credentials are not used if the channel is set.
*/
public ServiceApiSettings setCredentials(Credentials credentials) {
this.credentials = credentials;
return this;
}

public Credentials getCredentials() {
return credentials;
}
@Nullable
public abstract Credentials getCredentials();

/**
* The path used to reach the service. This value will not be used if the channel is set.
*/
public ServiceApiSettings setServiceAddress(String serviceAddress) {
this.serviceAddress = serviceAddress;
return this;
}

public String getServiceAddress() {
return serviceAddress;
}
@Nullable
public abstract String getServiceAddress();

/**
* The port used to reach the service. This value will not be used if the channel is set.
*/
public ServiceApiSettings setPort(int port) {
this.port = port;
return this;
}

public int getPort() {
return port;
}
public abstract int getPort();

/**
* The channel used to send requests to the service. Whichever service api class that
* this instance of ServiceApiSettings is passed to will call shutdown() on this
* channel. This injection mechanism is intended for use by unit tests to override
* the channel that would be created by default for real calls to the service.
* The channel used to send requests to the service.
* See class documentation on channels.
*/
public ServiceApiSettings setChannel(ManagedChannel channel) {
this.channel = channel;
return this;
@Nullable
public abstract ManagedChannel getChannel();

public static <MethodId> Builder<MethodId> builder() {
return new AutoValue_ServiceApiSettings.Builder()
.setRetryableCodes(ImmutableMap.<MethodId, ImmutableSet<Status.Code>>of())
.setPort(0);
}

public ManagedChannel getChannel() {
return channel;
public Builder<MethodId> toBuilder() {
return new AutoValue_ServiceApiSettings.Builder(this);
}

@AutoValue.Builder
public abstract static class Builder<MethodId> {
public abstract Builder<MethodId> setRetryableCodes(
ImmutableMap<MethodId, ImmutableSet<Status.Code>> codes);

public abstract Builder<MethodId> setCredentials(Credentials credentials);

public abstract Builder<MethodId> setServiceAddress(String serviceAddress);

public abstract Builder<MethodId> setPort(int port);

public abstract Builder<MethodId> setChannel(ManagedChannel channel);

public abstract ServiceApiSettings<MethodId> build();
}
}
Loading

0 comments on commit 24faafe

Please sign in to comment.