From 8ceec86962ecb14acfaeac9cee82aee5ac05edba Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Thu, 10 Oct 2024 15:36:17 -0700 Subject: [PATCH 1/5] Fix bi-di subscription to support dapr-api-token Signed-off-by: Artur Souza --- .../io/dapr/actors/client/ActorClient.java | 17 ++-- .../io/dapr/actors/client/DaprClientImpl.java | 66 +++------------ .../actors/client/DaprGrpcClientTest.java | 2 +- .../src/test/java/io/dapr/it/DaprRun.java | 33 ++++++-- .../io/dapr/client/DaprClientBuilder.java | 3 +- .../java/io/dapr/client/DaprClientImpl.java | 84 +++++++++++++++---- .../grpc/DaprClientGrpcInterceptors.java | 83 +++++------------- .../interceptors/DaprApiTokenInterceptor.java | 22 ++++- .../client/DaprClientGrpcTelemetryTest.java | 2 +- 9 files changed, 159 insertions(+), 153 deletions(-) diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java index c2d7a4444..877b27574 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java @@ -59,7 +59,7 @@ public ActorClient(ResiliencyOptions resiliencyOptions) { * @param overrideProperties Override properties. */ public ActorClient(Properties overrideProperties) { - this(buildManagedChannel(overrideProperties), null); + this(buildManagedChannel(overrideProperties), null, overrideProperties.getValue(Properties.API_TOKEN)); } /** @@ -69,7 +69,7 @@ public ActorClient(Properties overrideProperties) { * @param resiliencyOptions Client resiliency options. */ public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOptions) { - this(buildManagedChannel(overrideProperties), resiliencyOptions); + this(buildManagedChannel(overrideProperties), resiliencyOptions, overrideProperties.getValue(Properties.API_TOKEN)); } /** @@ -80,9 +80,10 @@ public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOp */ private ActorClient( ManagedChannel grpcManagedChannel, - ResiliencyOptions resiliencyOptions) { + ResiliencyOptions resiliencyOptions, + String daprApiToken) { this.grpcManagedChannel = grpcManagedChannel; - this.daprClient = buildDaprClient(grpcManagedChannel, resiliencyOptions); + this.daprClient = buildDaprClient(grpcManagedChannel, resiliencyOptions, daprApiToken); } /** @@ -136,7 +137,11 @@ private static ManagedChannel buildManagedChannel(Properties overrideProperties) */ private static DaprClient buildDaprClient( Channel grpcManagedChannel, - ResiliencyOptions resiliencyOptions) { - return new DaprClientImpl(DaprGrpc.newStub(grpcManagedChannel), resiliencyOptions); + ResiliencyOptions resiliencyOptions, + String daprApiToken) { + return new DaprClientImpl( + DaprGrpc.newStub(grpcManagedChannel), + resiliencyOptions, + daprApiToken); } } diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/DaprClientImpl.java b/sdk-actors/src/main/java/io/dapr/actors/client/DaprClientImpl.java index c060e01c5..b415e812e 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/DaprClientImpl.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/DaprClientImpl.java @@ -42,11 +42,6 @@ */ class DaprClientImpl implements DaprClient { - /** - * Timeout policy for SDK calls to Dapr API. - */ - private final TimeoutPolicy timeoutPolicy; - /** * Retry policy for SDK calls to Dapr API. */ @@ -57,16 +52,22 @@ class DaprClientImpl implements DaprClient { */ private final DaprGrpc.DaprStub client; + /** + * gRPC client interceptors. + */ + private final DaprClientGrpcInterceptors grpcInterceptors; + /** * Internal constructor. * * @param grpcClient Dapr's GRPC client. - * @param resiliencyOptions Client resiliency options (optional) + * @param resiliencyOptions Client resiliency options (optional). + * @param daprApiToken Dapr API token (optional). */ - DaprClientImpl(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions) { - this.client = intercept(grpcClient); - this.timeoutPolicy = new TimeoutPolicy( - resiliencyOptions == null ? null : resiliencyOptions.getTimeout()); + DaprClientImpl(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions, String daprApiToken) { + this.client = grpcClient; + this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken, + new TimeoutPolicy(resiliencyOptions == null ? null : resiliencyOptions.getTimeout())); this.retryPolicy = new RetryPolicy( resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries()); } @@ -85,54 +86,11 @@ public Mono invoke(String actorType, String actorId, String methodName, .build(); return Mono.deferContextual( context -> this.createMono( - it -> intercept(context, this.timeoutPolicy, client).invokeActor(req, it) + it -> this.grpcInterceptors.intercept(client, context).invokeActor(req, it) ) ).map(r -> r.getData().toByteArray()); } - /** - * Populates GRPC client with interceptors. - * - * @param client GRPC client for Dapr. - * @return Client after adding interceptors. - */ - private DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) { - ClientInterceptor interceptor = new ClientInterceptor() { - @Override - public ClientCall interceptCall( - MethodDescriptor methodDescriptor, - CallOptions options, - Channel channel) { - ClientCall clientCall = channel.newCall(methodDescriptor, timeoutPolicy.apply(options)); - return new ForwardingClientCall.SimpleForwardingClientCall(clientCall) { - @Override - public void start(final Listener responseListener, final Metadata metadata) { - String daprApiToken = Properties.API_TOKEN.get(); - if (daprApiToken != null) { - metadata.put(Metadata.Key.of("dapr-api-token", Metadata.ASCII_STRING_MARSHALLER), daprApiToken); - } - - super.start(responseListener, metadata); - } - }; - } - }; - return client.withInterceptors(interceptor); - } - - /** - * Populates GRPC client with interceptors for telemetry. - * - * @param context Reactor's context. - * @param timeoutPolicy Timeout policy for gRPC call. - * @param client GRPC client for Dapr. - * @return Client after adding interceptors. - */ - private static DaprGrpc.DaprStub intercept( - ContextView context, TimeoutPolicy timeoutPolicy, DaprGrpc.DaprStub client) { - return DaprClientGrpcInterceptors.intercept(client, timeoutPolicy, context); - } - private Mono createMono(Consumer> consumer) { return retryPolicy.apply( Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run())); diff --git a/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java index 313eca5a7..0876a73cc 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java @@ -106,7 +106,7 @@ public void setup() throws IOException { InProcessChannelBuilder.forName(serverName).directExecutor().build()); // Create a HelloWorldClient using the in-process channel; - client = new DaprClientImpl(DaprGrpc.newStub(channel), null); + client = new DaprClientImpl(DaprGrpc.newStub(channel), null, null); } @Test diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index 8ea32b368..07a2a4b1e 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -30,7 +30,10 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -68,19 +71,34 @@ public class DaprRun implements Stoppable { private final boolean hasAppHealthCheck; + private final Map, String> propertyOverrides; + private DaprRun(String testName, DaprPorts ports, String successMessage, Class serviceClass, int maxWaitMilliseconds, AppRun.AppProtocol appProtocol) { + this(testName, ports, successMessage, serviceClass, maxWaitMilliseconds, appProtocol, UUID.randomUUID().toString()); + } + + private DaprRun(String testName, + DaprPorts ports, + String successMessage, + Class serviceClass, + int maxWaitMilliseconds, + AppRun.AppProtocol appProtocol, + String daprApiToken) { // The app name needs to be deterministic since we depend on it to kill previous runs. this.appName = serviceClass == null ? testName.toLowerCase() : String.format("%s-%s", testName, serviceClass.getSimpleName()).toLowerCase(); this.appProtocol = appProtocol; this.startCommand = - new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports, appProtocol)); + new Command( + successMessage, + buildDaprCommand(this.appName, serviceClass, ports, appProtocol), + Map.of("DAPR_API_TOKEN", daprApiToken)); this.listCommand = new Command( this.appName, "dapr list"); @@ -91,6 +109,9 @@ private DaprRun(String testName, this.maxWaitMilliseconds = maxWaitMilliseconds; this.started = new AtomicBoolean(false); this.hasAppHealthCheck = isAppHealthCheckEnabled(serviceClass); + this.propertyOverrides = Collections.unmodifiableMap(new HashMap<>(ports.getPropertyOverrides()) {{ + put(Properties.API_TOKEN, daprApiToken); + }}); } public void start() throws InterruptedException, IOException { @@ -149,7 +170,7 @@ public void stop() throws InterruptedException, IOException { } public Map, String> getPropertyOverrides() { - return this.ports.getPropertyOverrides(); + return this.propertyOverrides; } public DaprClientBuilder newDaprClientBuilder() { @@ -239,17 +260,13 @@ public String getAppName() { public DaprClient newDaprClient() { return new DaprClientBuilder() - .withPropertyOverride(Properties.GRPC_PORT, ports.getGrpcPort().toString()) - .withPropertyOverride(Properties.HTTP_PORT, ports.getHttpPort().toString()) - .withPropertyOverride(Properties.SIDECAR_IP, "127.0.0.1") + .withPropertyOverrides(this.getPropertyOverrides()) .build(); } public DaprPreviewClient newDaprPreviewClient() { return new DaprClientBuilder() - .withPropertyOverride(Properties.GRPC_PORT, ports.getGrpcPort().toString()) - .withPropertyOverride(Properties.HTTP_PORT, ports.getHttpPort().toString()) - .withPropertyOverride(Properties.SIDECAR_IP, "127.0.0.1") + .withPropertyOverrides(this.getPropertyOverrides()) .buildPreviewClient(); } diff --git a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java index ad67eed8c..964b28e1d 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java @@ -173,6 +173,7 @@ private DaprClientImpl buildDaprClient() { daprHttp, this.objectSerializer, this.stateSerializer, - this.resiliencyOptions); + this.resiliencyOptions, + properties.getValue(Properties.API_TOKEN)); } } diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 9eccc633b..803488c0a 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -119,11 +119,6 @@ public class DaprClientImpl extends AbstractDaprClient { */ private final GrpcChannelFacade channel; - /** - * The timeout policy. - */ - private final TimeoutPolicy timeoutPolicy; - /** * The retry policy. */ @@ -141,9 +136,10 @@ public class DaprClientImpl extends AbstractDaprClient { */ private final DaprHttp httpClient; + private final DaprClientGrpcInterceptors grpcInterceptors; + /** - * Default access level constructor, in order to create an instance of this - * class use io.dapr.client.DaprClientBuilder + * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder * * @param channel Facade for the managed GRPC channel * @param asyncStub async gRPC stub @@ -157,7 +153,27 @@ public class DaprClientImpl extends AbstractDaprClient { DaprHttp httpClient, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) { - this(channel, asyncStub, httpClient, objectSerializer, stateSerializer, null); + this(channel, asyncStub, httpClient, objectSerializer, stateSerializer, null, null); + } + + /** + * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder + * + * @param channel Facade for the managed GRPC channel + * @param asyncStub async gRPC stub + * @param objectSerializer Serializer for transient request/response objects. + * @param stateSerializer Serializer for state objects. + * @param daprApiToken Dapr API Token. + * @see DaprClientBuilder + */ + DaprClientImpl( + GrpcChannelFacade channel, + DaprGrpc.DaprStub asyncStub, + DaprHttp httpClient, + DaprObjectSerializer objectSerializer, + DaprObjectSerializer stateSerializer, + String daprApiToken) { + this(channel, asyncStub, httpClient, objectSerializer, stateSerializer, null, daprApiToken); } /** @@ -169,6 +185,7 @@ public class DaprClientImpl extends AbstractDaprClient { * @param objectSerializer Serializer for transient request/response objects. * @param stateSerializer Serializer for state objects. * @param resiliencyOptions Client-level override for resiliency options. + * @param daprApiToken Dapr API Token. * @see DaprClientBuilder */ DaprClientImpl( @@ -177,15 +194,47 @@ public class DaprClientImpl extends AbstractDaprClient { DaprHttp httpClient, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer, - ResiliencyOptions resiliencyOptions) { + ResiliencyOptions resiliencyOptions, + String daprApiToken) { + this( + channel, + asyncStub, + httpClient, + objectSerializer, + stateSerializer, + new TimeoutPolicy(resiliencyOptions == null ? null : resiliencyOptions.getTimeout()), + new RetryPolicy(resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries()), + daprApiToken); + } + + /** + * Instantiates a new DaprClient. + * + * @param channel Facade for the managed GRPC channel + * @param asyncStub async gRPC stub + * @param httpClient client for http service invocation + * @param objectSerializer Serializer for transient request/response objects. + * @param stateSerializer Serializer for state objects. + * @param timeoutPolicy Client-level timeout policy. + * @param retryPolicy Client-level retry policy. + * @param daprApiToken Dapr API Token. + * @see DaprClientBuilder + */ + private DaprClientImpl( + GrpcChannelFacade channel, + DaprGrpc.DaprStub asyncStub, + DaprHttp httpClient, + DaprObjectSerializer objectSerializer, + DaprObjectSerializer stateSerializer, + TimeoutPolicy timeoutPolicy, + RetryPolicy retryPolicy, + String daprApiToken) { super(objectSerializer, stateSerializer); this.channel = channel; this.asyncStub = asyncStub; this.httpClient = httpClient; - this.timeoutPolicy = new TimeoutPolicy( - resiliencyOptions == null ? null : resiliencyOptions.getTimeout()); - this.retryPolicy = new RetryPolicy( - resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries()); + this.retryPolicy = retryPolicy; + this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken, timeoutPolicy); } private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) { @@ -215,7 +264,7 @@ private CommonProtos.StateOptions.StateConcurrency getGrpcStateConcurrency(State */ public > T newGrpcStub(String appId, Function stubBuilder) { // Adds Dapr interceptors to populate gRPC metadata automatically. - return DaprClientGrpcInterceptors.intercept(appId, stubBuilder.apply(this.channel.getGrpcChannel()), timeoutPolicy); + return this.grpcInterceptors.intercept(appId, stubBuilder.apply(this.channel.getGrpcChannel())); } /** @@ -425,7 +474,8 @@ private Subscription buildSubscription( SubscriptionListener listener, TypeRef type, DaprProtos.SubscribeTopicEventsRequestAlpha1 request) { - Subscription subscription = new Subscription<>(this.asyncStub, request, listener, response -> { + var interceptedStub = this.grpcInterceptors.intercept(this.asyncStub); + Subscription subscription = new Subscription<>(interceptedStub, request, listener, response -> { if (response.getEventMessage() == null) { return null; } @@ -1268,7 +1318,7 @@ private ConfigurationItem buildConfigurationItem( * @return Client after adding interceptors. */ private DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) { - return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context); + return this.grpcInterceptors.intercept(client, context); } /** @@ -1281,7 +1331,7 @@ private DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub clien */ private DaprGrpc.DaprStub intercept( ContextView context, DaprGrpc.DaprStub client, Consumer metadataConsumer) { - return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context, metadataConsumer); + return this.grpcInterceptors.intercept(client, context, metadataConsumer); } private Mono createMono(Consumer> consumer) { diff --git a/sdk/src/main/java/io/dapr/internal/grpc/DaprClientGrpcInterceptors.java b/sdk/src/main/java/io/dapr/internal/grpc/DaprClientGrpcInterceptors.java index 5241e1d5f..a4e5c2324 100644 --- a/sdk/src/main/java/io/dapr/internal/grpc/DaprClientGrpcInterceptors.java +++ b/sdk/src/main/java/io/dapr/internal/grpc/DaprClientGrpcInterceptors.java @@ -31,121 +31,82 @@ */ public class DaprClientGrpcInterceptors { - /** - * Adds all Dapr interceptors to a gRPC async stub. - * @param appId the appId to be invoked - * @param client gRPC client - * @param async client type - * @return async client instance with interceptors - */ - public static > T intercept(final String appId, final T client) { - return intercept(appId, client, null, null, null); - } + private final String daprApiToken; - /** - * Adds all Dapr interceptors to a gRPC async stub. - * @param client gRPC client - * @param async client type - * @return async client instance with interceptors - */ - public static > T intercept(final T client) { - return intercept(null, client, null, null, null); - } + private final TimeoutPolicy timeoutPolicy; - /** - * Adds all Dapr interceptors to a gRPC async stub. - * @param appId the appId to be invoked - * @param client gRPC client - * @param timeoutPolicy timeout policy for gRPC call - * @param async client type - * @return async client instance with interceptors - */ - public static > T intercept( - final String appId, final T client, final TimeoutPolicy timeoutPolicy) { - return intercept(appId, client, timeoutPolicy, null, null); + public DaprClientGrpcInterceptors() { + this(null, null); } - /** - * Adds all Dapr interceptors to a gRPC async stub. - * @param client gRPC client - * @param timeoutPolicy timeout policy for gRPC call - * @param async client type - * @return async client instance with interceptors - */ - public static > T intercept(final T client, final TimeoutPolicy timeoutPolicy) { - return intercept(null, client, timeoutPolicy, null, null); + public DaprClientGrpcInterceptors(String daprApiToken, TimeoutPolicy timeoutPolicy) { + this.daprApiToken = daprApiToken; + this.timeoutPolicy = timeoutPolicy; } /** * Adds all Dapr interceptors to a gRPC async stub. - * @param appId the appId to be invoked * @param client gRPC client - * @param context Reactor context for tracing * @param async client type * @return async client instance with interceptors */ - public static > T intercept( - final String appId, final T client, final ContextView context) { - return intercept(appId, client, null, context, null); + public > T intercept(final T client) { + return intercept(null, client, null, null); } /** * Adds all Dapr interceptors to a gRPC async stub. + * @param appId Application ID to invoke. * @param client gRPC client - * @param context Reactor context for tracing * @param async client type * @return async client instance with interceptors */ - public static > T intercept(final T client, final ContextView context) { - return intercept(null, client, null, context, null); + public > T intercept( + final String appId, + final T client) { + return this.intercept(appId, client, null, null); } /** * Adds all Dapr interceptors to a gRPC async stub. * @param client gRPC client - * @param timeoutPolicy timeout policy for gRPC call * @param context Reactor context for tracing * @param async client type * @return async client instance with interceptors */ - public static > T intercept( + public > T intercept( final T client, - final TimeoutPolicy timeoutPolicy, final ContextView context) { - return intercept(null, client, timeoutPolicy, context, null); + return intercept(null, client, context, null); } /** * Adds all Dapr interceptors to a gRPC async stub. * @param client gRPC client - * @param timeoutPolicy timeout policy for gRPC call * @param context Reactor context for tracing * @param metadataConsumer Consumer of the gRPC metadata * @param async client type * @return async client instance with interceptors */ - public static > T intercept( + public > T intercept( final T client, - final TimeoutPolicy timeoutPolicy, final ContextView context, final Consumer metadataConsumer) { - return intercept(null, client, timeoutPolicy, context, metadataConsumer); + return this.intercept(null, client, context, metadataConsumer); } /** * Adds all Dapr interceptors to a gRPC async stub. - * @param appId the appId to be invoked + * @param appId Application ID to invoke. * @param client gRPC client - * @param timeoutPolicy timeout policy for gRPC call * @param context Reactor context for tracing * @param metadataConsumer Consumer of the gRPC metadata * @param async client type * @return async client instance with interceptors */ - public static > T intercept( + public > T intercept( final String appId, final T client, - final TimeoutPolicy timeoutPolicy, final ContextView context, final Consumer metadataConsumer) { if (client == null) { @@ -154,8 +115,8 @@ public static > T intercept( return client.withInterceptors( new DaprAppIdInterceptor(appId), - new DaprApiTokenInterceptor(), - new DaprTimeoutInterceptor(timeoutPolicy), + new DaprApiTokenInterceptor(this.daprApiToken), + new DaprTimeoutInterceptor(this.timeoutPolicy), new DaprTracingInterceptor(context), new DaprMetadataInterceptor(metadataConsumer)); } diff --git a/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprApiTokenInterceptor.java b/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprApiTokenInterceptor.java index cda6e896b..f7cf415d8 100644 --- a/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprApiTokenInterceptor.java +++ b/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprApiTokenInterceptor.java @@ -24,10 +24,23 @@ import io.grpc.MethodDescriptor; /** - * Class to be used as part of your service's client stub interceptor to include Dapr tokens. + * Class to be used as part of your service's client stub interceptor to include the Dapr API token. */ public class DaprApiTokenInterceptor implements ClientInterceptor { + /** + * Dapr API Token. + */ + private final String token; + + /** + * Instantiates an interceptor to inject the Dapr API Token. + * @param token Dapr API Token. + */ + public DaprApiTokenInterceptor(String token) { + this.token = token; + } + @Override public ClientCall interceptCall( MethodDescriptor methodDescriptor, @@ -37,9 +50,10 @@ public ClientCall interceptCall( return new ForwardingClientCall.SimpleForwardingClientCall<>(clientCall) { @Override public void start(final Listener responseListener, final Metadata metadata) { - String daprApiToken = Properties.API_TOKEN.get(); - if (daprApiToken != null) { - metadata.put(Metadata.Key.of(Headers.DAPR_API_TOKEN, Metadata.ASCII_STRING_MARSHALLER), daprApiToken); + if (DaprApiTokenInterceptor.this.token != null) { + metadata.put( + Metadata.Key.of(Headers.DAPR_API_TOKEN, Metadata.ASCII_STRING_MARSHALLER), + DaprApiTokenInterceptor.this.token); } super.start(responseListener, metadata); } diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java index 4b8b80317..dc7ed58ed 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java @@ -238,7 +238,7 @@ private Mono invoke() { .build(); return Mono.deferContextual( context -> this.createMono( - it -> DaprClientGrpcInterceptors.intercept(daprStub, context).invokeService(req, it) + it -> new DaprClientGrpcInterceptors().intercept(daprStub, context).invokeService(req, it) ) ).then(); } From b902e24a25661fd3aec9988c9386523eb85849f9 Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Thu, 10 Oct 2024 17:02:42 -0700 Subject: [PATCH 2/5] Remove dapr-api-token from actor services Signed-off-by: Artur Souza --- .../src/test/java/io/dapr/it/DaprRun.java | 38 ++++++++++++++++--- .../test/java/io/dapr/it/DaprRunConfig.java | 2 + .../io/dapr/it/actors/app/MyActorService.java | 3 ++ 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index 07a2a4b1e..654bab46f 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -43,6 +43,7 @@ public class DaprRun implements Stoppable { + private static final String DEFAULT_DAPR_API_TOKEN = UUID.randomUUID().toString(); private static final String DAPR_SUCCESS_MESSAGE = "You're up and running!"; private static final String DAPR_RUN = "dapr run --app-id %s --app-protocol %s " + @@ -79,7 +80,14 @@ private DaprRun(String testName, Class serviceClass, int maxWaitMilliseconds, AppRun.AppProtocol appProtocol) { - this(testName, ports, successMessage, serviceClass, maxWaitMilliseconds, appProtocol, UUID.randomUUID().toString()); + this( + testName, + ports, + successMessage, + serviceClass, + maxWaitMilliseconds, + appProtocol, + resolveDaprApiToken(serviceClass)); } private DaprRun(String testName, @@ -98,7 +106,7 @@ private DaprRun(String testName, new Command( successMessage, buildDaprCommand(this.appName, serviceClass, ports, appProtocol), - Map.of("DAPR_API_TOKEN", daprApiToken)); + daprApiToken == null ? null : Map.of("DAPR_API_TOKEN", daprApiToken)); this.listCommand = new Command( this.appName, "dapr list"); @@ -109,9 +117,10 @@ private DaprRun(String testName, this.maxWaitMilliseconds = maxWaitMilliseconds; this.started = new AtomicBoolean(false); this.hasAppHealthCheck = isAppHealthCheckEnabled(serviceClass); - this.propertyOverrides = Collections.unmodifiableMap(new HashMap<>(ports.getPropertyOverrides()) {{ - put(Properties.API_TOKEN, daprApiToken); - }}); + this.propertyOverrides = daprApiToken == null ? ports.getPropertyOverrides() : + Collections.unmodifiableMap(new HashMap<>(ports.getPropertyOverrides()) {{ + put(Properties.API_TOKEN, daprApiToken); + }}); } public void start() throws InterruptedException, IOException { @@ -315,6 +324,22 @@ private static boolean isAppHealthCheckEnabled(Class serviceClass) { return false; } + private static String resolveDaprApiToken(Class serviceClass) { + if (serviceClass != null) { + DaprRunConfig daprRunConfig = (DaprRunConfig) serviceClass.getAnnotation(DaprRunConfig.class); + if (daprRunConfig != null) { + if (!daprRunConfig.enableDaprApiToken()) { + return null; + } + // We use the clas name itself as the token. Just needs to be deterministic. + return serviceClass.getCanonicalName(); + } + } + + // By default, we use a token. + return DEFAULT_DAPR_API_TOKEN; + } + private static void assertListeningOnPort(int port) { System.out.printf("Checking port %d ...\n", port); @@ -342,6 +367,8 @@ static class Builder { private AppRun.AppProtocol appProtocol; + private String daprApiToken; + Builder( String testName, Supplier portsSupplier, @@ -353,6 +380,7 @@ static class Builder { this.successMessage = successMessage; this.maxWaitMilliseconds = maxWaitMilliseconds; this.appProtocol = appProtocol; + this.daprApiToken = UUID.randomUUID().toString(); } public Builder withServiceClass(Class serviceClass) { diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRunConfig.java b/sdk-tests/src/test/java/io/dapr/it/DaprRunConfig.java index 205eaf73d..72e9e3731 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRunConfig.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRunConfig.java @@ -26,4 +26,6 @@ public @interface DaprRunConfig { boolean enableAppHealthCheck() default false; + + boolean enableDaprApiToken() default true; } diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorService.java b/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorService.java index 94f995982..0bb2bac0e 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorService.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorService.java @@ -14,7 +14,10 @@ package io.dapr.it.actors.app; import io.dapr.actors.runtime.ActorRuntime; +import io.dapr.it.DaprRunConfig; +// Enable dapr-api-token once runtime supports it in standalone mode. +@DaprRunConfig(enableDaprApiToken = false) public class MyActorService { public static final String SUCCESS_MESSAGE = "dapr initialized. Status: Running"; From 9a0d0d7522c1889c5368ee0c57731746b953ae86 Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Fri, 11 Oct 2024 15:22:44 -0700 Subject: [PATCH 3/5] Handle dapr-api-token for split run tests Signed-off-by: Artur Souza --- sdk-tests/src/test/java/io/dapr/it/DaprRun.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index 654bab46f..99c38bfff 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -416,7 +416,8 @@ ImmutablePair splitBuild() { DAPR_SUCCESS_MESSAGE, null, this.maxWaitMilliseconds, - this.appProtocol); + this.appProtocol, + resolveDaprApiToken(serviceClass)); return new ImmutablePair<>(appRun, daprRun); } From ecd3f9b1459e42c2fab72d655065dc88abfc0985 Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Mon, 14 Oct 2024 15:44:24 -0700 Subject: [PATCH 4/5] Fix more tests requiring dapr-api-token Signed-off-by: Artur Souza --- .../actors/services/springboot/StatefulActorService.java | 2 ++ .../io/dapr/it/state/HelloWorldGrpcStateService.java | 9 +++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/services/springboot/StatefulActorService.java b/sdk-tests/src/test/java/io/dapr/it/actors/services/springboot/StatefulActorService.java index 3a15a0c6b..c96fa05d0 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/services/springboot/StatefulActorService.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/services/springboot/StatefulActorService.java @@ -14,10 +14,12 @@ package io.dapr.it.actors.services.springboot; import io.dapr.actors.runtime.ActorRuntime; +import io.dapr.it.DaprRunConfig; import io.dapr.serializer.DefaultObjectSerializer; import java.time.Duration; +@DaprRunConfig(enableDaprApiToken = false) public class StatefulActorService { public static final String SUCCESS_MESSAGE = "dapr initialized. Status: Running"; diff --git a/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldGrpcStateService.java b/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldGrpcStateService.java index a5b92eb8d..be493b469 100644 --- a/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldGrpcStateService.java +++ b/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldGrpcStateService.java @@ -14,7 +14,9 @@ package io.dapr.it.state; import com.google.protobuf.ByteString; +import io.dapr.client.DaprClientBuilder; import io.dapr.config.Properties; +import io.dapr.internal.grpc.DaprClientGrpcInterceptors; import io.dapr.v1.CommonProtos.StateItem; import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprGrpc.DaprBlockingStub; @@ -38,8 +40,11 @@ public static void main(String[] args) { // If port string is not valid, it will throw an exception. int grpcPortInt = Integer.parseInt(grpcPort); - ManagedChannel channel = ManagedChannelBuilder.forAddress(Properties.SIDECAR_IP.get(), grpcPortInt).usePlaintext().build(); - DaprBlockingStub client = DaprGrpc.newBlockingStub(channel); + ManagedChannel channel = ManagedChannelBuilder.forAddress( + Properties.SIDECAR_IP.get(), grpcPortInt).usePlaintext().build(); + DaprClientGrpcInterceptors interceptors = new DaprClientGrpcInterceptors( + Properties.API_TOKEN.get(), null); + DaprBlockingStub client = interceptors.intercept(DaprGrpc.newBlockingStub(channel)); String key = "mykey"; // First, write key-value pair. From 1cc787f88b03131892dedb6422b5fc3d11dbc8e5 Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Tue, 15 Oct 2024 16:50:19 -0700 Subject: [PATCH 5/5] Fix IT for HelloWorldClientIT Signed-off-by: Artur Souza --- .../io/dapr/it/state/HelloWorldClientIT.java | 74 +++++++++---------- 1 file changed, 35 insertions(+), 39 deletions(-) diff --git a/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldClientIT.java b/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldClientIT.java index 1cf8db98e..713f152f1 100644 --- a/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldClientIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldClientIT.java @@ -13,13 +13,10 @@ package io.dapr.it.state; -import io.dapr.config.Properties; import io.dapr.it.BaseIT; import io.dapr.it.DaprRun; import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -34,45 +31,44 @@ public void testHelloWorldState() throws Exception { false, 2000 ); - ManagedChannel channel = - ManagedChannelBuilder.forAddress(Properties.SIDECAR_IP.get(), daprRun.getGrpcPort()).usePlaintext().build(); - DaprGrpc.DaprBlockingStub client = DaprGrpc.newBlockingStub(channel); + try (var client = daprRun.newDaprClientBuilder().build()) { + var stub = client.newGrpcStub("n/a", DaprGrpc::newBlockingStub); - String key = "mykey"; - { - DaprProtos.GetStateRequest req = DaprProtos.GetStateRequest - .newBuilder() - .setStoreName(STATE_STORE_NAME) - .setKey(key) - .build(); - DaprProtos.GetStateResponse response = client.getState(req); - String value = response.getData().toStringUtf8(); - System.out.println("Got: " + value); - Assertions.assertEquals("Hello World", value); - } + String key = "mykey"; + { + DaprProtos.GetStateRequest req = DaprProtos.GetStateRequest + .newBuilder() + .setStoreName(STATE_STORE_NAME) + .setKey(key) + .build(); + DaprProtos.GetStateResponse response = stub.getState(req); + String value = response.getData().toStringUtf8(); + System.out.println("Got: " + value); + Assertions.assertEquals("Hello World", value); + } - // Then, delete it. - { - DaprProtos.DeleteStateRequest req = DaprProtos.DeleteStateRequest - .newBuilder() - .setStoreName(STATE_STORE_NAME) - .setKey(key) - .build(); - client.deleteState(req); - System.out.println("Deleted!"); - } + // Then, delete it. + { + DaprProtos.DeleteStateRequest req = DaprProtos.DeleteStateRequest + .newBuilder() + .setStoreName(STATE_STORE_NAME) + .setKey(key) + .build(); + stub.deleteState(req); + System.out.println("Deleted!"); + } - { - DaprProtos.GetStateRequest req = DaprProtos.GetStateRequest - .newBuilder() - .setStoreName(STATE_STORE_NAME) - .setKey(key) - .build(); - DaprProtos.GetStateResponse response = client.getState(req); - String value = response.getData().toStringUtf8(); - System.out.println("Got: " + value); - Assertions.assertEquals("", value); + { + DaprProtos.GetStateRequest req = DaprProtos.GetStateRequest + .newBuilder() + .setStoreName(STATE_STORE_NAME) + .setKey(key) + .build(); + DaprProtos.GetStateResponse response = stub.getState(req); + String value = response.getData().toStringUtf8(); + System.out.println("Got: " + value); + Assertions.assertEquals("", value); + } } - channel.shutdown(); } }