Skip to content

Commit 6a6901d

Browse files
authored
Fix bi-di subscription to support dapr-api-token (#1142)
* Fix bi-di subscription to support dapr-api-token Signed-off-by: Artur Souza <asouza.pro@gmail.com> * Remove dapr-api-token from actor services Signed-off-by: Artur Souza <asouza.pro@gmail.com> * Handle dapr-api-token for split run tests Signed-off-by: Artur Souza <asouza.pro@gmail.com> * Fix more tests requiring dapr-api-token Signed-off-by: Artur Souza <asouza.pro@gmail.com> * Fix IT for HelloWorldClientIT Signed-off-by: Artur Souza <asouza.pro@gmail.com> --------- Signed-off-by: Artur Souza <asouza.pro@gmail.com>
1 parent 8a0913d commit 6a6901d

File tree

14 files changed

+238
-195
lines changed

14 files changed

+238
-195
lines changed

sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public ActorClient(ResiliencyOptions resiliencyOptions) {
5959
* @param overrideProperties Override properties.
6060
*/
6161
public ActorClient(Properties overrideProperties) {
62-
this(buildManagedChannel(overrideProperties), null);
62+
this(buildManagedChannel(overrideProperties), null, overrideProperties.getValue(Properties.API_TOKEN));
6363
}
6464

6565
/**
@@ -69,7 +69,7 @@ public ActorClient(Properties overrideProperties) {
6969
* @param resiliencyOptions Client resiliency options.
7070
*/
7171
public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOptions) {
72-
this(buildManagedChannel(overrideProperties), resiliencyOptions);
72+
this(buildManagedChannel(overrideProperties), resiliencyOptions, overrideProperties.getValue(Properties.API_TOKEN));
7373
}
7474

7575
/**
@@ -80,9 +80,10 @@ public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOp
8080
*/
8181
private ActorClient(
8282
ManagedChannel grpcManagedChannel,
83-
ResiliencyOptions resiliencyOptions) {
83+
ResiliencyOptions resiliencyOptions,
84+
String daprApiToken) {
8485
this.grpcManagedChannel = grpcManagedChannel;
85-
this.daprClient = buildDaprClient(grpcManagedChannel, resiliencyOptions);
86+
this.daprClient = buildDaprClient(grpcManagedChannel, resiliencyOptions, daprApiToken);
8687
}
8788

8889
/**
@@ -136,7 +137,11 @@ private static ManagedChannel buildManagedChannel(Properties overrideProperties)
136137
*/
137138
private static DaprClient buildDaprClient(
138139
Channel grpcManagedChannel,
139-
ResiliencyOptions resiliencyOptions) {
140-
return new DaprClientImpl(DaprGrpc.newStub(grpcManagedChannel), resiliencyOptions);
140+
ResiliencyOptions resiliencyOptions,
141+
String daprApiToken) {
142+
return new DaprClientImpl(
143+
DaprGrpc.newStub(grpcManagedChannel),
144+
resiliencyOptions,
145+
daprApiToken);
141146
}
142147
}

sdk-actors/src/main/java/io/dapr/actors/client/DaprClientImpl.java

Lines changed: 12 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,6 @@
4242
*/
4343
class DaprClientImpl implements DaprClient {
4444

45-
/**
46-
* Timeout policy for SDK calls to Dapr API.
47-
*/
48-
private final TimeoutPolicy timeoutPolicy;
49-
5045
/**
5146
* Retry policy for SDK calls to Dapr API.
5247
*/
@@ -57,16 +52,22 @@ class DaprClientImpl implements DaprClient {
5752
*/
5853
private final DaprGrpc.DaprStub client;
5954

55+
/**
56+
* gRPC client interceptors.
57+
*/
58+
private final DaprClientGrpcInterceptors grpcInterceptors;
59+
6060
/**
6161
* Internal constructor.
6262
*
6363
* @param grpcClient Dapr's GRPC client.
64-
* @param resiliencyOptions Client resiliency options (optional)
64+
* @param resiliencyOptions Client resiliency options (optional).
65+
* @param daprApiToken Dapr API token (optional).
6566
*/
66-
DaprClientImpl(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions) {
67-
this.client = intercept(grpcClient);
68-
this.timeoutPolicy = new TimeoutPolicy(
69-
resiliencyOptions == null ? null : resiliencyOptions.getTimeout());
67+
DaprClientImpl(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions, String daprApiToken) {
68+
this.client = grpcClient;
69+
this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken,
70+
new TimeoutPolicy(resiliencyOptions == null ? null : resiliencyOptions.getTimeout()));
7071
this.retryPolicy = new RetryPolicy(
7172
resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries());
7273
}
@@ -85,54 +86,11 @@ public Mono<byte[]> invoke(String actorType, String actorId, String methodName,
8586
.build();
8687
return Mono.deferContextual(
8788
context -> this.<DaprProtos.InvokeActorResponse>createMono(
88-
it -> intercept(context, this.timeoutPolicy, client).invokeActor(req, it)
89+
it -> this.grpcInterceptors.intercept(client, context).invokeActor(req, it)
8990
)
9091
).map(r -> r.getData().toByteArray());
9192
}
9293

93-
/**
94-
* Populates GRPC client with interceptors.
95-
*
96-
* @param client GRPC client for Dapr.
97-
* @return Client after adding interceptors.
98-
*/
99-
private DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
100-
ClientInterceptor interceptor = new ClientInterceptor() {
101-
@Override
102-
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
103-
MethodDescriptor<ReqT, RespT> methodDescriptor,
104-
CallOptions options,
105-
Channel channel) {
106-
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, timeoutPolicy.apply(options));
107-
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
108-
@Override
109-
public void start(final Listener<RespT> responseListener, final Metadata metadata) {
110-
String daprApiToken = Properties.API_TOKEN.get();
111-
if (daprApiToken != null) {
112-
metadata.put(Metadata.Key.of("dapr-api-token", Metadata.ASCII_STRING_MARSHALLER), daprApiToken);
113-
}
114-
115-
super.start(responseListener, metadata);
116-
}
117-
};
118-
}
119-
};
120-
return client.withInterceptors(interceptor);
121-
}
122-
123-
/**
124-
* Populates GRPC client with interceptors for telemetry.
125-
*
126-
* @param context Reactor's context.
127-
* @param timeoutPolicy Timeout policy for gRPC call.
128-
* @param client GRPC client for Dapr.
129-
* @return Client after adding interceptors.
130-
*/
131-
private static DaprGrpc.DaprStub intercept(
132-
ContextView context, TimeoutPolicy timeoutPolicy, DaprGrpc.DaprStub client) {
133-
return DaprClientGrpcInterceptors.intercept(client, timeoutPolicy, context);
134-
}
135-
13694
private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
13795
return retryPolicy.apply(
13896
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));

sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void setup() throws IOException {
106106
InProcessChannelBuilder.forName(serverName).directExecutor().build());
107107

108108
// Create a HelloWorldClient using the in-process channel;
109-
client = new DaprClientImpl(DaprGrpc.newStub(channel), null);
109+
client = new DaprClientImpl(DaprGrpc.newStub(channel), null, null);
110110
}
111111

112112
@Test

sdk-tests/src/test/java/io/dapr/it/DaprRun.java

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@
3030
import org.apache.commons.lang3.tuple.ImmutablePair;
3131

3232
import java.io.IOException;
33+
import java.util.Collections;
34+
import java.util.HashMap;
3335
import java.util.Map;
36+
import java.util.UUID;
3437
import java.util.concurrent.TimeUnit;
3538
import java.util.concurrent.atomic.AtomicBoolean;
3639
import java.util.function.Supplier;
@@ -40,6 +43,7 @@
4043

4144
public class DaprRun implements Stoppable {
4245

46+
private static final String DEFAULT_DAPR_API_TOKEN = UUID.randomUUID().toString();
4347
private static final String DAPR_SUCCESS_MESSAGE = "You're up and running!";
4448

4549
private static final String DAPR_RUN = "dapr run --app-id %s --app-protocol %s " +
@@ -68,19 +72,41 @@ public class DaprRun implements Stoppable {
6872

6973
private final boolean hasAppHealthCheck;
7074

75+
private final Map<Property<?>, String> propertyOverrides;
76+
7177
private DaprRun(String testName,
7278
DaprPorts ports,
7379
String successMessage,
7480
Class serviceClass,
7581
int maxWaitMilliseconds,
7682
AppRun.AppProtocol appProtocol) {
83+
this(
84+
testName,
85+
ports,
86+
successMessage,
87+
serviceClass,
88+
maxWaitMilliseconds,
89+
appProtocol,
90+
resolveDaprApiToken(serviceClass));
91+
}
92+
93+
private DaprRun(String testName,
94+
DaprPorts ports,
95+
String successMessage,
96+
Class serviceClass,
97+
int maxWaitMilliseconds,
98+
AppRun.AppProtocol appProtocol,
99+
String daprApiToken) {
77100
// The app name needs to be deterministic since we depend on it to kill previous runs.
78101
this.appName = serviceClass == null ?
79102
testName.toLowerCase() :
80103
String.format("%s-%s", testName, serviceClass.getSimpleName()).toLowerCase();
81104
this.appProtocol = appProtocol;
82105
this.startCommand =
83-
new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports, appProtocol));
106+
new Command(
107+
successMessage,
108+
buildDaprCommand(this.appName, serviceClass, ports, appProtocol),
109+
daprApiToken == null ? null : Map.of("DAPR_API_TOKEN", daprApiToken));
84110
this.listCommand = new Command(
85111
this.appName,
86112
"dapr list");
@@ -91,6 +117,10 @@ private DaprRun(String testName,
91117
this.maxWaitMilliseconds = maxWaitMilliseconds;
92118
this.started = new AtomicBoolean(false);
93119
this.hasAppHealthCheck = isAppHealthCheckEnabled(serviceClass);
120+
this.propertyOverrides = daprApiToken == null ? ports.getPropertyOverrides() :
121+
Collections.unmodifiableMap(new HashMap<>(ports.getPropertyOverrides()) {{
122+
put(Properties.API_TOKEN, daprApiToken);
123+
}});
94124
}
95125

96126
public void start() throws InterruptedException, IOException {
@@ -149,7 +179,7 @@ public void stop() throws InterruptedException, IOException {
149179
}
150180

151181
public Map<Property<?>, String> getPropertyOverrides() {
152-
return this.ports.getPropertyOverrides();
182+
return this.propertyOverrides;
153183
}
154184

155185
public DaprClientBuilder newDaprClientBuilder() {
@@ -239,17 +269,13 @@ public String getAppName() {
239269

240270
public DaprClient newDaprClient() {
241271
return new DaprClientBuilder()
242-
.withPropertyOverride(Properties.GRPC_PORT, ports.getGrpcPort().toString())
243-
.withPropertyOverride(Properties.HTTP_PORT, ports.getHttpPort().toString())
244-
.withPropertyOverride(Properties.SIDECAR_IP, "127.0.0.1")
272+
.withPropertyOverrides(this.getPropertyOverrides())
245273
.build();
246274
}
247275

248276
public DaprPreviewClient newDaprPreviewClient() {
249277
return new DaprClientBuilder()
250-
.withPropertyOverride(Properties.GRPC_PORT, ports.getGrpcPort().toString())
251-
.withPropertyOverride(Properties.HTTP_PORT, ports.getHttpPort().toString())
252-
.withPropertyOverride(Properties.SIDECAR_IP, "127.0.0.1")
278+
.withPropertyOverrides(this.getPropertyOverrides())
253279
.buildPreviewClient();
254280
}
255281

@@ -298,6 +324,22 @@ private static boolean isAppHealthCheckEnabled(Class serviceClass) {
298324
return false;
299325
}
300326

327+
private static String resolveDaprApiToken(Class serviceClass) {
328+
if (serviceClass != null) {
329+
DaprRunConfig daprRunConfig = (DaprRunConfig) serviceClass.getAnnotation(DaprRunConfig.class);
330+
if (daprRunConfig != null) {
331+
if (!daprRunConfig.enableDaprApiToken()) {
332+
return null;
333+
}
334+
// We use the clas name itself as the token. Just needs to be deterministic.
335+
return serviceClass.getCanonicalName();
336+
}
337+
}
338+
339+
// By default, we use a token.
340+
return DEFAULT_DAPR_API_TOKEN;
341+
}
342+
301343
private static void assertListeningOnPort(int port) {
302344
System.out.printf("Checking port %d ...\n", port);
303345

@@ -325,6 +367,8 @@ static class Builder {
325367

326368
private AppRun.AppProtocol appProtocol;
327369

370+
private String daprApiToken;
371+
328372
Builder(
329373
String testName,
330374
Supplier<DaprPorts> portsSupplier,
@@ -336,6 +380,7 @@ static class Builder {
336380
this.successMessage = successMessage;
337381
this.maxWaitMilliseconds = maxWaitMilliseconds;
338382
this.appProtocol = appProtocol;
383+
this.daprApiToken = UUID.randomUUID().toString();
339384
}
340385

341386
public Builder withServiceClass(Class serviceClass) {
@@ -371,7 +416,8 @@ ImmutablePair<AppRun, DaprRun> splitBuild() {
371416
DAPR_SUCCESS_MESSAGE,
372417
null,
373418
this.maxWaitMilliseconds,
374-
this.appProtocol);
419+
this.appProtocol,
420+
resolveDaprApiToken(serviceClass));
375421

376422
return new ImmutablePair<>(appRun, daprRun);
377423
}

sdk-tests/src/test/java/io/dapr/it/DaprRunConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,6 @@
2626
public @interface DaprRunConfig {
2727

2828
boolean enableAppHealthCheck() default false;
29+
30+
boolean enableDaprApiToken() default true;
2931
}

sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414
package io.dapr.it.actors.app;
1515

1616
import io.dapr.actors.runtime.ActorRuntime;
17+
import io.dapr.it.DaprRunConfig;
1718

19+
// Enable dapr-api-token once runtime supports it in standalone mode.
20+
@DaprRunConfig(enableDaprApiToken = false)
1821
public class MyActorService {
1922
public static final String SUCCESS_MESSAGE = "dapr initialized. Status: Running";
2023

sdk-tests/src/test/java/io/dapr/it/actors/services/springboot/StatefulActorService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414
package io.dapr.it.actors.services.springboot;
1515

1616
import io.dapr.actors.runtime.ActorRuntime;
17+
import io.dapr.it.DaprRunConfig;
1718
import io.dapr.serializer.DefaultObjectSerializer;
1819

1920
import java.time.Duration;
2021

22+
@DaprRunConfig(enableDaprApiToken = false)
2123
public class StatefulActorService {
2224

2325
public static final String SUCCESS_MESSAGE = "dapr initialized. Status: Running";

0 commit comments

Comments
 (0)