Skip to content

Commit

Permalink
refs dapr#815
Browse files Browse the repository at this point in the history
updating project reactor dependency to work with spring boot 3,
replacing deprecated calls to subscriberContext

Signed-off-by: Jan Czapla <jan.czapla@bosch.com>
  • Loading branch information
jczbsh79 committed Dec 5, 2022
1 parent dcee2f7 commit 99a91f0
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 27 deletions.
2 changes: 1 addition & 1 deletion sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.11.RELEASE</version>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
Expand Down
52 changes: 26 additions & 26 deletions sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,10 @@ public Mono<Void> publishEvent(PublishEventRequest request) {
envelopeBuilder.putAllMetadata(metadata);
}

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context ->
this.<Empty>createMono(
it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)
it -> intercept(Context.of(context), asyncStub).publishEvent(envelopeBuilder.build(), it)
)
).then();
} catch (Exception ex) {
Expand All @@ -204,9 +204,9 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
// gRPC to gRPC does not handle metadata in Dapr runtime proto.
// gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<CommonProtos.InvokeResponse>createMono(
it -> intercept(context, asyncStub).invokeService(envelope, it)
it -> intercept(Context.of(context), asyncStub).invokeService(envelope, it)
)
).flatMap(
it -> {
Expand Down Expand Up @@ -251,9 +251,9 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
}
DaprProtos.InvokeBindingRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
it -> intercept(Context.of(context), asyncStub).invokeBinding(envelope, it)
)
).flatMap(
it -> {
Expand Down Expand Up @@ -298,10 +298,10 @@ public <T> Mono<State<T>> getState(GetStateRequest request, TypeRef<T> type) {

DaprProtos.GetStateRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context ->
this.<DaprProtos.GetStateResponse>createMono(
it -> intercept(context, asyncStub).getState(envelope, it)
it -> intercept(Context.of(context), asyncStub).getState(envelope, it)
)
).map(
it -> {
Expand Down Expand Up @@ -347,8 +347,8 @@ public <T> Mono<List<State<T>>> getBulkState(GetBulkStateRequest request, TypeRe

DaprProtos.GetBulkStateRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
context -> this.<DaprProtos.GetBulkStateResponse>createMono(it -> intercept(context, asyncStub)
return Mono.deferContextual(
context -> this.<DaprProtos.GetBulkStateResponse>createMono(it -> intercept(Context.of(context), asyncStub)
.getBulkState(envelope, it)
)
).map(
Expand Down Expand Up @@ -431,8 +431,8 @@ public Mono<Void> executeStateTransaction(ExecuteStateTransactionRequest request
}
DaprProtos.ExecuteStateTransactionRequest req = builder.build();

return Mono.subscriberContext().flatMap(
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).executeStateTransaction(req, it))
return Mono.deferContextual(
context -> this.<Empty>createMono(it -> intercept(Context.of(context), asyncStub).executeStateTransaction(req, it))
).then();
} catch (Exception e) {
return DaprException.wrapMono(e);
Expand All @@ -457,8 +457,8 @@ public Mono<Void> saveBulkState(SaveStateRequest request) {
}
DaprProtos.SaveStateRequest req = builder.build();

return Mono.subscriberContext().flatMap(
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).saveState(req, it))
return Mono.deferContextual(
context -> this.<Empty>createMono(it -> intercept(Context.of(context), asyncStub).saveState(req, it))
).then();
} catch (Exception ex) {
return DaprException.wrapMono(ex);
Expand Down Expand Up @@ -541,8 +541,8 @@ public Mono<Void> deleteState(DeleteStateRequest request) {

DaprProtos.DeleteStateRequest req = builder.build();

return Mono.subscriberContext().flatMap(
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).deleteState(req, it))
return Mono.deferContextual(
context -> this.<Empty>createMono(it -> intercept(Context.of(context), asyncStub).deleteState(req, it))
).then();
} catch (Exception ex) {
return DaprException.wrapMono(ex);
Expand Down Expand Up @@ -619,8 +619,8 @@ public Mono<Map<String, String>> getSecret(GetSecretRequest request) {
}
DaprProtos.GetSecretRequest req = requestBuilder.build();

return Mono.subscriberContext().flatMap(
context -> this.<DaprProtos.GetSecretResponse>createMono(it -> intercept(context, asyncStub).getSecret(req, it))
return Mono.deferContextual(
context -> this.<DaprProtos.GetSecretResponse>createMono(it -> intercept(Context.of(context), asyncStub).getSecret(req, it))
).map(DaprProtos.GetSecretResponse::getDataMap);
}

Expand All @@ -644,10 +644,10 @@ public Mono<Map<String, Map<String, String>>> getBulkSecret(GetBulkSecretRequest

DaprProtos.GetBulkSecretRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context ->
this.<DaprProtos.GetBulkSecretResponse>createMono(
it -> intercept(context, asyncStub).getBulkSecret(envelope, it)
it -> intercept(Context.of(context), asyncStub).getBulkSecret(envelope, it)
)
).map(it -> {
Map<String, DaprProtos.SecretResponse> secretsMap = it.getDataMap();
Expand Down Expand Up @@ -697,9 +697,9 @@ public <T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, Typ

DaprProtos.QueryStateRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<DaprProtos.QueryStateResponse>createMono(
it -> intercept(context, asyncStub).queryStateAlpha1(envelope, it)
it -> intercept(Context.of(context), asyncStub).queryStateAlpha1(envelope, it)
)
).map(
it -> {
Expand Down Expand Up @@ -761,9 +761,9 @@ public void close() throws Exception {
*/
@Override
public Mono<Void> shutdown() {
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<Empty>createMono(
it -> intercept(context, asyncStub).shutdown(Empty.getDefaultInstance(), it))
it -> intercept(Context.of(context), asyncStub).shutdown(Empty.getDefaultInstance(), it))
).then();
}

Expand Down Expand Up @@ -795,10 +795,10 @@ public Mono<Map<String, ConfigurationItem>> getConfiguration(GetConfigurationReq
}

private Mono<Map<String, ConfigurationItem>> getConfigurationAlpha1(DaprProtos.GetConfigurationRequest envelope) {
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context ->
this.<DaprProtos.GetConfigurationResponse>createMono(
it -> intercept(context, asyncStub).getConfigurationAlpha1(envelope, it)
it -> intercept(Context.of(context), asyncStub).getConfigurationAlpha1(envelope, it)
)
).map(
it -> {
Expand Down

0 comments on commit 99a91f0

Please sign in to comment.