diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 31013c131..a64b2ac7a 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -48,6 +48,7 @@ jobs: DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.11.0-rc.1/install/install.sh DAPR_CLI_REF: DAPR_REF: + TOXIPROXY_URL: https://github.com/Shopify/toxiproxy/releases/download/v2.5.0/toxiproxy-server-linux-amd64 steps: - uses: actions/checkout@v3 - name: Set up OpenJDK ${{ env.JDK_VER }} @@ -101,14 +102,20 @@ jobs: docker stop dapr_placement cd dapr ./dist/linux_amd64/release/placement & - - name: Install Local kafka using docker-compose + - name: Install local Kafka using docker-compose run: | docker-compose -f ./sdk-tests/deploy/local-test-kafka.yml up -d docker ps - - name: Install Local mongo database using docker-compose + - name: Install local Mongo database using docker-compose run: | docker-compose -f ./sdk-tests/deploy/local-test-mongo.yml up -d docker ps + - name: Install local ToxiProxy to simulate connectivity issues to Dapr sidecar + run: | + mkdir -p /home/runner/.local/bin + wget -q ${{ env.TOXIPROXY_URL }} -O /home/runner/.local/bin/toxiproxy-server + chmod +x /home/runner/.local/bin/toxiproxy-server + /home/runner/.local/bin/toxiproxy-server --version - name: Clean up files run: mvn clean -B - name: Build sdk diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java index 80d86d013..1efa44646 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java @@ -15,6 +15,7 @@ import io.dapr.client.DaprApiProtocol; import io.dapr.client.DaprHttpBuilder; +import io.dapr.client.resiliency.ResiliencyOptions; import io.dapr.config.Properties; import io.dapr.utils.Version; import io.dapr.v1.DaprGrpc; @@ -46,26 +47,41 @@ public class ActorClient implements AutoCloseable { * Instantiates 
a new channel for Dapr sidecar communication. */ public ActorClient() { - this(Properties.API_PROTOCOL.get()); + this(null); + } + + /** + * Instantiates a new channel for Dapr sidecar communication. + * + * @param resiliencyOptions Client resiliency options. + */ + public ActorClient(ResiliencyOptions resiliencyOptions) { + this(Properties.API_PROTOCOL.get(), resiliencyOptions); } /** * Instantiates a new channel for Dapr sidecar communication. * * @param apiProtocol Dapr's API protocol. + * @param resiliencyOptions Client resiliency options. */ - private ActorClient(DaprApiProtocol apiProtocol) { - this(apiProtocol, buildManagedChannel(apiProtocol)); + private ActorClient(DaprApiProtocol apiProtocol, ResiliencyOptions resiliencyOptions) { + this(apiProtocol, buildManagedChannel(apiProtocol), resiliencyOptions); } /** * Instantiates a new channel for Dapr sidecar communication. * * @param apiProtocol Dapr's API protocol. + * @param grpcManagedChannel gRPC channel. + * @param resiliencyOptions Client resiliency options. 
*/ - private ActorClient(DaprApiProtocol apiProtocol, ManagedChannel grpcManagedChannel) { + private ActorClient( + DaprApiProtocol apiProtocol, + ManagedChannel grpcManagedChannel, + ResiliencyOptions resiliencyOptions) { this.grpcManagedChannel = grpcManagedChannel; - this.daprClient = buildDaprClient(apiProtocol, grpcManagedChannel); + this.daprClient = buildDaprClient(apiProtocol, grpcManagedChannel, resiliencyOptions); } /** @@ -119,9 +135,12 @@ private static ManagedChannel buildManagedChannel(DaprApiProtocol apiProtocol) { * @return an instance of the setup Client * @throws java.lang.IllegalStateException if any required field is missing */ - private static DaprClient buildDaprClient(DaprApiProtocol apiProtocol, Channel grpcManagedChannel) { + private static DaprClient buildDaprClient( + DaprApiProtocol apiProtocol, + Channel grpcManagedChannel, + ResiliencyOptions resiliencyOptions) { switch (apiProtocol) { - case GRPC: return new DaprGrpcClient(DaprGrpc.newStub(grpcManagedChannel)); + case GRPC: return new DaprGrpcClient(DaprGrpc.newStub(grpcManagedChannel), resiliencyOptions); case HTTP: { LOGGER.warn("HTTP client protocol is deprecated and will be removed in Dapr's Java SDK version 1.10."); return new DaprHttpClient(new DaprHttpBuilder().build()); diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java index 844d2ace3..efe37b0be 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java @@ -14,9 +14,12 @@ package io.dapr.actors.client; import com.google.protobuf.ByteString; +import io.dapr.client.resiliency.ResiliencyOptions; import io.dapr.config.Properties; import io.dapr.exceptions.DaprException; import io.dapr.internal.opencensus.GrpcWrapper; +import io.dapr.internal.resiliency.RetryPolicy; +import io.dapr.internal.resiliency.TimeoutPolicy; import 
io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; import io.grpc.CallOptions; @@ -39,18 +42,33 @@ */ class DaprGrpcClient implements DaprClient { + /** + * Timeout policy for SDK calls to Dapr API. + */ + private final TimeoutPolicy timeoutPolicy; + + /** + * Retry policy for SDK calls to Dapr API. + */ + private final RetryPolicy retryPolicy; + /** * The async gRPC stub. */ - private DaprGrpc.DaprStub client; + private final DaprGrpc.DaprStub client; /** * Internal constructor. * * @param grpcClient Dapr's GRPC client. + * @param resiliencyOptions Client resiliency options (optional) */ - DaprGrpcClient(DaprGrpc.DaprStub grpcClient) { + DaprGrpcClient(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions) { this.client = intercept(grpcClient); + this.timeoutPolicy = new TimeoutPolicy( + resiliencyOptions == null ? null : resiliencyOptions.getTimeout()); + this.retryPolicy = new RetryPolicy( + resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries()); } /** @@ -78,14 +96,14 @@ public Mono invoke(String actorType, String actorId, String methodName, * @param client GRPC client for Dapr. * @return Client after adding interceptors. 
*/ - private static DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) { + private DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) { ClientInterceptor interceptor = new ClientInterceptor() { @Override public ClientCall interceptCall( MethodDescriptor methodDescriptor, - CallOptions callOptions, + CallOptions options, Channel channel) { - ClientCall clientCall = channel.newCall(methodDescriptor, callOptions); + ClientCall clientCall = channel.newCall(methodDescriptor, timeoutPolicy.apply(options)); return new ForwardingClientCall.SimpleForwardingClientCall(clientCall) { @Override public void start(final Listener responseListener, final Metadata metadata) { @@ -114,7 +132,8 @@ private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStu } private Mono createMono(Consumer> consumer) { - return Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()); + return retryPolicy.apply( + Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run())); } private StreamObserver createStreamObserver(MonoSink sink) { diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java index 9febda8f9..1fe0deda4 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.time.Duration; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; /** * Represents the base class for actors. @@ -28,8 +29,6 @@ */ public abstract class AbstractActor { - private static final ActorObjectSerializer INTERNAL_SERIALIZER = new ActorObjectSerializer(); - /** * Type of tracing messages. */ @@ -58,7 +57,7 @@ public abstract class AbstractActor { /** * Internal control to assert method invocation on start and finish in this SDK. 
*/ - private boolean started; + private final AtomicBoolean started; /** * Instantiates a new Actor. @@ -74,7 +73,7 @@ protected AbstractActor(ActorRuntimeContext runtimeContext, ActorId id) { runtimeContext.getActorTypeInformation().getName(), id); this.actorTrace = runtimeContext.getActorTrace(); - this.started = false; + this.started = new AtomicBoolean(false); } /** @@ -250,14 +249,16 @@ protected Mono saveState() { /** * Resets the cached state of this Actor. + * + * @param force Forces the rollback, even if not in a call. */ - void rollback() { - if (!this.started) { + void rollback(boolean force) { + if (!force && !this.started.get()) { throw new IllegalStateException("Cannot reset state before starting call."); } this.resetState(); - this.started = false; + this.started.set(false); } /** @@ -302,11 +303,12 @@ Mono onDeactivateInternal() { */ Mono onPreActorMethodInternal(ActorMethodContext actorMethodContext) { return Mono.fromRunnable(() -> { - if (this.started) { - throw new IllegalStateException("Cannot invoke a method before completing previous call."); + if (this.started.get()) { + throw new IllegalStateException( + "Cannot invoke a method before completing previous call. 
" + getId().toString()); } - this.started = true; + this.started.set(true); }).then(this.onPreActorMethod(actorMethodContext)); } @@ -318,14 +320,13 @@ Mono onPreActorMethodInternal(ActorMethodContext actorMethodContext) { */ Mono onPostActorMethodInternal(ActorMethodContext actorMethodContext) { return Mono.fromRunnable(() -> { - if (!this.started) { + if (!this.started.get()) { throw new IllegalStateException("Cannot complete a method before starting a call."); } - }).then(this.onPostActorMethod(actorMethodContext)) - .then(this.saveState()) - .then(Mono.fromRunnable(() -> { - this.started = false; - })); + }) + .then(this.onPostActorMethod(actorMethodContext)) + .then(this.saveState()) + .then(Mono.fromRunnable(() -> this.started.set(false))); } /** diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java index e9a11c02f..c51e6e8bd 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java @@ -306,15 +306,16 @@ private Mono invoke(ActorId actorId, ActorMethodContext context, Function this.runtimeContext.getActorTypeInformation().getName())); } - return actor.onPreActorMethodInternal(context) + return Mono.fromRunnable(() -> actor.rollback(true)) + .onErrorMap(throwable -> { + actor.rollback(false); + return throwable; + }) + .then(actor.onPreActorMethodInternal(context)) .then((Mono) func.apply(actor)) .switchIfEmpty( actor.onPostActorMethodInternal(context)) .flatMap(r -> actor.onPostActorMethodInternal(context).thenReturn(r)) - .onErrorMap(throwable -> { - actor.rollback(); - return throwable; - }) .map(o -> (T) o); } catch (Exception e) { return Mono.error(e); diff --git a/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java index f3d42c8c2..71360dc16 100644 --- 
a/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java @@ -105,7 +105,7 @@ public void setup() throws IOException { InProcessChannelBuilder.forName(serverName).directExecutor().build()); // Create a HelloWorldClient using the in-process channel; - client = new DaprGrpcClient(DaprGrpc.newStub(channel)); + client = new DaprGrpcClient(DaprGrpc.newStub(channel), null); } @Test diff --git a/sdk-tests/components/mongo-statestore.yml b/sdk-tests/components/mongo-statestore.yml index ace53887a..34ecb0b89 100644 --- a/sdk-tests/components/mongo-statestore.yml +++ b/sdk-tests/components/mongo-statestore.yml @@ -11,4 +11,7 @@ spec: - name: databaseName value: local - name: collectionName - value: testCollection \ No newline at end of file + value: testCollection +scopes: + - grpcstateclientit + - httpstateclientit \ No newline at end of file diff --git a/sdk-tests/pom.xml b/sdk-tests/pom.xml index 25b96f39f..7238dfef6 100644 --- a/sdk-tests/pom.xml +++ b/sdk-tests/pom.xml @@ -146,6 +146,11 @@ 1.3.5 compile + + eu.rekawek.toxiproxy + toxiproxy-java + 2.1.7 + diff --git a/sdk-tests/src/test/java/io/dapr/it/BaseIT.java b/sdk-tests/src/test/java/io/dapr/it/BaseIT.java index b35d1ce6b..f8dc006be 100644 --- a/sdk-tests/src/test/java/io/dapr/it/BaseIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/BaseIT.java @@ -15,6 +15,7 @@ import io.dapr.actors.client.ActorClient; import io.dapr.client.DaprApiProtocol; +import io.dapr.client.resiliency.ResiliencyOptions; import org.apache.commons.lang3.tuple.ImmutablePair; import org.junit.AfterClass; @@ -194,8 +195,12 @@ public static void cleanUp() throws Exception { } } - protected ActorClient newActorClient() { - ActorClient client = new ActorClient(); + protected static ActorClient newActorClient() { + return newActorClient(null); + } + + protected static ActorClient newActorClient(ResiliencyOptions resiliencyOptions) { + ActorClient client = new 
ActorClient(resiliencyOptions); TO_BE_CLOSED.add(client); return client; } diff --git a/sdk-tests/src/test/java/io/dapr/it/Command.java b/sdk-tests/src/test/java/io/dapr/it/Command.java index 156b395f4..74f550893 100644 --- a/sdk-tests/src/test/java/io/dapr/it/Command.java +++ b/sdk-tests/src/test/java/io/dapr/it/Command.java @@ -83,7 +83,25 @@ public void run() throws InterruptedException, IOException { } }); + final Thread stderrReader = new Thread(() -> { + try { + try (InputStream stderr = this.process.getErrorStream()) { + try (InputStreamReader isr = new InputStreamReader(stderr)) { + try (BufferedReader br = new BufferedReader(isr)) { + String line; + while ((line = br.readLine()) != null) { + System.err.println(line); + } + } + } + } + } catch (IOException ex) { + throw new RuntimeException(ex); + } + }); + stdoutReader.start(); + stderrReader.start(); // Waits for success to happen within 1 minute. finished.tryAcquire(SUCCESS_WAIT_TIMEOUT_MINUTES, TimeUnit.MINUTES); if (!success.get()) { diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprPorts.java b/sdk-tests/src/test/java/io/dapr/it/DaprPorts.java index be81cb745..5a24601b1 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprPorts.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprPorts.java @@ -13,6 +13,8 @@ package io.dapr.it; +import io.dapr.config.Properties; + import java.io.IOException; import java.net.ServerSocket; import java.util.ArrayList; @@ -46,6 +48,20 @@ public static DaprPorts build(boolean appPort, boolean httpPort, boolean grpcPor } } + public void use() { + if (this.httpPort != null) { + System.getProperties().setProperty(Properties.HTTP_PORT.getName(), String.valueOf(this.httpPort)); + System.getProperties().setProperty( + Properties.HTTP_ENDPOINT.getName(), "http://127.0.0.1:" + this.httpPort); + } + + if (this.grpcPort != null) { + System.getProperties().setProperty(Properties.GRPC_PORT.getName(), String.valueOf(this.grpcPort)); + System.getProperties().setProperty( + 
Properties.GRPC_ENDPOINT.getName(), "http://127.0.0.1:" + this.grpcPort); + } + } + public Integer getGrpcPort() { return grpcPort; } diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index 36ce77182..33beeaa07 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -30,7 +30,7 @@ public class DaprRun implements Stoppable { private static final String DAPR_RUN = "dapr run --app-id %s --app-protocol %s " + "--config ./configurations/configuration.yaml " + - "--components-path ./components"; + "--resources-path ./components"; // the arg in -Dexec.args is the app's port private static final String DAPR_COMMAND = @@ -130,20 +130,11 @@ public void stop() throws InterruptedException, IOException { } public void use() { - if (this.ports.getHttpPort() != null) { - System.getProperties().setProperty(Properties.HTTP_PORT.getName(), String.valueOf(this.ports.getHttpPort())); - } - if (this.ports.getGrpcPort() != null) { - System.getProperties().setProperty(Properties.GRPC_PORT.getName(), String.valueOf(this.ports.getGrpcPort())); - } + this.ports.use(); System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.GRPC.name()); System.getProperties().setProperty( Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), DaprApiProtocol.GRPC.name()); - System.getProperties().setProperty( - Properties.GRPC_ENDPOINT.getName(), "http://127.0.0.1:" + this.ports.getGrpcPort()); - System.getProperties().setProperty( - Properties.HTTP_ENDPOINT.getName(), "http://127.0.0.1:" + this.ports.getHttpPort()); } public void switchToGRPC() { @@ -165,15 +156,15 @@ public void switchToProtocol(DaprApiProtocol protocol) { System.getProperties().setProperty(Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol.name()); } - public int getGrpcPort() { + public Integer getGrpcPort() { return ports.getGrpcPort(); } - public int getHttpPort() { + public 
Integer getHttpPort() { return ports.getHttpPort(); } - public int getAppPort() { + public Integer getAppPort() { return ports.getAppPort(); } diff --git a/sdk-tests/src/test/java/io/dapr/it/ToxiProxyRun.java b/sdk-tests/src/test/java/io/dapr/it/ToxiProxyRun.java new file mode 100644 index 000000000..3a0d2f9dd --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/ToxiProxyRun.java @@ -0,0 +1,90 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it; + +import eu.rekawek.toxiproxy.Proxy; +import eu.rekawek.toxiproxy.ToxiproxyClient; +import eu.rekawek.toxiproxy.model.ToxicDirection; + +import java.io.IOException; +import java.time.Duration; + + +public class ToxiProxyRun implements Stoppable { + + private final DaprRun daprRun; + + private final Duration latency; + + private final Duration jitter; + + private Command toxiProxyServer; + + private ToxiproxyClient toxiproxyClient; + + private Proxy grpcProxy; + + private Proxy httpProxy; + + private DaprPorts toxiProxyPorts; + + public ToxiProxyRun(DaprRun run, Duration latency, Duration jitter) { + this.daprRun = run; + this.latency = latency; + this.jitter = jitter; + this.toxiProxyPorts = DaprPorts.build(true, true, true); + // artursouza: we use the "appPort" for the ToxiProxy server. 
+ this.toxiProxyServer = new Command( + "Starting HTTP server on endpoint", + "toxiproxy-server --port " + + this.toxiProxyPorts.getAppPort()); + } + + public void start() throws IOException, InterruptedException { + this.toxiProxyServer.run(); + this.toxiproxyClient = new ToxiproxyClient("127.0.0.1", this.toxiProxyPorts.getAppPort()); + + if (this.daprRun.getGrpcPort() != null) { + this.grpcProxy = toxiproxyClient.createProxy( + "daprd_grpc", + "127.0.0.1:" + this.toxiProxyPorts.getGrpcPort(), + "127.0.0.1:" + this.daprRun.getGrpcPort()); + this.grpcProxy.toxics() + .latency("latency", ToxicDirection.DOWNSTREAM, this.latency.toMillis()) + .setJitter(this.jitter.toMillis()); + } + + if (this.daprRun.getHttpPort() != null) { + this.httpProxy = toxiproxyClient.createProxy( + "daprd_http", + "127.0.0.1:" + this.toxiProxyPorts.getHttpPort(), + "127.0.0.1:" + this.daprRun.getHttpPort()); + this.httpProxy.toxics() + .latency("latency", ToxicDirection.DOWNSTREAM, this.latency.toMillis()) + .setJitter(this.jitter.toMillis()); + } + } + + public void use() { + this.toxiProxyPorts.use(); + } + + @Override + public void stop() throws InterruptedException, IOException { + this.toxiProxyServer.stop(); + this.toxiproxyClient = null; + this.grpcProxy = null; + this.httpProxy = null; + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorSdkResiliencytIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorSdkResiliencytIT.java new file mode 100644 index 000000000..0fb906552 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorSdkResiliencytIT.java @@ -0,0 +1,141 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.actors; + +import io.dapr.actors.ActorId; +import io.dapr.actors.client.ActorProxyBuilder; +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.resiliency.ResiliencyOptions; +import io.dapr.it.BaseIT; +import io.dapr.it.DaprRun; +import io.dapr.it.ToxiProxyRun; +import io.dapr.it.actors.services.springboot.DemoActor; +import io.dapr.it.actors.services.springboot.DemoActorService; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test SDK resiliency. 
+ */ +public class ActorSdkResiliencytIT extends BaseIT { + + private static final ActorId ACTOR_ID = new ActorId(UUID.randomUUID().toString()); + + private static final int NUM_ITERATIONS = 20; + + private static final Duration TIMEOUT = Duration.ofMillis(1000); + + private static final Duration LATENCY = TIMEOUT.dividedBy(2); + + private static final Duration JITTER = TIMEOUT.multipliedBy(2); + + private static final int MAX_RETRIES = -1; // Infinity + + private static DaprRun daprRun; + + private static DaprClient daprClient; + + private static DemoActor demoActor; + + private static ToxiProxyRun toxiProxyRun; + + private static DemoActor toxiDemoActor; + + private static DemoActor resilientDemoActor; + + private static DemoActor oneRetryDemoActor; + + @BeforeClass + public static void init() throws Exception { + daprRun = startDaprApp( + ActorSdkResiliencytIT.class.getSimpleName(), + DemoActorService.SUCCESS_MESSAGE, + DemoActorService.class, + true, + 60000); + + ActorId actorId = new ActorId(UUID.randomUUID().toString()); + + // HTTP client is deprecated, so SDK resiliency is for gRPC client only. 
+ daprRun.switchToGRPC(); + demoActor = buildDemoActorProxy(null); + daprClient = new DaprClientBuilder().build(); + + toxiProxyRun = new ToxiProxyRun(daprRun, LATENCY, JITTER); + toxiProxyRun.start(); + toxiProxyRun.use(); + toxiDemoActor = buildDemoActorProxy(new ResiliencyOptions().setTimeout(TIMEOUT)); + resilientDemoActor = buildDemoActorProxy( + new ResiliencyOptions().setTimeout(TIMEOUT).setMaxRetries(MAX_RETRIES)); + oneRetryDemoActor = buildDemoActorProxy( + new ResiliencyOptions().setTimeout(TIMEOUT).setMaxRetries(1)); + } + + private static DemoActor buildDemoActorProxy(ResiliencyOptions resiliencyOptions) { + ActorProxyBuilder builder = + new ActorProxyBuilder(DemoActor.class, newActorClient(resiliencyOptions)); + return builder.build(ACTOR_ID); + } + + @AfterClass + public static void tearDown() throws Exception { + if (toxiProxyRun != null) { + toxiProxyRun.stop(); + } + } + + @Test + @Ignore("Flaky when running on GitHub actions") + public void retryAndTimeout() { + AtomicInteger toxiClientErrorCount = new AtomicInteger(); + AtomicInteger retryOneClientErrorCount = new AtomicInteger(); + String message = "hello world"; + for (int i = 0; i < NUM_ITERATIONS; i++) { + try { + toxiDemoActor.writeMessage(message); + } catch (Exception e) { + // This call should fail sometimes. So, we count. + toxiClientErrorCount.incrementAndGet(); + } + try { + oneRetryDemoActor.writeMessage(message); + } catch (Exception e) { + // This call should fail sometimes. So, we count. + retryOneClientErrorCount.incrementAndGet(); + } + + // We retry forever so that the call below should always work. + resilientDemoActor.writeMessage(message); + // Makes sure the value was actually saved. 
+ String savedValue = demoActor.readMessage(); + assertEquals(message, savedValue); + } + + // This assertion makes sure that toxicity is on + assertTrue(toxiClientErrorCount.get() > 0); + assertTrue(retryOneClientErrorCount.get() > 0); + // A client without retries should have more errors than a client with one retry. + assertTrue(toxiClientErrorCount.get() > retryOneClientErrorCount.get()); + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/resiliency/SdkResiliencytIT.java b/sdk-tests/src/test/java/io/dapr/it/resiliency/SdkResiliencytIT.java new file mode 100644 index 000000000..47651c942 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/resiliency/SdkResiliencytIT.java @@ -0,0 +1,146 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.resiliency; + +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.DaprClientGrpc; +import io.dapr.client.resiliency.ResiliencyOptions; +import io.dapr.it.BaseIT; +import io.dapr.it.DaprRun; +import io.dapr.it.ToxiProxyRun; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Base64; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test SDK resiliency. 
+ */ +public class SdkResiliencytIT extends BaseIT { + + private static final int NUM_ITERATIONS = 20; + + private static final Duration TIMEOUT = Duration.ofMillis(100); + + private static final Duration LATENCY = TIMEOUT.dividedBy(2); + + private static final Duration JITTER = TIMEOUT.multipliedBy(2); + + private static final int MAX_RETRIES = -1; // Infinity + + private static DaprRun daprRun; + + private static DaprClient daprClient; + + private static ToxiProxyRun toxiProxyRun; + + private static DaprClient daprToxiClient; + + private static DaprClient daprResilientClient; + + private static DaprClient daprRetriesOnceClient; + + private final String randomStateKeyPrefix = UUID.randomUUID().toString(); + + @BeforeClass + public static void init() throws Exception { + daprRun = startDaprApp(SdkResiliencytIT.class.getSimpleName(), 5000); + // HTTP client is deprecated, so SDK resiliency is for gRPC client only. + daprRun.switchToGRPC(); + daprClient = new DaprClientBuilder().build(); + + toxiProxyRun = new ToxiProxyRun(daprRun, LATENCY, JITTER); + toxiProxyRun.start(); + toxiProxyRun.use(); + daprToxiClient = new DaprClientBuilder() + .withResiliencyOptions( + new ResiliencyOptions().setTimeout(TIMEOUT)) + .build(); + daprResilientClient = new DaprClientBuilder() + .withResiliencyOptions( + new ResiliencyOptions().setTimeout(TIMEOUT).setMaxRetries(MAX_RETRIES)) + .build(); + daprRetriesOnceClient = new DaprClientBuilder() + .withResiliencyOptions( + new ResiliencyOptions().setTimeout(TIMEOUT).setMaxRetries(1)) + .build(); + + assertTrue(daprClient instanceof DaprClientGrpc); + assertTrue(daprToxiClient instanceof DaprClientGrpc); + assertTrue(daprResilientClient instanceof DaprClientGrpc); + assertTrue(daprRetriesOnceClient instanceof DaprClientGrpc); + } + + @AfterClass + public static void tearDown() throws Exception { + if (daprClient != null) { + daprClient.close(); + } + if (daprToxiClient != null) { + daprToxiClient.close(); + } + if (daprResilientClient != 
null) { + daprResilientClient.close(); + } + if (daprRetriesOnceClient != null) { + daprRetriesOnceClient.close(); + } + if (toxiProxyRun != null) { + toxiProxyRun.stop(); + } + } + + @Test + public void retryAndTimeout() { + AtomicInteger toxiClientErrorCount = new AtomicInteger(); + AtomicInteger retryOneClientErrorCount = new AtomicInteger(); + for (int i = 0; i < NUM_ITERATIONS; i++) { + String key = randomStateKeyPrefix + "_" + i; + String value = Base64.getEncoder().encodeToString(key.getBytes(StandardCharsets.UTF_8)); + try { + daprToxiClient.saveState(STATE_STORE_NAME, key, value).block(); + } catch (Exception e) { + // This call should fail sometimes. So, we count. + toxiClientErrorCount.incrementAndGet(); + } + try { + daprRetriesOnceClient.saveState(STATE_STORE_NAME, key, value).block(); + } catch (Exception e) { + // This call should fail sometimes. So, we count. + retryOneClientErrorCount.incrementAndGet(); + } + + // We retry forever so that the call below should always work. + daprResilientClient.saveState(STATE_STORE_NAME, key, value).block(); + // Makes sure the value was actually saved. + String savedValue = daprClient.getState(STATE_STORE_NAME, key, String.class).block().getValue(); + assertEquals(value, savedValue); + } + + // This assertion makes sure that toxicity is on + assertTrue(toxiClientErrorCount.get() > 0); + assertTrue(retryOneClientErrorCount.get() > 0); + // A client without retries should have more errors than a client with one retry. 
+ assertTrue(toxiClientErrorCount.get() > retryOneClientErrorCount.get()); + } +} diff --git a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java index 9b73e5b90..ab66a2bf8 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java @@ -13,6 +13,7 @@ package io.dapr.client; +import io.dapr.client.resiliency.ResiliencyOptions; import io.dapr.config.Properties; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; @@ -55,6 +56,11 @@ public class DaprClientBuilder { */ private DaprObjectSerializer stateSerializer; + /** + * Resiliency configuration for DaprClient. + */ + private ResiliencyOptions resiliencyOptions; + /** * Creates a constructor for DaprClient. * @@ -105,6 +111,17 @@ public DaprClientBuilder withStateSerializer(DaprObjectSerializer stateSerialize return this; } + /** + * Sets the resiliency options for DaprClient. + * + * @param options Resiliency options to be used by the client. + * @return This instance. + */ + public DaprClientBuilder withResiliencyOptions(ResiliencyOptions options) { + this.resiliencyOptions = options; + return this; + } + /** * Build an instance of the Client based on the provided setup. 
* @@ -162,7 +179,12 @@ private DaprClient buildDaprClientGrpc() { final ManagedChannel channel = NetworkUtils.buildGrpcManagedChannel(); final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel); DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel); - return new DaprClientGrpc(channelFacade, asyncStub, this.objectSerializer, this.stateSerializer); + return new DaprClientGrpc( + channelFacade, + asyncStub, + this.objectSerializer, + this.stateSerializer, + this.resiliencyOptions); } /** diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index 49cf4aca9..9e0d9e575 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -44,9 +44,12 @@ import io.dapr.client.domain.TransactionalStateOperation; import io.dapr.client.domain.UnsubscribeConfigurationRequest; import io.dapr.client.domain.UnsubscribeConfigurationResponse; +import io.dapr.client.resiliency.ResiliencyOptions; import io.dapr.config.Properties; import io.dapr.exceptions.DaprException; import io.dapr.internal.opencensus.GrpcWrapper; +import io.dapr.internal.resiliency.RetryPolicy; +import io.dapr.internal.resiliency.TimeoutPolicy; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.utils.DefaultContentTypeConverter; @@ -92,18 +95,28 @@ public class DaprClientGrpc extends AbstractDaprClient { */ private final GrpcChannelFacade channel; + /** + * The timeout policy. + */ + private final TimeoutPolicy timeoutPolicy; + + /** + * The retry policy. + */ + private final RetryPolicy retryPolicy; + /** * The async gRPC stub. 
*/ - private DaprGrpc.DaprStub asyncStub; + private final DaprGrpc.DaprStub asyncStub; /** * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder * - * @param channel Facade for the managed GRPC channel - * @param asyncStub async gRPC stub - * @param objectSerializer Serializer for transient request/response objects. - * @param stateSerializer Serializer for state objects. + * @param channel Facade for the managed GRPC channel + * @param asyncStub async gRPC stub + * @param objectSerializer Serializer for transient request/response objects. + * @param stateSerializer Serializer for state objects. * @see DaprClientBuilder */ DaprClientGrpc( @@ -111,9 +124,32 @@ public class DaprClientGrpc extends AbstractDaprClient { DaprGrpc.DaprStub asyncStub, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) { + this(channel, asyncStub, objectSerializer, stateSerializer, null); + } + + /** + * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder + * + * @param channel Facade for the managed GRPC channel + * @param asyncStub async gRPC stub + * @param objectSerializer Serializer for transient request/response objects. + * @param stateSerializer Serializer for state objects. + * @param resiliencyOptions Client-level override for resiliency options. + * @see DaprClientBuilder + */ + DaprClientGrpc( + GrpcChannelFacade channel, + DaprGrpc.DaprStub asyncStub, + DaprObjectSerializer objectSerializer, + DaprObjectSerializer stateSerializer, + ResiliencyOptions resiliencyOptions) { super(objectSerializer, stateSerializer); this.channel = channel; this.asyncStub = intercept(asyncStub); + this.timeoutPolicy = new TimeoutPolicy( + resiliencyOptions == null ? null : resiliencyOptions.getTimeout()); + this.retryPolicy = new RetryPolicy( + resiliencyOptions == null ? 
null : resiliencyOptions.getMaxRetries()); } private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) { @@ -994,14 +1030,14 @@ private ConfigurationItem buildConfigurationItem( * @param client GRPC client for Dapr. * @return Client after adding interceptors. */ - private static DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) { + private DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) { ClientInterceptor interceptor = new ClientInterceptor() { @Override public ClientCall interceptCall( MethodDescriptor methodDescriptor, - CallOptions callOptions, + CallOptions options, Channel channel) { - ClientCall clientCall = channel.newCall(methodDescriptor, callOptions); + ClientCall clientCall = channel.newCall(methodDescriptor, timeoutPolicy.apply(options)); return new ForwardingClientCall.SimpleForwardingClientCall(clientCall) { @Override public void start(final Listener responseListener, final Metadata metadata) { @@ -1009,7 +1045,6 @@ public void start(final Listener responseListener, final Metadata metadat if (daprApiToken != null) { metadata.put(Metadata.Key.of(Headers.DAPR_API_TOKEN, Metadata.ASCII_STRING_MARSHALLER), daprApiToken); } - super.start(responseListener, metadata); } }; @@ -1030,11 +1065,13 @@ private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStu } private Mono createMono(Consumer> consumer) { - return Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()); + return retryPolicy.apply( + Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run())); } private Flux createFlux(Consumer> consumer) { - return Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()); + return retryPolicy.apply( + Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run())); } private StreamObserver createStreamObserver(MonoSink sink) { diff --git 
a/sdk/src/main/java/io/dapr/client/resiliency/ResiliencyOptions.java b/sdk/src/main/java/io/dapr/client/resiliency/ResiliencyOptions.java new file mode 100644 index 000000000..5c60ceccc --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/resiliency/ResiliencyOptions.java @@ -0,0 +1,44 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.resiliency; + +import java.time.Duration; + +/** + * Resiliency policy for SDK communication to Dapr API. + */ +public final class ResiliencyOptions { + + private Duration timeout; + + private Integer maxRetries; + + public Duration getTimeout() { + return timeout; + } + + public ResiliencyOptions setTimeout(Duration timeout) { + this.timeout = timeout; + return this; + } + + public Integer getMaxRetries() { + return maxRetries; + } + + public ResiliencyOptions setMaxRetries(Integer maxRetries) { + this.maxRetries = maxRetries; + return this; + } +} diff --git a/sdk/src/main/java/io/dapr/config/MillisecondsDurationProperty.java b/sdk/src/main/java/io/dapr/config/MillisecondsDurationProperty.java new file mode 100644 index 000000000..88ed12c9e --- /dev/null +++ b/sdk/src/main/java/io/dapr/config/MillisecondsDurationProperty.java @@ -0,0 +1,39 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.config; + +import java.time.Duration; + +/** + * Duration configuration property, parsed from a value in milliseconds. + */ +public class MillisecondsDurationProperty extends Property { + + /** + * {@inheritDoc} + */ + MillisecondsDurationProperty(String name, String envName, Duration defaultValue) { + super(name, envName, defaultValue); + } + + /** + * {@inheritDoc} + */ + @Override + protected Duration parse(String value) { + long longValue = Long.parseLong(value); + return Duration.ofMillis(longValue); + } + +} diff --git a/sdk/src/main/java/io/dapr/config/Properties.java b/sdk/src/main/java/io/dapr/config/Properties.java index 08bd911d6..f4c4ba603 100644 --- a/sdk/src/main/java/io/dapr/config/Properties.java +++ b/sdk/src/main/java/io/dapr/config/Properties.java @@ -17,6 +17,7 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.time.Duration; /** * Global properties for Dapr's SDK, using Supplier so they are dynamically resolved. @@ -38,6 +39,16 @@ public class Properties { */ private static final Integer DEFAULT_GRPC_PORT = 50001; + /** + * Dapr's default max retries. + */ + private static final Integer DEFAULT_API_MAX_RETRIES = 0; + + /** + * Dapr's default timeout, expressed in milliseconds. + */ + private static final Duration DEFAULT_API_TIMEOUT = Duration.ofMillis(0L); + /** * Dapr's default use of gRPC or HTTP. */ @@ -115,6 +126,22 @@ public class Properties { "DAPR_HTTP_ENDPOINT", null); + /** + * Maximum number of retries for retriable exceptions. 
+ */ + public static final Property MAX_RETRIES = new IntegerProperty( + "dapr.api.maxRetries", + "DAPR_API_MAX_RETRIES", + DEFAULT_API_MAX_RETRIES); + + /** + * Timeout for API calls. + */ + public static final Property TIMEOUT = new MillisecondsDurationProperty( + "dapr.api.timeoutMilliseconds", + "DAPR_API_TIMEOUT_MILLISECONDS", + DEFAULT_API_TIMEOUT); + /** * Determines if Dapr client will use gRPC or HTTP to talk to Dapr's side car. * @deprecated This attribute will be deleted at SDK version 1.10. diff --git a/sdk/src/main/java/io/dapr/internal/resiliency/RetryPolicy.java b/sdk/src/main/java/io/dapr/internal/resiliency/RetryPolicy.java new file mode 100644 index 000000000..bf0bfdb24 --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/resiliency/RetryPolicy.java @@ -0,0 +1,127 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.internal.resiliency; + +import io.dapr.config.Properties; +import io.dapr.exceptions.DaprException; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.time.Duration; + +/** + * Retry policy for SDK communication to Dapr API. 
+ */ +public final class RetryPolicy { + + private static final int MIN_BACKOFF_MILLIS = 500; + + private static final int MAX_BACKOFF_SECONDS = 5; + + private final Retry retrySpec; + + public RetryPolicy() { + this(null); + } + + public RetryPolicy(Integer maxRetries) { + this.retrySpec = buildRetrySpec(maxRetries != null ? maxRetries : Properties.MAX_RETRIES.get()); + } + + /** + * Applies the retry policy to an expected Mono action. + * @param response Response + * @param Type expected for the action's response + * @return action with retry + */ + public Mono apply(Mono response) { + if (this.retrySpec == null) { + return response; + } + + return response.retryWhen(retrySpec) + .onErrorMap(throwable -> findDaprException(throwable)); + } + + /** + * Applies the retry policy to an expected Flux action. + * @param response Response + * @param Type expected for the action's response + * @return action with retry + */ + public Flux apply(Flux response) { + if (this.retrySpec == null) { + return response; + } + + return response.retryWhen(retrySpec) + .onErrorMap(throwable -> findDaprException(throwable)); + } + + private static Retry buildRetrySpec(int maxRetries) { + if (maxRetries == 0) { + return null; + } + + if (maxRetries < 0) { + return Retry.indefinitely() + .filter(throwable -> isRetryableGrpcError(throwable)); + } + + return Retry.backoff(maxRetries, Duration.ofMillis(MIN_BACKOFF_MILLIS)) + .maxBackoff(Duration.ofSeconds(MAX_BACKOFF_SECONDS)) + .filter(throwable -> isRetryableGrpcError(throwable)); + } + + private static boolean isRetryableGrpcError(Throwable throwable) { + Status grpcStatus = findGrpcStatusCode(throwable); + if (grpcStatus == null) { + return false; + } + + switch (grpcStatus.getCode()) { + case DEADLINE_EXCEEDED: + case UNAVAILABLE: + return true; + default: + return false; + } + } + + private static Status findGrpcStatusCode(Throwable throwable) { + while (throwable != null) { + if (throwable instanceof StatusRuntimeException) { + 
return ((StatusRuntimeException) throwable).getStatus(); + } + + throwable = throwable.getCause(); + } + return null; + } + + private static Throwable findDaprException(Throwable throwable) { + Throwable original = throwable; + while (throwable != null) { + if (throwable instanceof DaprException) { + return throwable; + } + + throwable = throwable.getCause(); + } + return original; + } +} diff --git a/sdk/src/main/java/io/dapr/internal/resiliency/TimeoutPolicy.java b/sdk/src/main/java/io/dapr/internal/resiliency/TimeoutPolicy.java new file mode 100644 index 000000000..38329d67c --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/resiliency/TimeoutPolicy.java @@ -0,0 +1,56 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.internal.resiliency; + +import io.dapr.config.Properties; +import io.grpc.CallOptions; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +/** + * Timeout policy for SDK communication to Dapr API. + */ +public final class TimeoutPolicy { + + private final Duration timeout; + + /** + * Instantiates a new timeout policy with override value. + * @param timeout Override timeout value. + */ + public TimeoutPolicy(Duration timeout) { + this.timeout = timeout != null ? timeout : Properties.TIMEOUT.get(); + } + + /** + * Instantiates a new timeout policy with default value. 
+ */ + public TimeoutPolicy() { + this(null); + } + + /** + * Applies the timeout policy to gRPC call options. + * @param options Call options + * @return Call options with timeout policy applied + */ + public CallOptions apply(CallOptions options) { + if (this.timeout.isZero() || this.timeout.isNegative()) { + return options; + } + + return options.withDeadlineAfter(this.timeout.toMillis(), TimeUnit.MILLISECONDS); + } +} diff --git a/sdk/src/main/java/io/dapr/utils/NetworkUtils.java b/sdk/src/main/java/io/dapr/utils/NetworkUtils.java index 8f755f313..71e1be12c 100644 --- a/sdk/src/main/java/io/dapr/utils/NetworkUtils.java +++ b/sdk/src/main/java/io/dapr/utils/NetworkUtils.java @@ -27,6 +27,8 @@ */ public final class NetworkUtils { + private static final long RETRY_WAIT_MILLISECONDS = 1000; + private NetworkUtils() { } @@ -39,7 +41,7 @@ private NetworkUtils() { */ public static void waitForSocket(String host, int port, int timeoutInMilliseconds) throws InterruptedException { long started = System.currentTimeMillis(); - Retry.callWithRetry(() -> { + callWithRetry(() -> { try { try (Socket socket = new Socket()) { // timeout cannot be negative. 
@@ -78,4 +80,31 @@ public static ManagedChannel buildGrpcManagedChannel() { } return builder.build(); } + + private static void callWithRetry(Runnable function, long retryTimeoutMilliseconds) throws InterruptedException { + long started = System.currentTimeMillis(); + while (true) { + Throwable exception; + try { + function.run(); + return; + } catch (Exception e) { + exception = e; + } catch (AssertionError e) { + exception = e; + } + + long elapsed = System.currentTimeMillis() - started; + if (elapsed >= retryTimeoutMilliseconds) { + if (exception instanceof RuntimeException) { + throw (RuntimeException)exception; + } + + throw new RuntimeException(exception); + } + + long remaining = retryTimeoutMilliseconds - elapsed; + Thread.sleep(Math.min(remaining, RETRY_WAIT_MILLISECONDS)); + } + } } diff --git a/sdk/src/main/java/io/dapr/utils/Retry.java b/sdk/src/main/java/io/dapr/utils/Retry.java deleted file mode 100644 index 0d181ff03..000000000 --- a/sdk/src/main/java/io/dapr/utils/Retry.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2021 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package io.dapr.utils; - -class Retry { - - private static final long RETRY_WAIT_MILLISECONDS = 1000; - - private Retry() { - } - - static void callWithRetry(Runnable function, long retryTimeoutMilliseconds) throws InterruptedException { - long started = System.currentTimeMillis(); - while (true) { - Throwable exception; - try { - function.run(); - return; - } catch (Exception e) { - exception = e; - } catch (AssertionError e) { - exception = e; - } - - long elapsed = System.currentTimeMillis() - started; - if (elapsed >= retryTimeoutMilliseconds) { - if (exception instanceof RuntimeException) { - throw (RuntimeException)exception; - } - - throw new RuntimeException(exception); - } - - long remaining = retryTimeoutMilliseconds - elapsed; - Thread.sleep(Math.min(remaining, RETRY_WAIT_MILLISECONDS)); - } - } -} diff --git a/sdk/src/test/java/io/dapr/resiliency/RetryPolicyTest.java b/sdk/src/test/java/io/dapr/resiliency/RetryPolicyTest.java new file mode 100644 index 000000000..d495832c2 --- /dev/null +++ b/sdk/src/test/java/io/dapr/resiliency/RetryPolicyTest.java @@ -0,0 +1,113 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package io.dapr.resiliency; + +import io.dapr.internal.resiliency.RetryPolicy; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import org.junit.Test; +import reactor.core.Exceptions; +import reactor.core.publisher.Mono; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; + +public class RetryPolicyTest { + + private static final String SUCCESS_MESSAGE = "It worked!"; + + private static final RuntimeException RETRYABLE_EXCEPTION = + new StatusRuntimeException(Status.DEADLINE_EXCEEDED); + + @Test + public void zeroRetriesThenError() { + AtomicInteger callCounter = new AtomicInteger(); + RetryPolicy policy = new RetryPolicy(0); + Mono action = createActionErrorAndReturn(callCounter, Integer.MAX_VALUE, RETRYABLE_EXCEPTION); + + try { + policy.apply(action).block(); + fail("Exception expected"); + } catch (Exception e) { + assertSame(RETRYABLE_EXCEPTION, e); + } + assertEquals(1, callCounter.get()); + } + + @Test + public void zeroRetriesThenSuccess() { + AtomicInteger callCounter = new AtomicInteger(); + RetryPolicy policy = new RetryPolicy(0); + Mono action = createActionErrorAndReturn(callCounter, 0, RETRYABLE_EXCEPTION); + + String response = policy.apply(action).block(); + assertEquals(SUCCESS_MESSAGE, response); + assertEquals(1, callCounter.get()); + } + + @Test + public void twoRetriesThenSuccess() { + AtomicInteger callCounter = new AtomicInteger(); + RetryPolicy policy = new RetryPolicy(3); + Mono action = createActionErrorAndReturn(callCounter, 2, RETRYABLE_EXCEPTION); + + String response = policy.apply(action).block(); + assertEquals(SUCCESS_MESSAGE, response); + assertEquals(3, callCounter.get()); + } + + @Test + public void threeRetriesThenError() { + AtomicInteger callCounter = new AtomicInteger(); + RetryPolicy policy = new RetryPolicy(3); + Mono action = createActionErrorAndReturn(callCounter, Integer.MAX_VALUE, RETRYABLE_EXCEPTION); + + try { + policy.apply(action).block(); + fail("Exception 
expected"); + } catch (Exception e) { + assertTrue(Exceptions.isRetryExhausted(e)); + } + assertEquals(4, callCounter.get()); + } + + @Test + public void notRetryableException() { + AtomicInteger callCounter = new AtomicInteger(); + RuntimeException exception = new ArithmeticException(); + RetryPolicy policy = new RetryPolicy(3); + Mono action = createActionErrorAndReturn(callCounter, Integer.MAX_VALUE, exception); + + assertThrows(ArithmeticException.class, () -> { + policy.apply(action).block(); + }); + assertEquals(1, callCounter.get()); + } + + + + private static Mono createActionErrorAndReturn( + AtomicInteger callCounter, + int firstErrors, + RuntimeException error) { + return Mono.fromCallable(() -> { + if (callCounter.incrementAndGet() <= firstErrors) { + throw error; + } + + return SUCCESS_MESSAGE; + }); + } +}