Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump from reactor 2.3.5.RELEASE to 2.7.8 #830

Merged
merged 4 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static OpenTelemetry createOpenTelemetry() {
* Converts current OpenTelemetry's context into Reactor's context.
* @return Reactor's context.
*/
public static reactor.util.context.Context getReactorContext() {
public static reactor.util.context.ContextView getReactorContext() {
return getReactorContext(Context.current());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static void main(String[] args) throws Exception {
System.out.println("Going to publish message : " + message);
}
BulkPublishResponse<?> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages)
.subscriberContext(getReactorContext()).block();
.contextWrite(getReactorContext()).block();
System.out.println("Published the set of messages in a single call to Dapr");
if (res != null) {
if (res.getFailedEntries().size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static void main(String[] args) throws Exception {
client.publishEvent(
PUBSUB_NAME,
TOPIC_NAME,
message).subscriberContext(getReactorContext()).block();
message).contextWrite(getReactorContext()).block();
System.out.println("Published message: " + message);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static void main(String[] args) throws Exception {
InvokeMethodRequest sleepRequest = new InvokeMethodRequest(SERVICE_APP_ID, "proxy_sleep")
.setHttpExtension(HttpExtension.POST);
return client.invokeMethod(sleepRequest, TypeRef.get(Void.class));
}).subscriberContext(getReactorContext()).block();
}).contextWrite(getReactorContext()).block();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Mono<byte[]> echo(
InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "echo")
.setBody(body)
.setHttpExtension(HttpExtension.POST);
return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context));
return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context));
}

