From 6daa0f6ef1ceaeed8862e566aaf86e93f6b50e40 Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Fri, 22 Nov 2024 16:59:51 -0800 Subject: [PATCH] Implement actor client metadata. Signed-off-by: Artur Souza --- .../io/dapr/actors/client/ActorClient.java | 28 ++++++++++++-- .../io/dapr/actors/client/DaprClientImpl.java | 15 +++++++- .../actors/client/DaprGrpcClientTest.java | 2 +- .../src/test/java/io/dapr/it/DaprRun.java | 12 +++++- .../io/dapr/it/actors/ActorExceptionIT.java | 38 +++++++++++++++---- .../grpc/DaprClientGrpcInterceptors.java | 12 +++++- .../interceptors/DaprApiTokenInterceptor.java | 1 - ...a => DaprMetadataReceiverInterceptor.java} | 4 +- 8 files changed, 92 insertions(+), 20 deletions(-) rename sdk/src/main/java/io/dapr/internal/grpc/interceptors/{DaprMetadataInterceptor.java => DaprMetadataReceiverInterceptor.java} (93%) diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java index 877b27574..08c0a1c9e 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java @@ -22,6 +22,9 @@ import io.grpc.ManagedChannelBuilder; import reactor.core.publisher.Mono; +import java.util.Collections; +import java.util.Map; + /** * Holds a client for Dapr sidecar communication. ActorClient should be reused. */ @@ -59,7 +62,7 @@ public ActorClient(ResiliencyOptions resiliencyOptions) { * @param overrideProperties Override properties. */ public ActorClient(Properties overrideProperties) { - this(buildManagedChannel(overrideProperties), null, overrideProperties.getValue(Properties.API_TOKEN)); + this(overrideProperties, null); } /** @@ -69,21 +72,38 @@ public ActorClient(Properties overrideProperties) { * @param resiliencyOptions Client resiliency options. */ public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOptions) { - this(buildManagedChannel(overrideProperties), resiliencyOptions, overrideProperties.getValue(Properties.API_TOKEN)); + this(overrideProperties, null, resiliencyOptions); + } + + /** + * Instantiates a new channel for Dapr sidecar communication. + * + * @param overrideProperties Override properties. + * @param metadata gRPC metadata or HTTP headers for actor invocation. + * @param resiliencyOptions Client resiliency options. + */ + public ActorClient(Properties overrideProperties, Map metadata, ResiliencyOptions resiliencyOptions) { + this(buildManagedChannel(overrideProperties), + metadata, + resiliencyOptions, + overrideProperties.getValue(Properties.API_TOKEN)); } /** * Instantiates a new channel for Dapr sidecar communication. * * @param grpcManagedChannel gRPC channel. + * @param metadata gRPC metadata or HTTP headers for actor invocation. * @param resiliencyOptions Client resiliency options. + * @param daprApiToken Dapr API token. */ private ActorClient( ManagedChannel grpcManagedChannel, + Map metadata, ResiliencyOptions resiliencyOptions, String daprApiToken) { this.grpcManagedChannel = grpcManagedChannel; - this.daprClient = buildDaprClient(grpcManagedChannel, resiliencyOptions, daprApiToken); + this.daprClient = buildDaprClient(grpcManagedChannel, metadata, resiliencyOptions, daprApiToken); } /** @@ -137,10 +157,12 @@ private static ManagedChannel buildManagedChannel(Properties overrideProperties) */ private static DaprClient buildDaprClient( Channel grpcManagedChannel, + Map metadata, ResiliencyOptions resiliencyOptions, String daprApiToken) { return new DaprClientImpl( DaprGrpc.newStub(grpcManagedChannel), + metadata == null ? null : Collections.unmodifiableMap(metadata), resiliencyOptions, daprApiToken); } diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/DaprClientImpl.java b/sdk-actors/src/main/java/io/dapr/actors/client/DaprClientImpl.java index b415e812e..30ea4616c 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/DaprClientImpl.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/DaprClientImpl.java @@ -34,6 +34,7 @@ import reactor.core.publisher.MonoSink; import reactor.util.context.ContextView; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; @@ -57,19 +58,30 @@ class DaprClientImpl implements DaprClient { */ private final DaprClientGrpcInterceptors grpcInterceptors; + /** + * Metadata for actor invocation requests. + */ + private final Map metadata; + /** * Internal constructor. * * @param grpcClient Dapr's GRPC client. + * @param metadata gRPC metadata or HTTP headers for actor server to receive. * @param resiliencyOptions Client resiliency options (optional). * @param daprApiToken Dapr API token (optional). */ - DaprClientImpl(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions, String daprApiToken) { + DaprClientImpl( + DaprGrpc.DaprStub grpcClient, + Map metadata, + ResiliencyOptions resiliencyOptions, + String daprApiToken) { this.client = grpcClient; this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken, new TimeoutPolicy(resiliencyOptions == null ? null : resiliencyOptions.getTimeout())); this.retryPolicy = new RetryPolicy( resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries()); + this.metadata = metadata == null ? Map.of() : metadata; } /** @@ -82,6 +94,7 @@ public Mono invoke(String actorType, String actorId, String methodName, .setActorType(actorType) .setActorId(actorId) .setMethod(methodName) + .putAllMetadata(this.metadata) .setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload)) .build(); return Mono.deferContextual( diff --git a/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java index 0876a73cc..1f05e7ac6 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java @@ -106,7 +106,7 @@ public void setup() throws IOException { InProcessChannelBuilder.forName(serverName).directExecutor().build()); // Create a HelloWorldClient using the in-process channel; - client = new DaprClientImpl(DaprGrpc.newStub(channel), null, null); + client = new DaprClientImpl(DaprGrpc.newStub(channel), null, null, null); } @Test diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index 99c38bfff..c133f1fc2 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -187,11 +187,19 @@ public DaprClientBuilder newDaprClientBuilder() { } public ActorClient newActorClient() { - return this.newActorClient(null); + return this.newActorClient(null, null); + } + + public ActorClient newActorClient(Map metadata) { + return this.newActorClient(metadata, null); } public ActorClient newActorClient(ResiliencyOptions resiliencyOptions) { - return new ActorClient(new Properties(this.getPropertyOverrides()), resiliencyOptions); + return this.newActorClient(null, resiliencyOptions); + } + + public ActorClient newActorClient(Map metadata, ResiliencyOptions resiliencyOptions) { + return new ActorClient(new Properties(this.getPropertyOverrides()), metadata, resiliencyOptions); } public void waitForAppHealth(int maxWaitMilliseconds) throws InterruptedException { diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorExceptionIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorExceptionIT.java index 3b97084cb..64d0f3ae8 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/ActorExceptionIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorExceptionIT.java @@ -16,12 +16,17 @@ import io.dapr.actors.ActorId; import io.dapr.actors.client.ActorProxyBuilder; import io.dapr.it.BaseIT; +import io.dapr.it.DaprRun; import io.dapr.it.actors.app.MyActor; import io.dapr.it.actors.app.MyActorService; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; + import static io.dapr.it.Retry.callWithRetry; import static io.dapr.it.TestUtils.assertThrowsDaprExceptionSubstring; @@ -30,23 +35,24 @@ public class ActorExceptionIT extends BaseIT { private static Logger logger = LoggerFactory.getLogger(ActorExceptionIT.class); - @Test - public void exceptionTest() throws Exception { + private static DaprRun run; + + @BeforeAll + public static void start() throws Exception { // The call below will fail if service cannot start successfully. - var run = startDaprApp( + run = startDaprApp( ActorExceptionIT.class.getSimpleName(), MyActorService.SUCCESS_MESSAGE, MyActorService.class, true, 60000); + } - logger.debug("Creating proxy builder"); + @Test + public void exceptionTest() throws Exception { ActorProxyBuilder proxyBuilder = new ActorProxyBuilder("MyActorTest", MyActor.class, deferClose(run.newActorClient())); - logger.debug("Creating actorId"); - ActorId actorId1 = new ActorId("1"); - logger.debug("Building proxy"); - MyActor proxy = proxyBuilder.build(actorId1); + MyActor proxy = proxyBuilder.build(new ActorId("1")); callWithRetry(() -> { assertThrowsDaprExceptionSubstring( @@ -55,4 +61,20 @@ public void exceptionTest() throws Exception { () -> proxy.throwException()); }, 10000); } + + @Test + public void exceptionDueToMetadataTest() throws Exception { + // Setting this HTTP header via actor metadata will cause the Actor HTTP server to error. + Map metadata = Map.of("Content-Length", "9999"); + ActorProxyBuilder proxyBuilderMetadataOverride = + new ActorProxyBuilder("MyActorTest", MyActor.class, deferClose(run.newActorClient(metadata))); + + MyActor proxyWithMetadata = proxyBuilderMetadataOverride.build(new ActorId("2")); + callWithRetry(() -> { + assertThrowsDaprExceptionSubstring( + "INTERNAL", + "ContentLength=9999 with Body length 13", + () -> proxyWithMetadata.say("hello world")); + }, 10000); + } } diff --git a/sdk/src/main/java/io/dapr/internal/grpc/DaprClientGrpcInterceptors.java b/sdk/src/main/java/io/dapr/internal/grpc/DaprClientGrpcInterceptors.java index a4e5c2324..0a3b2f113 100644 --- a/sdk/src/main/java/io/dapr/internal/grpc/DaprClientGrpcInterceptors.java +++ b/sdk/src/main/java/io/dapr/internal/grpc/DaprClientGrpcInterceptors.java @@ -15,7 +15,7 @@ import io.dapr.internal.grpc.interceptors.DaprApiTokenInterceptor; import io.dapr.internal.grpc.interceptors.DaprAppIdInterceptor; -import io.dapr.internal.grpc.interceptors.DaprMetadataInterceptor; +import io.dapr.internal.grpc.interceptors.DaprMetadataReceiverInterceptor; import io.dapr.internal.grpc.interceptors.DaprTimeoutInterceptor; import io.dapr.internal.grpc.interceptors.DaprTracingInterceptor; import io.dapr.internal.resiliency.TimeoutPolicy; @@ -35,10 +35,18 @@ public class DaprClientGrpcInterceptors { private final TimeoutPolicy timeoutPolicy; + /** + * Instantiates a holder of all gRPC interceptors. + */ public DaprClientGrpcInterceptors() { this(null, null); } + /** + * Instantiates a holder of all gRPC interceptors. + * @param daprApiToken Dapr API token. + * @param timeoutPolicy Timeout Policy. + */ public DaprClientGrpcInterceptors(String daprApiToken, TimeoutPolicy timeoutPolicy) { this.daprApiToken = daprApiToken; this.timeoutPolicy = timeoutPolicy; @@ -118,7 +126,7 @@ public > T intercept( new DaprApiTokenInterceptor(this.daprApiToken), new DaprTimeoutInterceptor(this.timeoutPolicy), new DaprTracingInterceptor(context), - new DaprMetadataInterceptor(metadataConsumer)); + new DaprMetadataReceiverInterceptor(metadataConsumer)); } } diff --git a/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprApiTokenInterceptor.java b/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprApiTokenInterceptor.java index f7cf415d8..3f2aa3cdc 100644 --- a/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprApiTokenInterceptor.java +++ b/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprApiTokenInterceptor.java @@ -14,7 +14,6 @@ package io.dapr.internal.grpc.interceptors; import io.dapr.client.Headers; -import io.dapr.config.Properties; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; diff --git a/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprMetadataInterceptor.java b/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprMetadataReceiverInterceptor.java similarity index 93% rename from sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprMetadataInterceptor.java rename to sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprMetadataReceiverInterceptor.java index 44b438a01..9e564536a 100644 --- a/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprMetadataInterceptor.java +++ b/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprMetadataReceiverInterceptor.java @@ -27,7 +27,7 @@ /** * Consumes gRPC metadata. */ -public class DaprMetadataInterceptor implements ClientInterceptor { +public class DaprMetadataReceiverInterceptor implements ClientInterceptor { private final Consumer metadataConsumer; @@ -35,7 +35,7 @@ public class DaprMetadataInterceptor implements ClientInterceptor { * Creates an instance of the consumer for gRPC metadata. * @param metadataConsumer gRPC metadata consumer */ - public DaprMetadataInterceptor(Consumer metadataConsumer) { + public DaprMetadataReceiverInterceptor(Consumer metadataConsumer) { this.metadataConsumer = metadataConsumer; }