diff --git a/sdk-tests/components/http_binding.yaml b/sdk-tests/components/http_binding.yaml new file mode 100644 index 000000000..5a3ca0a1d --- /dev/null +++ b/sdk-tests/components/http_binding.yaml @@ -0,0 +1,27 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: github-http-binding-404 +spec: + type: bindings.http + version: v1 + metadata: + - name: url + value: https://api.github.com/unknown_path +scopes: + - bindingit-httpoutputbinding-exception +--- +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: github-http-binding-404-success +spec: + type: bindings.http + version: v1 + metadata: + - name: url + value: https://api.github.com/unknown_path + - name: errorIfNot2XX + value: "false" +scopes: + - bindingit-httpoutputbinding-ignore-error \ No newline at end of file diff --git a/sdk-tests/deploy/local-test.yml b/sdk-tests/deploy/local-test.yml index 989757a78..f920f6acc 100644 --- a/sdk-tests/deploy/local-test.yml +++ b/sdk-tests/deploy/local-test.yml @@ -6,14 +6,14 @@ services: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - - 22181:2181 + - 2181:2181 kafka: image: confluentinc/cp-kafka:7.4.4 depends_on: - zookeeper ports: - - 9092:9092 + - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 diff --git a/sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java b/sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java index cc83b9b62..610a55820 100644 --- a/sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 The Dapr Authors + * Copyright 2024 The Dapr Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -17,6 +17,7 @@ import io.dapr.client.DaprClient; import io.dapr.client.DaprClientBuilder; import io.dapr.client.domain.HttpExtension; +import io.dapr.exceptions.DaprException; import io.dapr.it.BaseIT; import io.dapr.it.DaprRun; import org.junit.jupiter.api.Test; @@ -26,6 +27,7 @@ import static io.dapr.it.Retry.callWithRetry; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; /** @@ -33,15 +35,50 @@ */ public class BindingIT extends BaseIT { - private static final String BINDING_NAME = "sample123"; - - private static final String BINDING_OPERATION = "create"; - - public static class MyClass { - public MyClass() { + @Test + public void httpOutputBindingError() throws Exception { + startDaprApp( + this.getClass().getSimpleName() + "-httpoutputbinding-exception", + 60000); + try(DaprClient client = new DaprClientBuilder().build()) { + // Validate error message + callWithRetry(() -> { + System.out.println("Checking exception handling for output binding ..."); + try { + client.invokeBinding("github-http-binding-404", "get", "").block(); + fail("Should throw an exception"); + } catch (DaprException e) { + assertEquals(404, e.getHttpStatusCode()); + // This HTTP binding did not set `errorIfNot2XX` to false in component metadata, so the error payload is not + // consistent between HTTP and gRPC. + assertTrue(new String(e.getPayload()).contains( + "error invoking output binding github-http-binding-404: received status code 404")); + } + }, 10000); } + } - public String message; + @Test + public void httpOutputBindingErrorIgnoredByComponent() throws Exception { + startDaprApp( + this.getClass().getSimpleName() + "-httpoutputbinding-ignore-error", + 60000); + try(DaprClient client = new DaprClientBuilder().build()) { + // Validate error message + callWithRetry(() -> { + System.out.println("Checking exception handling for output binding ..."); + try { + client.invokeBinding("github-http-binding-404-success", "get", "").block(); + fail("Should throw an exception"); + } catch (DaprException e) { + assertEquals(404, e.getHttpStatusCode()); + // The HTTP binding must set `errorIfNot2XX` to false in component metadata for the error payload to be + // consistent between HTTP and gRPC. + assertTrue(new String(e.getPayload()).contains("\"message\":\"Not Found\"")); + assertTrue(new String(e.getPayload()).contains("\"documentation_url\":\"https://docs.github.com/rest\"")); + } + }, 10000); + } } @Test @@ -53,11 +90,13 @@ public void inputOutputBinding() throws Exception { true, 60000); + var bidingName = "sample123"; + try(DaprClient client = new DaprClientBuilder().build()) { callWithRetry(() -> { System.out.println("Checking if input binding is up before publishing events ..."); client.invokeBinding( - BINDING_NAME, BINDING_OPERATION, "ping").block(); + bidingName, "create", "ping").block(); try { Thread.sleep(1000); @@ -76,14 +115,14 @@ public void inputOutputBinding() throws Exception { System.out.println("sending first message"); client.invokeBinding( - BINDING_NAME, BINDING_OPERATION, myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block(); + bidingName, "create", myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block(); // This is an example of sending a plain string. The input binding will receive // cat final String m = "cat"; System.out.println("sending " + m); client.invokeBinding( - BINDING_NAME, BINDING_OPERATION, m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block(); + bidingName, "create", m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block(); // Metadata is not used by Kafka component, so it is not possible to validate. callWithRetry(() -> { @@ -115,4 +154,11 @@ public void inputOutputBinding() throws Exception { }, 8000); } } + + public static class MyClass { + public MyClass() { + } + + public String message; + } } diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index aabc07a60..dc0f75210 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -56,6 +56,7 @@ import io.dapr.client.domain.UnsubscribeConfigurationResponse; import io.dapr.client.resiliency.ResiliencyOptions; import io.dapr.exceptions.DaprException; +import io.dapr.internal.exceptions.DaprHttpException; import io.dapr.internal.grpc.DaprClientGrpcInterceptors; import io.dapr.internal.resiliency.RetryPolicy; import io.dapr.internal.resiliency.TimeoutPolicy; @@ -76,6 +77,7 @@ import io.dapr.v1.DaprProtos.RegisteredComponents; import io.grpc.CallOptions; import io.grpc.Channel; +import io.grpc.Metadata; import io.grpc.stub.AbstractStub; import io.grpc.stub.StreamObserver; import reactor.core.publisher.Flux; @@ -99,6 +101,10 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static io.dapr.internal.exceptions.DaprHttpException.isSuccessfulHttpStatusCode; +import static io.dapr.internal.exceptions.DaprHttpException.isValidHttpStatusCode; +import static io.dapr.internal.exceptions.DaprHttpException.parseHttpStatusCode; + /** * Implementation of the Dapr client combining gRPC and HTTP (when applicable). * @@ -486,12 +492,22 @@ public Mono invokeBinding(InvokeBindingRequest request, TypeRef type) } DaprProtos.InvokeBindingRequest envelope = builder.build(); + Metadata responseMetadata = new Metadata(); return Mono.deferContextual( context -> this.createMono( - it -> intercept(context, asyncStub).invokeBinding(envelope, it) + responseMetadata, + it -> intercept(context, asyncStub, m -> responseMetadata.merge(m)).invokeBinding(envelope, it) ) ).flatMap( it -> { + int httpStatusCode = + parseHttpStatusCode(it.getMetadataMap().getOrDefault("statusCode", "")); + if (isValidHttpStatusCode(httpStatusCode) && !isSuccessfulHttpStatusCode(httpStatusCode)) { + // Exception condition in a successful request. + // This is useful to send an exception due to an error from the HTTP binding component. + throw DaprException.propagate(new DaprHttpException(httpStatusCode, it.getData().toByteArray())); + } + try { return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type)); } catch (IOException e) { @@ -1201,17 +1217,39 @@ private DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub clien return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context); } + /** + * Populates GRPC client with interceptors for telemetry. + * + * @param context Reactor's context. + * @param client GRPC client for Dapr. + * @param metadataConsumer Consumer of gRPC metadata. + * @return Client after adding interceptors. + */ + private DaprGrpc.DaprStub intercept( + ContextView context, DaprGrpc.DaprStub client, Consumer metadataConsumer) { + return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context, metadataConsumer); + } + private Mono createMono(Consumer> consumer) { + return this.createMono(null, consumer); + } + + private Mono createMono(Metadata metadata, Consumer> consumer) { return retryPolicy.apply( - Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run())); + Mono.create(sink -> DaprException.wrap(() -> consumer.accept( + createStreamObserver(sink, metadata))).run())); } private Flux createFlux(Consumer> consumer) { + return this.createFlux(null, consumer); + } + + private Flux createFlux(Metadata metadata, Consumer> consumer) { return retryPolicy.apply( - Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run())); + Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink, metadata))).run())); } - private StreamObserver createStreamObserver(MonoSink sink) { + private StreamObserver createStreamObserver(MonoSink sink, Metadata grpcMetadata) { return new StreamObserver() { @Override public void onNext(T value) { @@ -1220,7 +1258,7 @@ public void onNext(T value) { @Override public void onError(Throwable t) { - sink.error(DaprException.propagate(new ExecutionException(t))); + sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t))); } @Override @@ -1230,7 +1268,7 @@ public void onCompleted() { }; } - private StreamObserver createStreamObserver(FluxSink sink) { + private StreamObserver createStreamObserver(FluxSink sink, final Metadata grpcMetadata) { return new StreamObserver() { @Override public void onNext(T value) { @@ -1239,7 +1277,7 @@ public void onNext(T value) { @Override public void onError(Throwable t) { - sink.error(DaprException.propagate(new ExecutionException(t))); + sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t))); } @Override diff --git a/sdk/src/main/java/io/dapr/client/DaprHttp.java b/sdk/src/main/java/io/dapr/client/DaprHttp.java index 54b64bd77..01485e433 100644 --- a/sdk/src/main/java/io/dapr/client/DaprHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprHttp.java @@ -18,6 +18,7 @@ import io.dapr.config.Properties; import io.dapr.exceptions.DaprError; import io.dapr.exceptions.DaprException; +import io.dapr.internal.exceptions.DaprHttpException; import io.dapr.utils.Version; import okhttp3.Call; import okhttp3.Callback; @@ -387,17 +388,19 @@ public void onFailure(Call call, IOException e) { @Override public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) throws IOException { - if (!response.isSuccessful()) { + int httpStatusCode = parseHttpStatusCode(response.header("Metadata.statuscode"), response.code()); + if (!DaprHttpException.isSuccessfulHttpStatusCode(httpStatusCode)) { try { byte[] payload = getBodyBytesOrEmptyArray(response); DaprError error = parseDaprError(payload); + if (error != null) { - future.completeExceptionally(new DaprException(error, payload, response.code())); + future.completeExceptionally(new DaprException(error, payload, httpStatusCode)); return; } future.completeExceptionally( - new DaprException("UNKNOWN", "", payload, response.code())); + new DaprException("UNKNOWN", "", payload, httpStatusCode)); return; } catch (DaprException e) { future.completeExceptionally(e); @@ -410,8 +413,24 @@ public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) t response.headers().forEach(pair -> { mapHeaders.put(pair.getFirst(), pair.getSecond()); }); - future.complete(new Response(result, mapHeaders, response.code())); + future.complete(new Response(result, mapHeaders, httpStatusCode)); } } + private static int parseHttpStatusCode(String headerValue, int defaultStatusCode) { + if ((headerValue == null) || headerValue.isEmpty()) { + return defaultStatusCode; + } + + // Metadata used to override status code with code received from HTTP binding. + try { + int httpStatusCode = Integer.parseInt(headerValue); + if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) { + return httpStatusCode; + } + return defaultStatusCode; + } catch (NumberFormatException nfe) { + return defaultStatusCode; + } + } } diff --git a/sdk/src/main/java/io/dapr/exceptions/DaprException.java b/sdk/src/main/java/io/dapr/exceptions/DaprException.java index 0801069e8..9e40298bf 100644 --- a/sdk/src/main/java/io/dapr/exceptions/DaprException.java +++ b/sdk/src/main/java/io/dapr/exceptions/DaprException.java @@ -14,6 +14,7 @@ package io.dapr.exceptions; import com.google.rpc.Status; +import io.dapr.internal.exceptions.DaprHttpException; import io.grpc.StatusRuntimeException; import io.grpc.protobuf.StatusProto; import reactor.core.Exceptions; @@ -170,11 +171,33 @@ public DaprException(String errorCode, String message, Throwable cause) { */ public DaprException( String errorCode, String message, Throwable cause, DaprErrorDetails errorDetails, byte[] payload) { - super(buildErrorMessage(errorCode, 0, message), cause); - this.httpStatusCode = 0; + this(errorCode, message, cause, errorDetails, payload, 0); + } + + /** + * New exception from a server-side generated error code and message. + * @param errorCode Client-side error code. + * @param message Client-side error message. + * @param cause the cause (which is saved for later retrieval by the + * {@link #getCause()} method). (A {@code null} value is + * permitted, and indicates that the cause is nonexistent or + * unknown.) + * @param errorDetails the status details for the error. + * @param payload Raw error payload. + * @param httpStatusCode Optional HTTP Status code for the error, 0 if not applicable. + */ + private DaprException( + String errorCode, + String message, + Throwable cause, + DaprErrorDetails errorDetails, + byte[] payload, + int httpStatusCode) { + super(buildErrorMessage(errorCode, httpStatusCode, message), cause); this.errorCode = errorCode; this.errorDetails = errorDetails == null ? DaprErrorDetails.EMPTY_INSTANCE : errorDetails; this.payload = payload; + this.httpStatusCode = httpStatusCode; } /** @@ -305,21 +328,27 @@ public static RuntimeException propagate(Throwable exception) { return (DaprException) exception; } + int httpStatusCode = 0; + byte[] httpPayload = null; Throwable e = exception; while (e != null) { - if (e instanceof StatusRuntimeException) { + if (e instanceof DaprHttpException) { + DaprHttpException daprHttpException = (DaprHttpException) e; + httpStatusCode = daprHttpException.getStatusCode(); + httpPayload = daprHttpException.getPayload(); + } else if (e instanceof StatusRuntimeException) { StatusRuntimeException statusRuntimeException = (StatusRuntimeException) e; Status status = StatusProto.fromThrowable(statusRuntimeException); DaprErrorDetails errorDetails = new DaprErrorDetails(status); return new DaprException( - statusRuntimeException.getStatus().getCode().toString(), - statusRuntimeException.getStatus().getDescription(), - exception, - errorDetails, - status.toByteArray()); - + statusRuntimeException.getStatus().getCode().toString(), + statusRuntimeException.getStatus().getDescription(), + exception, + errorDetails, + httpPayload != null ? httpPayload : status.toByteArray(), + httpStatusCode); } e = e.getCause(); @@ -329,19 +358,30 @@ public static RuntimeException propagate(Throwable exception) { return (IllegalArgumentException) exception; } + if (exception instanceof DaprHttpException) { + DaprHttpException daprHttpException = (DaprHttpException)exception; + return new DaprException( + io.grpc.Status.UNKNOWN.toString(), + null, + exception, + null, + daprHttpException.getPayload(), + daprHttpException.getStatusCode()); + } + return new DaprException(exception); } private static String buildErrorMessage(String errorCode, int httpStatusCode, String message) { String result = ((errorCode == null) || errorCode.isEmpty()) ? "UNKNOWN: " : errorCode + ": "; if ((message == null) || message.isEmpty()) { - if (httpStatusCode > 0) { + if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) { return result + "HTTP status code: " + httpStatusCode; } return result; } - if (httpStatusCode > 0) { + if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) { return result + message + " (HTTP status code: " + httpStatusCode + ")"; } return result + message; diff --git a/sdk/src/main/java/io/dapr/internal/exceptions/DaprHttpException.java b/sdk/src/main/java/io/dapr/internal/exceptions/DaprHttpException.java new file mode 100644 index 000000000..03b57f77c --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/exceptions/DaprHttpException.java @@ -0,0 +1,126 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.internal.exceptions; + +import io.grpc.Metadata; + +import java.util.concurrent.ExecutionException; + +/** + * Internal exception for propagating HTTP status code. + */ +public class DaprHttpException extends ExecutionException { + + /** + * This is the metadata from HTTP binding that we want to automatically parse and make it as part of the exception. + */ + private static final Metadata.Key GRPC_METADATA_KEY_HTTP_STATUS_CODE = + Metadata.Key.of("metadata.statuscode", Metadata.ASCII_STRING_MARSHALLER); + + private final int statusCode; + + private final byte[] payload; + + /** + * Instantiates a new DaprHttpException, without http body. + * @param statusCode HTTP status code. + * @param cause Exception thrown. + */ + private DaprHttpException(int statusCode, Throwable cause) { + super(cause); + this.statusCode = statusCode; + this.payload = null; + } + + /** + * Instantiates a new DaprHttpException with a given HTTP payload. + * @param statusCode HTTP status code. + * @param payload HTTP payload. + */ + public DaprHttpException(int statusCode, byte[] payload) { + super(); + this.statusCode = statusCode; + this.payload = payload; + } + + /** + * Creates an ExecutionException (can also be HttpException, if applicable). + * @param grpcMetadata Optional gRPC metadata. + * @param cause Exception triggered during execution. + * @return ExecutionException + */ + public static ExecutionException fromGrpcExecutionException(Metadata grpcMetadata, Throwable cause) { + int httpStatusCode = parseHttpStatusCode(grpcMetadata); + if (!isValidHttpStatusCode(httpStatusCode)) { + return new ExecutionException(cause); + } + + return new DaprHttpException(httpStatusCode, cause); + } + + public static boolean isValidHttpStatusCode(int statusCode) { + return statusCode >= 100 && statusCode <= 599; // Status codes range from 100 to 599 + } + + public static boolean isSuccessfulHttpStatusCode(int statusCode) { + return statusCode >= 200 && statusCode < 300; + } + + private static int parseHttpStatusCode(Metadata grpcMetadata) { + if (grpcMetadata == null) { + return 0; + } + + return parseHttpStatusCode(grpcMetadata.get(GRPC_METADATA_KEY_HTTP_STATUS_CODE)); + } + + /** + * Parses a given string value into an HTTP status code, 0 if invalid. + * @param value String value to be parsed. + * @return HTTP status code, 0 if not valid. + */ + public static int parseHttpStatusCode(String value) { + if ((value == null) || value.isEmpty()) { + return 0; + } + + try { + int httpStatusCode = Integer.parseInt(value); + if (!isValidHttpStatusCode(httpStatusCode)) { + return 0; + } + + return httpStatusCode; + } catch (NumberFormatException nfe) { + return 0; + } + } + + /** + * Returns the HTTP Status code for the exception. + * @return HTTP Status code for the exception, 0 if not applicable. + */ + public int getStatusCode() { + return this.statusCode; + } + + /** + * Returns the HTTP payload for the exception. + * @return HTTP payload, null if not present. + */ + public byte[] getPayload() { + return this.payload; + } + +} diff --git a/sdk/src/main/java/io/dapr/internal/grpc/DaprClientGrpcInterceptors.java b/sdk/src/main/java/io/dapr/internal/grpc/DaprClientGrpcInterceptors.java index 9a47d1aeb..5241e1d5f 100644 --- a/sdk/src/main/java/io/dapr/internal/grpc/DaprClientGrpcInterceptors.java +++ b/sdk/src/main/java/io/dapr/internal/grpc/DaprClientGrpcInterceptors.java @@ -15,12 +15,16 @@ import io.dapr.internal.grpc.interceptors.DaprApiTokenInterceptor; import io.dapr.internal.grpc.interceptors.DaprAppIdInterceptor; +import io.dapr.internal.grpc.interceptors.DaprMetadataInterceptor; import io.dapr.internal.grpc.interceptors.DaprTimeoutInterceptor; import io.dapr.internal.grpc.interceptors.DaprTracingInterceptor; import io.dapr.internal.resiliency.TimeoutPolicy; +import io.grpc.Metadata; import io.grpc.stub.AbstractStub; import reactor.util.context.ContextView; +import java.util.function.Consumer; + /** * Class to be used as part of your service's client stub interceptor. * Usage: myClientStub = DaprClientGrpcInterceptors.intercept(myClientStub); @@ -35,7 +39,7 @@ public class DaprClientGrpcInterceptors { * @return async client instance with interceptors */ public static > T intercept(final String appId, final T client) { - return intercept(appId, client, null, null); + return intercept(appId, client, null, null, null); } /** @@ -45,7 +49,7 @@ public static > T intercept(final String appId, final * @return async client instance with interceptors */ public static > T intercept(final T client) { - return intercept(null, client, null, null); + return intercept(null, client, null, null, null); } /** @@ -58,7 +62,7 @@ public static > T intercept(final T client) { */ public static > T intercept( final String appId, final T client, final TimeoutPolicy timeoutPolicy) { - return intercept(appId, client, timeoutPolicy, null); + return intercept(appId, client, timeoutPolicy, null, null); } /** @@ -69,7 +73,7 @@ public static > T intercept( * @return async client instance with interceptors */ public static > T intercept(final T client, final TimeoutPolicy timeoutPolicy) { - return intercept(null, client, timeoutPolicy, null); + return intercept(null, client, timeoutPolicy, null, null); } /** @@ -82,7 +86,7 @@ public static > T intercept(final T client, final Time */ public static > T intercept( final String appId, final T client, final ContextView context) { - return intercept(appId, client, null, context); + return intercept(appId, client, null, context, null); } /** @@ -93,7 +97,7 @@ public static > T intercept( * @return async client instance with interceptors */ public static > T intercept(final T client, final ContextView context) { - return intercept(null, client, null, context); + return intercept(null, client, null, context, null); } /** @@ -108,7 +112,24 @@ public static > T intercept( final T client, final TimeoutPolicy timeoutPolicy, final ContextView context) { - return intercept(null, client, timeoutPolicy, context); + return intercept(null, client, timeoutPolicy, context, null); + } + + /** + * Adds all Dapr interceptors to a gRPC async stub. + * @param client gRPC client + * @param timeoutPolicy timeout policy for gRPC call + * @param context Reactor context for tracing + * @param metadataConsumer Consumer of the gRPC metadata + * @param async client type + * @return async client instance with interceptors + */ + public static > T intercept( + final T client, + final TimeoutPolicy timeoutPolicy, + final ContextView context, + final Consumer metadataConsumer) { + return intercept(null, client, timeoutPolicy, context, metadataConsumer); } /** @@ -117,6 +138,7 @@ public static > T intercept( * @param client gRPC client * @param timeoutPolicy timeout policy for gRPC call * @param context Reactor context for tracing + * @param metadataConsumer Consumer of the gRPC metadata * @param async client type * @return async client instance with interceptors */ @@ -124,7 +146,8 @@ public static > T intercept( final String appId, final T client, final TimeoutPolicy timeoutPolicy, - final ContextView context) { + final ContextView context, + final Consumer metadataConsumer) { if (client == null) { throw new IllegalArgumentException("client cannot be null"); } @@ -133,7 +156,8 @@ public static > T intercept( new DaprAppIdInterceptor(appId), new DaprApiTokenInterceptor(), new DaprTimeoutInterceptor(timeoutPolicy), - new DaprTracingInterceptor(context)); + new DaprTracingInterceptor(context), + new DaprMetadataInterceptor(metadataConsumer)); } } diff --git a/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprMetadataInterceptor.java b/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprMetadataInterceptor.java new file mode 100644 index 000000000..44b438a01 --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprMetadataInterceptor.java @@ -0,0 +1,68 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.internal.grpc.interceptors; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.ForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; + +import java.util.function.Consumer; + +/** + * Consumes gRPC metadata. + */ +public class DaprMetadataInterceptor implements ClientInterceptor { + + private final Consumer metadataConsumer; + + /** + * Creates an instance of the consumer for gRPC metadata. + * @param metadataConsumer gRPC metadata consumer + */ + public DaprMetadataInterceptor(Consumer metadataConsumer) { + this.metadataConsumer = metadataConsumer; + } + + @Override + public ClientCall interceptCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions, + Channel channel) { + ClientCall clientCall = channel.newCall(methodDescriptor, callOptions); + return new ForwardingClientCall.SimpleForwardingClientCall<>(clientCall) { + @Override + public void start(final Listener responseListener, final Metadata metadata) { + final ClientCall.Listener headerListener = + new ForwardingClientCallListener.SimpleForwardingClientCallListener<>(responseListener) { + @Override + public void onHeaders(Metadata headers) { + responseListener.onHeaders(headers); + if (metadataConsumer != null) { + metadataConsumer.accept(headers); + } + } + }; + super.start(headerListener, metadata); + } + }; + } + + + +}