/**
Expand All @@ -71,7 +71,7 @@ public Mono<byte[]> echo(
public Mono<Void> sleep(@RequestAttribute(name = "opentelemetry-context") Context context) {
InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "sleep")
.setHttpExtension(HttpExtension.POST);
return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context)).then();
return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context)).then();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.grpc.stub.StreamObserver;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
Expand Down Expand Up @@ -65,7 +65,7 @@ public Mono<byte[]> invoke(String actorType, String actorId, String methodName,
.setMethod(methodName)
.setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload))
.build();
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<DaprProtos.InvokeActorResponse>createMono(
it -> intercept(context, client).invokeActor(req, it)
)
Expand Down Expand Up @@ -109,7 +109,7 @@ public void start(final Listener<RespT> responseListener, final Metadata metadat
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) {
private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
return GrpcWrapper.intercept(context, client);
}

Expand Down
2 changes: 1 addition & 1 deletion sdk-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.11.RELEASE</version>
<version>3.5.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testInvokeTimeout() throws Exception {
.block(Duration.ofMillis(10))).getMessage();
long delay = System.currentTimeMillis() - started;
assertTrue(delay <= 500); // 500 ms is a reasonable delay if the request timed out.
assertEquals("Timeout on blocking read for 10 MILLISECONDS", message);
assertEquals("Timeout on blocking read for 10000000 NANOSECONDS", message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void testInvokeTimeout() throws Exception {
}).getMessage();
long delay = System.currentTimeMillis() - started;
assertTrue(delay <= 200); // 200 ms is a reasonable delay if the request timed out.
assertEquals("Timeout on blocking read for 10 MILLISECONDS", message);
assertEquals("Timeout on blocking read for 10000000 NANOSECONDS", message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void testInvoke() throws Exception {
try (Scope scope = span.makeCurrent()) {
SleepRequest req = SleepRequest.newBuilder().setSeconds(1).build();
client.invokeMethod(daprRun.getAppName(), "sleepOverGRPC", req.toByteArray(), HttpExtension.POST)
.subscriberContext(getReactorContext())
.contextWrite(getReactorContext())
.block();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void testInvoke() throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
try (Scope scope = span.makeCurrent()) {
client.invokeMethod(daprRun.getAppName(), "sleep", 1, HttpExtension.POST)
.subscriberContext(getReactorContext())
.contextWrite(getReactorContext())
.block();
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.11.RELEASE</version>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
Expand Down
32 changes: 16 additions & 16 deletions sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -181,7 +181,7 @@ public Mono<Void> publishEvent(PublishEventRequest request) {
envelopeBuilder.putAllMetadata(metadata);
}

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(Mono::just).flatMap(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a small change
Mono.deferContextual(context ->

context ->
this.<Empty>createMono(
it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)
Expand Down Expand Up @@ -254,7 +254,7 @@ public <T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> requ
for (BulkPublishEntry<T> entry: request.getEntries()) {
entryMap.put(entry.getEntryId(), entry);
}
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context ->
this.<DaprProtos.BulkPublishResponse>createMono(
it -> intercept(context, asyncStub).bulkPublishEventAlpha1(envelopeBuilder.build(), it)
Expand Down Expand Up @@ -298,7 +298,7 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
// gRPC to gRPC does not handle metadata in Dapr runtime proto.
// gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<CommonProtos.InvokeResponse>createMono(
it -> intercept(context, asyncStub).invokeService(envelope, it)
)
Expand Down Expand Up @@ -345,7 +345,7 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
}
DaprProtos.InvokeBindingRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
)
Expand Down Expand Up @@ -392,7 +392,7 @@ public <T> Mono<State<T>> getState(GetStateRequest request, TypeRef<T> type) {

DaprProtos.GetStateRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context ->
this.<DaprProtos.GetStateResponse>createMono(
it -> intercept(context, asyncStub).getState(envelope, it)
Expand Down Expand Up @@ -441,7 +441,7 @@ public <T> Mono<List<State<T>>> getBulkState(GetBulkStateRequest request, TypeRe

DaprProtos.GetBulkStateRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<DaprProtos.GetBulkStateResponse>createMono(it -> intercept(context, asyncStub)
.getBulkState(envelope, it)
)
Expand Down Expand Up @@ -525,7 +525,7 @@ public Mono<Void> executeStateTransaction(ExecuteStateTransactionRequest request
}
DaprProtos.ExecuteStateTransactionRequest req = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).executeStateTransaction(req, it))
).then();
} catch (Exception e) {
Expand All @@ -551,7 +551,7 @@ public Mono<Void> saveBulkState(SaveStateRequest request) {
}
DaprProtos.SaveStateRequest req = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).saveState(req, it))
).then();
} catch (Exception ex) {
Expand Down Expand Up @@ -635,7 +635,7 @@ public Mono<Void> deleteState(DeleteStateRequest request) {

DaprProtos.DeleteStateRequest req = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).deleteState(req, it))
).then();
} catch (Exception ex) {
Expand Down Expand Up @@ -713,7 +713,7 @@ public Mono<Map<String, String>> getSecret(GetSecretRequest request) {
}
DaprProtos.GetSecretRequest req = requestBuilder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<DaprProtos.GetSecretResponse>createMono(it -> intercept(context, asyncStub).getSecret(req, it))
).map(DaprProtos.GetSecretResponse::getDataMap);
}
Expand All @@ -738,7 +738,7 @@ public Mono<Map<String, Map<String, String>>> getBulkSecret(GetBulkSecretRequest

DaprProtos.GetBulkSecretRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context ->
this.<DaprProtos.GetBulkSecretResponse>createMono(
it -> intercept(context, asyncStub).getBulkSecret(envelope, it)
Expand Down Expand Up @@ -791,7 +791,7 @@ public <T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, Typ

DaprProtos.QueryStateRequest envelope = builder.build();

return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<DaprProtos.QueryStateResponse>createMono(
it -> intercept(context, asyncStub).queryStateAlpha1(envelope, it)
)
Expand Down Expand Up @@ -855,7 +855,7 @@ public void close() throws Exception {
*/
@Override
public Mono<Void> shutdown() {
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context -> this.<Empty>createMono(
it -> intercept(context, asyncStub).shutdown(Empty.getDefaultInstance(), it))
).then();
Expand Down Expand Up @@ -889,7 +889,7 @@ public Mono<Map<String, ConfigurationItem>> getConfiguration(GetConfigurationReq
}

private Mono<Map<String, ConfigurationItem>> getConfigurationAlpha1(DaprProtos.GetConfigurationRequest envelope) {
return Mono.subscriberContext().flatMap(
return Mono.deferContextual(
context ->
this.<DaprProtos.GetConfigurationResponse>createMono(
it -> intercept(context, asyncStub).getConfigurationAlpha1(envelope, it)
Expand Down Expand Up @@ -1034,7 +1034,7 @@ public void start(final Listener<RespT> responseListener, final Metadata metadat
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) {
private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
return GrpcWrapper.intercept(context, client);
}

Expand Down
Loading