Skip to content

Commit

Permalink
Implement retry and timeout policy for gRPC client. (#889)
Browse files Browse the repository at this point in the history
* Implement retry and timeout policy for gRPC client.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Fix invoke actor after aborted flow.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
  • Loading branch information
artursouza authored Aug 16, 2023
1 parent 6d65991 commit cf8040d
Show file tree
Hide file tree
Showing 25 changed files with 1,023 additions and 116 deletions.
11 changes: 9 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ jobs:
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.11.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
TOXIPROXY_URL: https://github.com/Shopify/toxiproxy/releases/download/v2.5.0/toxiproxy-server-linux-amd64
steps:
- uses: actions/checkout@v3
- name: Set up OpenJDK ${{ env.JDK_VER }}
Expand Down Expand Up @@ -101,14 +102,20 @@ jobs:
docker stop dapr_placement
cd dapr
./dist/linux_amd64/release/placement &
- name: Install Local kafka using docker-compose
- name: Install local Kafka using docker-compose
run: |
docker-compose -f ./sdk-tests/deploy/local-test-kafka.yml up -d
docker ps
- name: Install Local mongo database using docker-compose
- name: Install local Mongo database using docker-compose
run: |
docker-compose -f ./sdk-tests/deploy/local-test-mongo.yml up -d
docker ps
- name: Install local ToxiProxy to simulate connectivity issues to Dapr sidecar
run: |
mkdir -p /home/runner/.local/bin
wget -q ${{ env.TOXIPROXY_URL }} -O /home/runner/.local/bin/toxiproxy-server
chmod +x /home/runner/.local/bin/toxiproxy-server
/home/runner/.local/bin/toxiproxy-server --version
- name: Clean up files
run: mvn clean -B
- name: Build sdk
Expand Down
33 changes: 26 additions & 7 deletions sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.dapr.client.DaprApiProtocol;
import io.dapr.client.DaprHttpBuilder;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.utils.Version;
import io.dapr.v1.DaprGrpc;
Expand Down Expand Up @@ -46,26 +47,41 @@ public class ActorClient implements AutoCloseable {
* Instantiates a new channel for Dapr sidecar communication.
*/
public ActorClient() {
this(Properties.API_PROTOCOL.get());
this(null);
}

/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param resiliencyOptions Client resiliency options.
*/
public ActorClient(ResiliencyOptions resiliencyOptions) {
this(Properties.API_PROTOCOL.get(), resiliencyOptions);
}

/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param apiProtocol Dapr's API protocol.
* @param resiliencyOptions Client resiliency options.
*/
private ActorClient(DaprApiProtocol apiProtocol) {
this(apiProtocol, buildManagedChannel(apiProtocol));
private ActorClient(DaprApiProtocol apiProtocol, ResiliencyOptions resiliencyOptions) {
this(apiProtocol, buildManagedChannel(apiProtocol), resiliencyOptions);
}

/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param apiProtocol Dapr's API protocol.
* @param grpcManagedChannel gRPC channel.
* @param resiliencyOptions Client resiliency options.
*/
private ActorClient(DaprApiProtocol apiProtocol, ManagedChannel grpcManagedChannel) {
private ActorClient(
DaprApiProtocol apiProtocol,
ManagedChannel grpcManagedChannel,
ResiliencyOptions resiliencyOptions) {
this.grpcManagedChannel = grpcManagedChannel;
this.daprClient = buildDaprClient(apiProtocol, grpcManagedChannel);
this.daprClient = buildDaprClient(apiProtocol, grpcManagedChannel, resiliencyOptions);
}

/**
Expand Down Expand Up @@ -119,9 +135,12 @@ private static ManagedChannel buildManagedChannel(DaprApiProtocol apiProtocol) {
* @return an instance of the setup Client
* @throws java.lang.IllegalStateException if any required field is missing
*/
private static DaprClient buildDaprClient(DaprApiProtocol apiProtocol, Channel grpcManagedChannel) {
private static DaprClient buildDaprClient(
DaprApiProtocol apiProtocol,
Channel grpcManagedChannel,
ResiliencyOptions resiliencyOptions) {
switch (apiProtocol) {
case GRPC: return new DaprGrpcClient(DaprGrpc.newStub(grpcManagedChannel));
case GRPC: return new DaprGrpcClient(DaprGrpc.newStub(grpcManagedChannel), resiliencyOptions);
case HTTP: {
LOGGER.warn("HTTP client protocol is deprecated and will be removed in Dapr's Java SDK version 1.10.");
return new DaprHttpClient(new DaprHttpBuilder().build());
Expand Down
31 changes: 25 additions & 6 deletions sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
package io.dapr.actors.client;

import com.google.protobuf.ByteString;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.opencensus.GrpcWrapper;
import io.dapr.internal.resiliency.RetryPolicy;
import io.dapr.internal.resiliency.TimeoutPolicy;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.CallOptions;
Expand All @@ -39,18 +42,33 @@
*/
class DaprGrpcClient implements DaprClient {

/**
* Timeout policy for SDK calls to Dapr API.
*/
private final TimeoutPolicy timeoutPolicy;

/**
* Retry policy for SDK calls to Dapr API.
*/
private final RetryPolicy retryPolicy;

/**
* The async gRPC stub.
*/
private DaprGrpc.DaprStub client;
private final DaprGrpc.DaprStub client;

/**
* Internal constructor.
*
* @param grpcClient Dapr's GRPC client.
* @param resiliencyOptions Client resiliency options (optional)
*/
DaprGrpcClient(DaprGrpc.DaprStub grpcClient) {
DaprGrpcClient(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions) {
this.client = intercept(grpcClient);
this.timeoutPolicy = new TimeoutPolicy(
resiliencyOptions == null ? null : resiliencyOptions.getTimeout());
this.retryPolicy = new RetryPolicy(
resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries());
}

/**
Expand Down Expand Up @@ -78,14 +96,14 @@ public Mono<byte[]> invoke(String actorType, String actorId, String methodName,
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
private DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions,
CallOptions options,
Channel channel) {
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, callOptions);
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, timeoutPolicy.apply(options));
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(final Listener<RespT> responseListener, final Metadata metadata) {
Expand Down Expand Up @@ -114,7 +132,8 @@ private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStu
}

private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
return Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run());
return retryPolicy.apply(
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
}

private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
Expand Down
33 changes: 17 additions & 16 deletions sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Represents the base class for actors.
Expand All @@ -28,8 +29,6 @@
*/
public abstract class AbstractActor {

private static final ActorObjectSerializer INTERNAL_SERIALIZER = new ActorObjectSerializer();

/**
* Type of tracing messages.
*/
Expand Down Expand Up @@ -58,7 +57,7 @@ public abstract class AbstractActor {
/**
* Internal control to assert method invocation on start and finish in this SDK.
*/
private boolean started;
private final AtomicBoolean started;

/**
* Instantiates a new Actor.
Expand All @@ -74,7 +73,7 @@ protected AbstractActor(ActorRuntimeContext runtimeContext, ActorId id) {
runtimeContext.getActorTypeInformation().getName(),
id);
this.actorTrace = runtimeContext.getActorTrace();
this.started = false;
this.started = new AtomicBoolean(false);
}

/**
Expand Down Expand Up @@ -250,14 +249,16 @@ protected Mono<Void> saveState() {

/**
* Resets the cached state of this Actor.
*
* @param force Forces the rollback, even if not in a call.
*/
void rollback() {
if (!this.started) {
void rollback(boolean force) {
if (!force && !this.started.get()) {
throw new IllegalStateException("Cannot reset state before starting call.");
}

this.resetState();
this.started = false;
this.started.set(false);
}

/**
Expand Down Expand Up @@ -302,11 +303,12 @@ Mono<Void> onDeactivateInternal() {
*/
Mono<Void> onPreActorMethodInternal(ActorMethodContext actorMethodContext) {
return Mono.fromRunnable(() -> {
if (this.started) {
throw new IllegalStateException("Cannot invoke a method before completing previous call.");
if (this.started.get()) {
throw new IllegalStateException(
"Cannot invoke a method before completing previous call. " + getId().toString());
}

this.started = true;
this.started.set(true);
}).then(this.onPreActorMethod(actorMethodContext));
}

Expand All @@ -318,14 +320,13 @@ Mono<Void> onPreActorMethodInternal(ActorMethodContext actorMethodContext) {
*/
Mono<Void> onPostActorMethodInternal(ActorMethodContext actorMethodContext) {
return Mono.fromRunnable(() -> {
if (!this.started) {
if (!this.started.get()) {
throw new IllegalStateException("Cannot complete a method before starting a call.");
}
}).then(this.onPostActorMethod(actorMethodContext))
.then(this.saveState())
.then(Mono.fromRunnable(() -> {
this.started = false;
}));
})
.then(this.onPostActorMethod(actorMethodContext))
.then(this.saveState())
.then(Mono.fromRunnable(() -> this.started.set(false)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,15 +306,16 @@ private <T> Mono<T> invoke(ActorId actorId, ActorMethodContext context, Function
this.runtimeContext.getActorTypeInformation().getName()));
}

return actor.onPreActorMethodInternal(context)
return Mono.fromRunnable(() -> actor.rollback(true))
.onErrorMap(throwable -> {
actor.rollback(false);
return throwable;
})
.then(actor.onPreActorMethodInternal(context))
.then((Mono<Object>) func.apply(actor))
.switchIfEmpty(
actor.onPostActorMethodInternal(context))
.flatMap(r -> actor.onPostActorMethodInternal(context).thenReturn(r))
.onErrorMap(throwable -> {
actor.rollback();
return throwable;
})
.map(o -> (T) o);
} catch (Exception e) {
return Mono.error(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void setup() throws IOException {
InProcessChannelBuilder.forName(serverName).directExecutor().build());

// Create a HelloWorldClient using the in-process channel;
client = new DaprGrpcClient(DaprGrpc.newStub(channel));
client = new DaprGrpcClient(DaprGrpc.newStub(channel), null);
}

@Test
Expand Down
5 changes: 4 additions & 1 deletion sdk-tests/components/mongo-statestore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ spec:
- name: databaseName
value: local
- name: collectionName
value: testCollection
value: testCollection
scopes:
- grpcstateclientit
- httpstateclientit
5 changes: 5 additions & 0 deletions sdk-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@
<version>1.3.5</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>eu.rekawek.toxiproxy</groupId>
<artifactId>toxiproxy-java</artifactId>
<version>2.1.7</version>
</dependency>
</dependencies>

<build>
Expand Down
9 changes: 7 additions & 2 deletions sdk-tests/src/test/java/io/dapr/it/BaseIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.dapr.actors.client.ActorClient;
import io.dapr.client.DaprApiProtocol;
import io.dapr.client.resiliency.ResiliencyOptions;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.AfterClass;

Expand Down Expand Up @@ -194,8 +195,12 @@ public static void cleanUp() throws Exception {
}
}

protected ActorClient newActorClient() {
ActorClient client = new ActorClient();
protected static ActorClient newActorClient() {
return newActorClient(null);
}

protected static ActorClient newActorClient(ResiliencyOptions resiliencyOptions) {
ActorClient client = new ActorClient(resiliencyOptions);
TO_BE_CLOSED.add(client);
return client;
}
Expand Down
18 changes: 18 additions & 0 deletions sdk-tests/src/test/java/io/dapr/it/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,25 @@ public void run() throws InterruptedException, IOException {
}
});

final Thread stderrReader = new Thread(() -> {
try {
try (InputStream stderr = this.process.getErrorStream()) {
try (InputStreamReader isr = new InputStreamReader(stderr)) {
try (BufferedReader br = new BufferedReader(isr)) {
String line;
while ((line = br.readLine()) != null) {
System.err.println(line);
}
}
}
}
} catch (IOException ex) {
throw new RuntimeException(ex);
}
});

stdoutReader.start();
stderrReader.start();
// Waits for success to happen within 1 minute.
finished.tryAcquire(SUCCESS_WAIT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
if (!success.get()) {
Expand Down
Loading

0 comments on commit cf8040d

Please sign in to comment.