Skip to content

Commit

Permalink
Implement retry and timeout policy for gRPC client.
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
  • Loading branch information
artursouza committed Jul 14, 2023
1 parent 76aec01 commit 75bbd77
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 63 deletions.
26 changes: 21 additions & 5 deletions sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.opencensus.GrpcWrapper;
import io.dapr.internal.resiliency.RetryPolicy;
import io.dapr.internal.resiliency.TimeoutPolicy;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.CallOptions;
Expand All @@ -31,6 +33,7 @@
import reactor.core.publisher.MonoSink;
import reactor.util.context.ContextView;

import java.sql.Time;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

Expand All @@ -39,10 +42,20 @@
*/
class DaprGrpcClient implements DaprClient {

/**
* Timeout policy for SDK calls to Dapr API.
*/
private final TimeoutPolicy timeoutPolicy;

/**
* Retry policy for SDK calls to Dapr API.
*/
private final RetryPolicy retryPolicy;

/**
* The async gRPC stub.
*/
private DaprGrpc.DaprStub client;
private final DaprGrpc.DaprStub client;

/**
* Internal constructor.
Expand All @@ -51,6 +64,8 @@ class DaprGrpcClient implements DaprClient {
*/
DaprGrpcClient(DaprGrpc.DaprStub grpcClient) {
this.client = intercept(grpcClient);
this.timeoutPolicy = new TimeoutPolicy();
this.retryPolicy = new RetryPolicy();
}

/**
Expand Down Expand Up @@ -78,14 +93,14 @@ public Mono<byte[]> invoke(String actorType, String actorId, String methodName,
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
private DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions,
CallOptions options,
Channel channel) {
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, callOptions);
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, timeoutPolicy.apply(options));
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(final Listener<RespT> responseListener, final Metadata metadata) {
Expand Down Expand Up @@ -114,7 +129,8 @@ private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStu
}

private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
return Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run());
return retryPolicy.apply(
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
}

private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
Expand Down
1 change: 0 additions & 1 deletion sdk/src/main/java/io/dapr/client/DaprClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.net.URI;

/**
Expand Down
29 changes: 22 additions & 7 deletions sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.opencensus.GrpcWrapper;
import io.dapr.internal.resiliency.RetryPolicy;
import io.dapr.internal.resiliency.TimeoutPolicy;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.DefaultContentTypeConverter;
Expand Down Expand Up @@ -92,10 +94,20 @@ public class DaprClientGrpc extends AbstractDaprClient {
*/
private final GrpcChannelFacade channel;

/**
* The timeout policy.
*/
private final TimeoutPolicy timeoutPolicy;

/**
* The retry policy.
*/
private final RetryPolicy retryPolicy;

/**
* The async gRPC stub.
*/
private DaprGrpc.DaprStub asyncStub;
private final DaprGrpc.DaprStub asyncStub;

/**
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
Expand All @@ -114,6 +126,8 @@ public class DaprClientGrpc extends AbstractDaprClient {
super(objectSerializer, stateSerializer);
this.channel = channel;
this.asyncStub = intercept(asyncStub);
this.timeoutPolicy = new TimeoutPolicy();
this.retryPolicy = new RetryPolicy();
}

private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) {
Expand Down Expand Up @@ -994,22 +1008,21 @@ private ConfigurationItem buildConfigurationItem(
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
private DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions,
CallOptions options,
Channel channel) {
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, callOptions);
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, timeoutPolicy.apply(options));
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(final Listener<RespT> responseListener, final Metadata metadata) {
String daprApiToken = Properties.API_TOKEN.get();
if (daprApiToken != null) {
metadata.put(Metadata.Key.of(Headers.DAPR_API_TOKEN, Metadata.ASCII_STRING_MARSHALLER), daprApiToken);
}

super.start(responseListener, metadata);
}
};
Expand All @@ -1030,11 +1043,13 @@ private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStu
}

private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
return Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run());
return retryPolicy.apply(
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
}

private <T> Flux<T> createFlux(Consumer<StreamObserver<T>> consumer) {
return Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run());
return retryPolicy.apply(
Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
}

private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
Expand Down
26 changes: 26 additions & 0 deletions sdk/src/main/java/io/dapr/config/Properties.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ public class Properties {
*/
private static final Integer DEFAULT_GRPC_PORT = 50001;

/**
* Dapr's default max retries.
*/
private static final Integer DEFAULT_API_MAX_RETRIES = 0;

/**
* Dapr's default timeout in seconds.
*/
private static final Integer DEFAULT_API_TIMEOUT_SECONDS = 10;

/**
* Dapr's default use of gRPC or HTTP.
*/
Expand Down Expand Up @@ -115,6 +125,22 @@ public class Properties {
"DAPR_HTTP_ENDPOINT",
null);

/**
* Maximum number of retries for retriable exceptions.
*/
public static final Property<Integer> MAX_RETRIES = new IntegerProperty(
"dapr.api.maxRetries",
"DAPR_API_MAX_RETRIES",
DEFAULT_API_MAX_RETRIES);

/**
* Timeout for API calls.
*/
public static final Property<Integer> TIMEOUT_SECONDS = new IntegerProperty(
"dapr.api.timeoutSeconds",
"DAPR_API_TIMEOUT_SECONDS",
DEFAULT_API_TIMEOUT_SECONDS);

/**
* Determines if Dapr client will use gRPC or HTTP to talk to Dapr's side car.
* @deprecated This attribute will be deleted at SDK version 1.10.
Expand Down
109 changes: 109 additions & 0 deletions sdk/src/main/java/io/dapr/internal/resiliency/RetryPolicy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.internal.resiliency;

import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;

/**
* Retriable policy for SDK communication to Dapr API.
*/
public final class RetryPolicy {

private static final int MIN_BACKOFF_MILLIS = 500;

private static final int MAX_BACKOFF_SECONDS = 5;

private final int maxRetries = Properties.MAX_RETRIES.get();

/**
* Applies the retry policy to an expected Mono action.
* @param response Response
* @param <T> Type expected for the action's response
* @return action with retry
*/
public <T> Mono<T> apply(Mono<T> response) {
if (this.maxRetries <= 0) {
return response;
}

return response.retryWhen(
reactor.util.retry.Retry.backoff(maxRetries, Duration.ofMillis(MIN_BACKOFF_MILLIS))
.maxBackoff(Duration.ofSeconds(MAX_BACKOFF_SECONDS))
.filter(throwable -> isRetriableGrpcError(throwable)))
.onErrorMap(throwable -> findDaprException(throwable));
}

/**
* Applies the retry policy to an expected Flux action.
* @param response Response
* @param <T> Type expected for the action's response
* @return action with retry
*/
public <T> Flux<T> apply(Flux<T> response) {
if (this.maxRetries <= 0) {
return response;
}

return response.retryWhen(
reactor.util.retry.Retry.backoff(maxRetries, Duration.ofMillis(500))
.maxBackoff(Duration.ofSeconds(5))
.filter(throwable -> isRetriableGrpcError(throwable)))
.onErrorMap(throwable -> findDaprException(throwable));
}

private static boolean isRetriableGrpcError(Throwable throwable) {
Status grpcStatus = findGrpcStatusCode(throwable);
if (grpcStatus == null) {
return false;
}

switch (grpcStatus.getCode()) {
case DEADLINE_EXCEEDED:
case UNAVAILABLE:
return true;
default:
return false;
}
}

private static Status findGrpcStatusCode(Throwable throwable) {
while (throwable != null) {
if (throwable instanceof StatusRuntimeException) {
return ((StatusRuntimeException) throwable).getStatus();
}

throwable = throwable.getCause();
}
return null;
}

private static Throwable findDaprException(Throwable throwable) {
Throwable original = throwable;
while (throwable != null) {
if (throwable instanceof DaprException) {
return throwable;
}

throwable = throwable.getCause();
}
return original;
}
}
40 changes: 40 additions & 0 deletions sdk/src/main/java/io/dapr/internal/resiliency/TimeoutPolicy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.internal.resiliency;

import io.dapr.config.Properties;
import io.grpc.CallOptions;

import java.util.concurrent.TimeUnit;

/**
* Retriable policy for SDK communication to Dapr API.
*/
public final class TimeoutPolicy {

private final int timeoutSeconds = Properties.TIMEOUT_SECONDS.get();

/**
* Applies the timeout policy to a gRPC call options.
* @param options Call options
* @return Call options with retry policy applied
*/
public CallOptions apply(CallOptions options) {
if (this.timeoutSeconds <= 0) {
return options;
}

return options.withDeadlineAfter(this.timeoutSeconds, TimeUnit.SECONDS);
}
}
Loading

0 comments on commit 75bbd77

Please sign in to comment.