diff --git a/examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java b/examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java index e1c1306b0a..a6552f9cd4 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java @@ -71,7 +71,7 @@ public static void main(String[] args) throws Exception { System.out.println("Going to publish message : " + message); } BulkPublishResponse res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages) - .subscriberContext(getReactorContext()).block(); + .contextWrite(getReactorContext()).block(); System.out.println("Published the set of messages in a single call to Dapr"); if (res != null) { if (res.getFailedEntries().size() > 0) { diff --git a/examples/src/main/java/io/dapr/examples/pubsub/PublisherWithTracing.java b/examples/src/main/java/io/dapr/examples/pubsub/PublisherWithTracing.java index fe9bf93e5e..cc8b3d1acd 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/PublisherWithTracing.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/PublisherWithTracing.java @@ -63,7 +63,7 @@ public static void main(String[] args) throws Exception { client.publishEvent( PUBSUB_NAME, TOPIC_NAME, - message).subscriberContext(getReactorContext()).block(); + message).contextWrite(getReactorContext()).block(); System.out.println("Published message: " + message); try { diff --git a/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java b/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java index 9078fb0d53..3ddfa1d38c 100644 --- a/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java +++ b/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java @@ -67,7 +67,7 @@ public static void main(String[] args) throws Exception { InvokeMethodRequest sleepRequest = new InvokeMethodRequest(SERVICE_APP_ID, "proxy_sleep") .setHttpExtension(HttpExtension.POST); return client.invokeMethod(sleepRequest, TypeRef.get(Void.class)); - }).subscriberContext(getReactorContext()).block(); + }).contextWrite(getReactorContext()).block(); } } } diff --git a/examples/src/main/java/io/dapr/examples/tracing/TracingDemoMiddleServiceController.java b/examples/src/main/java/io/dapr/examples/tracing/TracingDemoMiddleServiceController.java index 9a3111e859..323d4c0232 100644 --- a/examples/src/main/java/io/dapr/examples/tracing/TracingDemoMiddleServiceController.java +++ b/examples/src/main/java/io/dapr/examples/tracing/TracingDemoMiddleServiceController.java @@ -58,7 +58,7 @@ public Mono echo( InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "echo") .setBody(body) .setHttpExtension(HttpExtension.POST); - return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context)); + return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context)); } /** @@ -71,7 +71,7 @@ public Mono echo( public Mono sleep(@RequestAttribute(name = "opentelemetry-context") Context context) { InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "sleep") .setHttpExtension(HttpExtension.POST); - return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context)).then(); + return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context)).then(); } } diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java index a5b963a8f2..44563fe391 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java @@ -30,6 +30,7 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; import reactor.util.context.Context; +import reactor.util.context.ContextView; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; @@ -65,7 +66,7 @@ public Mono invoke(String actorType, String actorId, String methodName, .setMethod(methodName) .setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload)) .build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, client).invokeActor(req, it) ) @@ -109,7 +110,7 @@ public void start(final Listener responseListener, final Metadata metadat * @param client GRPC client for Dapr. * @return Client after adding interceptors. */ - private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) { + private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) { return GrpcWrapper.intercept(context, client); } diff --git a/sdk-tests/pom.xml b/sdk-tests/pom.xml index 3d10de6461..6495702b76 100644 --- a/sdk-tests/pom.xml +++ b/sdk-tests/pom.xml @@ -120,7 +120,7 @@ io.projectreactor reactor-core - 3.3.11.RELEASE + 3.5.2 test diff --git a/sdk/pom.xml b/sdk/pom.xml index 01a354fa5e..609a2e0ee4 100644 --- a/sdk/pom.xml +++ b/sdk/pom.xml @@ -44,7 +44,7 @@ io.projectreactor reactor-core - 3.3.11.RELEASE + 3.4.26 com.squareup.okhttp3 diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index cdc6c88b63..5e4812f260 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -68,6 +68,7 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; import reactor.util.context.Context; +import reactor.util.context.ContextView; import java.io.Closeable; import java.io.IOException; @@ -181,11 +182,8 @@ public Mono publishEvent(PublishEventRequest request) { envelopeBuilder.putAllMetadata(metadata); } - return Mono.subscriberContext().flatMap( - context -> - this.createMono( - it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it) - ) + return Mono.deferContextual(context -> this.createMono( + it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)) ).then(); } catch (Exception ex) { return DaprException.wrapMono(ex); @@ -254,8 +252,7 @@ public Mono> publishEvents(BulkPublishRequest requ for (BulkPublishEntry entry: request.getEntries()) { entryMap.put(entry.getEntryId(), entry); } - return Mono.subscriberContext().flatMap( - context -> + return Mono.deferContextual(context -> this.createMono( it -> intercept(context, asyncStub).bulkPublishEventAlpha1(envelopeBuilder.build(), it) ) @@ -298,8 +295,8 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef // gRPC to gRPC does not handle metadata in Dapr runtime proto. // gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342 - return Mono.subscriberContext().flatMap( - context -> this.createMono( + return Mono.deferContextual(context -> + this.createMono( it -> intercept(context, asyncStub).invokeService(envelope, it) ) ).flatMap( @@ -345,8 +342,7 @@ public Mono invokeBinding(InvokeBindingRequest request, TypeRef type) } DaprProtos.InvokeBindingRequest envelope = builder.build(); - return Mono.subscriberContext().flatMap( - context -> this.createMono( + return Mono.deferContextual(context -> this.createMono( it -> intercept(context, asyncStub).invokeBinding(envelope, it) ) ).flatMap( @@ -392,8 +388,7 @@ public Mono> getState(GetStateRequest request, TypeRef type) { DaprProtos.GetStateRequest envelope = builder.build(); - return Mono.subscriberContext().flatMap( - context -> + return Mono.deferContextual(context -> this.createMono( it -> intercept(context, asyncStub).getState(envelope, it) ) @@ -441,8 +436,8 @@ public Mono>> getBulkState(GetBulkStateRequest request, TypeRe DaprProtos.GetBulkStateRequest envelope = builder.build(); - return Mono.subscriberContext().flatMap( - context -> this.createMono(it -> intercept(context, asyncStub) + return Mono.deferContextual(context -> + this.createMono(it -> intercept(context, asyncStub) .getBulkState(envelope, it) ) ).map( @@ -525,7 +520,7 @@ public Mono executeStateTransaction(ExecuteStateTransactionRequest request } DaprProtos.ExecuteStateTransactionRequest req = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono(it -> intercept(context, asyncStub).executeStateTransaction(req, it)) ).then(); } catch (Exception e) { @@ -551,7 +546,7 @@ public Mono saveBulkState(SaveStateRequest request) { } DaprProtos.SaveStateRequest req = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono(it -> intercept(context, asyncStub).saveState(req, it)) ).then(); } catch (Exception ex) { @@ -635,8 +630,8 @@ public Mono deleteState(DeleteStateRequest request) { DaprProtos.DeleteStateRequest req = builder.build(); - return Mono.subscriberContext().flatMap( - context -> this.createMono(it -> intercept(context, asyncStub).deleteState(req, it)) + return Mono.deferContextual(context -> + this.createMono(it -> intercept(context, asyncStub).deleteState(req, it)) ).then(); } catch (Exception ex) { return DaprException.wrapMono(ex); @@ -713,7 +708,7 @@ public Mono> getSecret(GetSecretRequest request) { } DaprProtos.GetSecretRequest req = requestBuilder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono(it -> intercept(context, asyncStub).getSecret(req, it)) ).map(DaprProtos.GetSecretResponse::getDataMap); } @@ -738,9 +733,8 @@ public Mono>> getBulkSecret(GetBulkSecretRequest DaprProtos.GetBulkSecretRequest envelope = builder.build(); - return Mono.subscriberContext().flatMap( - context -> - this.createMono( + return Mono.deferContextual(context -> + this.createMono( it -> intercept(context, asyncStub).getBulkSecret(envelope, it) ) ).map(it -> { @@ -791,8 +785,7 @@ public Mono> queryState(QueryStateRequest request, Typ DaprProtos.QueryStateRequest envelope = builder.build(); - return Mono.subscriberContext().flatMap( - context -> this.createMono( + return Mono.deferContextual(context -> this.createMono( it -> intercept(context, asyncStub).queryStateAlpha1(envelope, it) ) ).map( @@ -855,7 +848,7 @@ public void close() throws Exception { */ @Override public Mono shutdown() { - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).shutdown(Empty.getDefaultInstance(), it)) ).then(); @@ -889,8 +882,7 @@ public Mono> getConfiguration(GetConfigurationReq } private Mono> getConfigurationAlpha1(DaprProtos.GetConfigurationRequest envelope) { - return Mono.subscriberContext().flatMap( - context -> + return Mono.deferContextual(context -> this.createMono( it -> intercept(context, asyncStub).getConfigurationAlpha1(envelope, it) ) @@ -1034,7 +1026,7 @@ public void start(final Listener responseListener, final Metadata metadat * @param client GRPC client for Dapr. * @return Client after adding interceptors. */ - private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) { + private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) { return GrpcWrapper.intercept(context, client); } diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java index 2ec7bf780a..24c61f94ae 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java @@ -50,6 +50,7 @@ import io.dapr.utils.TypeRef; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.context.Context; import java.io.IOException; import java.util.ArrayList; @@ -177,7 +178,7 @@ public Mono publishEvent(PublishEventRequest request) { String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "publish", pubsubName, topic }; Map> queryArgs = metadataToQueryArgs(metadata); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client.invokeApi( DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, serializedEvent, headers, context ) @@ -237,7 +238,7 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef } else { headers.put(Metadata.CONTENT_TYPE, objectSerializer.getContentType()); } - Mono response = Mono.subscriberContext().flatMap( + Mono response = Mono.deferContextual( context -> this.client.invokeApi(httpMethod, pathSegments.toArray(new String[0]), httpExtension.getQueryParams(), serializedRequestBody, headers, context) ); @@ -309,7 +310,7 @@ public Mono invokeBinding(InvokeBindingRequest request, TypeRef type) String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "bindings", name }; - Mono response = Mono.subscriberContext().flatMap( + Mono response = Mono.deferContextual( context -> this.client.invokeApi( httpMethod, pathSegments, null, payload, null, context) ); @@ -349,7 +350,7 @@ public Mono>> getBulkState(GetBulkStateRequest request, TypeRe String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, "bulk" }; Map> queryArgs = metadataToQueryArgs(metadata); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client .invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, requestBody, null, context) ).flatMap(s -> { @@ -394,7 +395,7 @@ public Mono> getState(GetStateRequest request, TypeRef type) { String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, key }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client .invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryParams, null, context) ).flatMap(s -> { @@ -452,7 +453,7 @@ public Mono executeStateTransaction(ExecuteStateTransactionRequest request String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, "transaction" }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client.invokeApi( DaprHttp.HttpMethods.POST.name(), pathSegments, null, serializedOperationBody, null, context ) @@ -500,7 +501,7 @@ public Mono saveBulkState(SaveStateRequest request) { String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client.invokeApi( DaprHttp.HttpMethods.POST.name(), pathSegments, null, serializedStateBody, null, context) ).then(); @@ -543,7 +544,7 @@ public Mono deleteState(DeleteStateRequest request) { String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, key }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client.invokeApi( DaprHttp.HttpMethods.DELETE.name(), pathSegments, queryParams, headers, context) ).then(); @@ -631,7 +632,7 @@ public Mono> getSecret(GetSecretRequest request) { Map> queryArgs = metadataToQueryArgs(metadata); String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "secrets", secretStoreName, key }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client .invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryArgs, (String) null, null, context) ).flatMap(response -> { @@ -667,7 +668,7 @@ public Mono>> getBulkSecret(GetBulkSecretRequest Map> queryArgs = metadataToQueryArgs(metadata); String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "secrets", secretStoreName, "bulk" }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client .invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryArgs, (String) null, null, context) ).flatMap(response -> { @@ -709,7 +710,7 @@ public Mono> queryState(QueryStateRequest request, Typ } else { throw new IllegalArgumentException("Both query and queryString fields are not set."); } - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client .invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, serializedRequest, null, context) @@ -739,7 +740,7 @@ public void close() { @Override public Mono shutdown() { String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "shutdown" }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> client.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, null, null, context)) .then(); @@ -810,7 +811,7 @@ public Mono> getConfiguration(GetConfigurationReq queryParams.putAll(queryArgs); String[] pathSegments = new String[] {DaprHttp.ALPHA_1_API_VERSION, "configuration", configurationStoreName }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client .invokeApi( DaprHttp.HttpMethods.GET.name(), @@ -871,7 +872,7 @@ public Flux subscribeConfiguration(SubscribeConf String[] pathSegments = new String[] { DaprHttp.ALPHA_1_API_VERSION, "configuration", configurationStoreName, "subscribe" }; - SubscribeConfigurationResponse res = Mono.subscriberContext().flatMap( + SubscribeConfigurationResponse res = Mono.deferContextual( context -> this.client.invokeApi( DaprHttp.HttpMethods.GET.name(), pathSegments, queryParams, @@ -913,7 +914,7 @@ public Mono unsubscribeConfiguration(Unsubscri String[] pathSegments = new String[] { DaprHttp.ALPHA_1_API_VERSION, "configuration", configStoreName, id, "unsubscribe" }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client .invokeApi( DaprHttp.HttpMethods.GET.name(), diff --git a/sdk/src/main/java/io/dapr/client/DaprHttp.java b/sdk/src/main/java/io/dapr/client/DaprHttp.java index 66b9c9a06d..13f88924f2 100644 --- a/sdk/src/main/java/io/dapr/client/DaprHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprHttp.java @@ -30,6 +30,7 @@ import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Mono; import reactor.util.context.Context; +import reactor.util.context.ContextView; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -183,7 +184,7 @@ public Mono invokeApi( String[] pathSegments, Map> urlParameters, Map headers, - Context context) { + ContextView context) { return this.invokeApi(method, pathSegments, urlParameters, (byte[]) null, headers, context); } @@ -204,7 +205,7 @@ public Mono invokeApi( Map> urlParameters, String content, Map headers, - Context context) { + ContextView context) { return this.invokeApi( method, pathSegments, urlParameters, content == null @@ -229,7 +230,7 @@ public Mono invokeApi( Map> urlParameters, byte[] content, Map headers, - Context context) { + ContextView context) { // fromCallable() is needed so the invocation does not happen early, causing a hot mono. return Mono.fromCallable(() -> doInvokeApi(method, pathSegments, urlParameters, content, headers, context)) .flatMap(f -> Mono.fromFuture(f)); @@ -259,7 +260,7 @@ private CompletableFuture doInvokeApi(String method, String[] pathSegments, Map> urlParameters, byte[] content, Map headers, - Context context) { + ContextView context) { final String requestId = UUID.randomUUID().toString(); RequestBody body; diff --git a/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java b/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java index b4940d8530..ff631430e5 100644 --- a/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java +++ b/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java @@ -23,6 +23,7 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import reactor.util.context.Context; +import reactor.util.context.ContextView; import java.util.Map; import java.util.Optional; @@ -61,7 +62,7 @@ private GrpcWrapper() { * @param client GRPC client for Dapr. * @return Client after adding interceptors. */ - public static DaprGrpc.DaprStub intercept(final Context context, DaprGrpc.DaprStub client) { + public static DaprGrpc.DaprStub intercept(final ContextView context, DaprGrpc.DaprStub client) { ClientInterceptor interceptor = new ClientInterceptor() { @Override public ClientCall interceptCall( diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java index e323d9b574..0b27bf815f 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java @@ -189,7 +189,7 @@ public void invokeServiceVoidWithTracingTest() { .setBody("request") .setHttpExtension(HttpExtension.NONE); Mono result = this.client.invokeMethod(req, TypeRef.get(Void.class)) - .subscriberContext(it -> it.putAll(contextCopy == null ? Context.empty() : contextCopy)); + .contextWrite((contextCopy == null ? Context.empty() : contextCopy)); result.block(); } diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java index d1831fb23b..91935641b8 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java @@ -422,7 +422,7 @@ public void invokeServiceWithContext() { .setBody("request") .setHttpExtension(HttpExtension.POST); Mono result = daprClientHttp.invokeMethod(req, TypeRef.get(Void.class)) - .subscriberContext(it -> it.putAll(context)); + .contextWrite(context); result.block(); } diff --git a/sdk/src/test/java/io/dapr/client/DaprHttpStub.java b/sdk/src/test/java/io/dapr/client/DaprHttpStub.java index d83f517c54..6112aa409f 100644 --- a/sdk/src/test/java/io/dapr/client/DaprHttpStub.java +++ b/sdk/src/test/java/io/dapr/client/DaprHttpStub.java @@ -15,6 +15,7 @@ import reactor.core.publisher.Mono; import reactor.util.context.Context; +import reactor.util.context.ContextView; import java.util.List; import java.util.Map; @@ -45,7 +46,7 @@ public Mono invokeApi(String method, String[] pathSegments, Map> urlParameters, Map headers, - Context context) { + ContextView context) { return Mono.empty(); } @@ -58,7 +59,7 @@ public Mono invokeApi(String method, Map> urlParameters, String content, Map headers, - Context context) { + ContextView context) { return Mono.empty(); } @@ -71,7 +72,7 @@ public Mono invokeApi(String method, Map> urlParameters, byte[] content, Map headers, - Context context) { + ContextView context) { return Mono.empty(); }