From 69d7c4c7f32777051fcf2591de5ff942466f14dd Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Thu, 29 Feb 2024 22:05:41 -0800 Subject: [PATCH] Handle HTTP binding error. Signed-off-by: Artur Souza --- .github/workflows/build.yml | 2 +- .github/workflows/validate.yml | 2 +- .../io/dapr/actors/client/DaprGrpcClient.java | 3 +- sdk-tests/components/http_binding.yaml | 27 ++++ .../io/dapr/it/binding/http/BindingIT.java | 82 ++++++++++-- .../java/io/dapr/client/DaprClientGrpc.java | 90 +++++++++++-- .../main/java/io/dapr/client/DaprHttp.java | 27 +++- .../io/dapr/exceptions/DaprException.java | 63 +++++++-- .../exceptions/DaprHttpException.java | 126 ++++++++++++++++++ .../dapr/internal/opencensus/GrpcWrapper.java | 81 ++++------- 10 files changed, 410 insertions(+), 93 deletions(-) create mode 100644 sdk-tests/components/http_binding.yaml create mode 100644 sdk/src/main/java/io/dapr/internal/exceptions/DaprHttpException.java diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b3397cdf7..58b1fab0f 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -44,7 +44,7 @@ jobs: GOPROXY: https://proxy.golang.org JDK_VER: ${{ matrix.java }} DAPR_CLI_VER: 1.13.0-rc.1 - DAPR_RUNTIME_VER: 1.13.0-rc.2 + DAPR_RUNTIME_VER: 1.13.0-rc.10 DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.13.0-rc.1/install/install.sh DAPR_CLI_REF: DAPR_REF: diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml index 5ddf904c0..dba1355a3 100644 --- a/.github/workflows/validate.yml +++ b/.github/workflows/validate.yml @@ -38,7 +38,7 @@ jobs: GOPROXY: https://proxy.golang.org JDK_VER: ${{ matrix.java }} DAPR_CLI_VER: 1.13.0-rc.1 - DAPR_RUNTIME_VER: 1.13.0-rc.5 + DAPR_RUNTIME_VER: 1.13.0-rc.10 DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.13.0-rc.1/install/install.sh DAPR_CLI_REF: DAPR_REF: diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java index efe37b0be..00a9fdd13 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java @@ -14,6 +14,7 @@ package io.dapr.actors.client; import com.google.protobuf.ByteString; +import io.dapr.client.DaprClientGrpc; import io.dapr.client.resiliency.ResiliencyOptions; import io.dapr.config.Properties; import io.dapr.exceptions.DaprException; @@ -128,7 +129,7 @@ public void start(final Listener responseListener, final Metadata metadat * @return Client after adding interceptors. */ private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) { - return GrpcWrapper.intercept(context, client); + return DaprClientGrpc.intercept(context, client, null); } private Mono createMono(Consumer> consumer) { diff --git a/sdk-tests/components/http_binding.yaml b/sdk-tests/components/http_binding.yaml new file mode 100644 index 000000000..5a3ca0a1d --- /dev/null +++ b/sdk-tests/components/http_binding.yaml @@ -0,0 +1,27 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: github-http-binding-404 +spec: + type: bindings.http + version: v1 + metadata: + - name: url + value: https://api.github.com/unknown_path +scopes: + - bindingit-httpoutputbinding-exception +--- +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: github-http-binding-404-success +spec: + type: bindings.http + version: v1 + metadata: + - name: url + value: https://api.github.com/unknown_path + - name: errorIfNot2XX + value: "false" +scopes: + - bindingit-httpoutputbinding-ignore-error \ No newline at end of file diff --git a/sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java b/sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java index 8d66c26aa..50709cbff 100644 --- a/sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 The Dapr Authors + * Copyright 2024 The Dapr Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -17,9 +17,9 @@ import io.dapr.client.DaprClient; import io.dapr.client.DaprClientBuilder; import io.dapr.client.domain.HttpExtension; +import io.dapr.exceptions.DaprException; import io.dapr.it.BaseIT; import io.dapr.it.DaprRun; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -28,6 +28,7 @@ import static io.dapr.it.Retry.callWithRetry; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; /** @@ -35,21 +36,73 @@ */ public class BindingIT extends BaseIT { - private static final String BINDING_NAME = "sample123"; + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void httpOutputBindingError(boolean useGrpc) throws Exception { + DaprRun daprRun = startDaprApp( + this.getClass().getSimpleName() + "-httpoutputbinding-exception", + 60000); + // At this point, it is guaranteed that the service above is running and all ports being listened to. + if (useGrpc) { + daprRun.switchToGRPC(); + } else { + daprRun.switchToHTTP(); + } - private static final String BINDING_OPERATION = "create"; + try(DaprClient client = new DaprClientBuilder().build()) { + // Validate error message + callWithRetry(() -> { + System.out.println("Checking exception handling for output binding ..."); + try { + client.invokeBinding("github-http-binding-404", "get", "").block(); + fail("Should throw an exception"); + } catch (DaprException e) { + assertEquals(404, e.getHttpStatusCode()); + // This HTTP binding did not set `errorIfNot2XX` to false in component metadata, so the error payload is not + // consistent between HTTP and gRPC. + assertTrue(new String(e.getPayload()).contains( + "error invoking output binding github-http-binding-404: received status code 404")); + } + }, 10000); + } + } - public static class MyClass { - public MyClass() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void httpOutputBindingErrorIgnoredByComponent(boolean useGrpc) throws Exception { + DaprRun daprRun = startDaprApp( + this.getClass().getSimpleName() + "-httpoutputbinding-ignore-error", + 60000); + // At this point, it is guaranteed that the service above is running and all ports being listened to. + if (useGrpc) { + daprRun.switchToGRPC(); + } else { + daprRun.switchToHTTP(); } - public String message; + try(DaprClient client = new DaprClientBuilder().build()) { + // Validate error message + callWithRetry(() -> { + System.out.println("Checking exception handling for output binding ..."); + try { + client.invokeBinding("github-http-binding-404-success", "get", "").block(); + fail("Should throw an exception"); + } catch (DaprException e) { + assertEquals(404, e.getHttpStatusCode()); + // The HTTP binding must set `errorIfNot2XX` to false in component metadata for the error payload to be + // consistent between HTTP and gRPC. + assertEquals( + "{\"message\":\"Not Found\",\"documentation_url\":\"https://docs.github.com/rest\"}", + new String(e.getPayload())); + } + }, 10000); + } } @ParameterizedTest @ValueSource(booleans = {true, false}) public void inputOutputBinding(boolean useGrpc) throws Exception { - System.out.println("Working Directory = " + System.getProperty("user.dir")); + final String bidingName = "sample123"; String serviceNameVariant = useGrpc ? "-grpc" : "-http"; DaprRun daprRun = startDaprApp( @@ -69,7 +122,7 @@ public void inputOutputBinding(boolean useGrpc) throws Exception { callWithRetry(() -> { System.out.println("Checking if input binding is up before publishing events ..."); client.invokeBinding( - BINDING_NAME, BINDING_OPERATION, "ping").block(); + bidingName, "create", "ping").block(); try { Thread.sleep(1000); @@ -88,14 +141,14 @@ public void inputOutputBinding(boolean useGrpc) throws Exception { System.out.println("sending first message"); client.invokeBinding( - BINDING_NAME, BINDING_OPERATION, myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block(); + bidingName, "create", myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block(); // This is an example of sending a plain string. The input binding will receive // cat final String m = "cat"; System.out.println("sending " + m); client.invokeBinding( - BINDING_NAME, BINDING_OPERATION, m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block(); + bidingName, "create", m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block(); // Metadata is not used by Kafka component, so it is not possible to validate. callWithRetry(() -> { @@ -127,4 +180,11 @@ public void inputOutputBinding(boolean useGrpc) throws Exception { }, 8000); } } + + public static class MyClass { + public MyClass() { + } + + public String message; + } } diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index eb9c045fc..8b7b36df8 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -50,7 +50,7 @@ import io.dapr.client.resiliency.ResiliencyOptions; import io.dapr.config.Properties; import io.dapr.exceptions.DaprException; -import io.dapr.internal.opencensus.GrpcWrapper; +import io.dapr.internal.exceptions.DaprHttpException; import io.dapr.internal.resiliency.RetryPolicy; import io.dapr.internal.resiliency.TimeoutPolicy; import io.dapr.serializer.DaprObjectSerializer; @@ -65,6 +65,7 @@ import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.ForwardingClientCall; +import io.grpc.ForwardingClientCallListener; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.stub.StreamObserver; @@ -81,10 +82,14 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.stream.Collectors; +import static io.dapr.internal.exceptions.DaprHttpException.isSuccessfulHttpStatusCode; +import static io.dapr.internal.exceptions.DaprHttpException.isValidHttpStatusCode; +import static io.dapr.internal.exceptions.DaprHttpException.parseHttpStatusCode; +import static io.dapr.internal.opencensus.GrpcWrapper.appendTracingToMetadata; + /** * An adapter for the GRPC Client. * @@ -351,6 +356,7 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef */ @Override public Mono invokeBinding(InvokeBindingRequest request, TypeRef type) { + Metadata responseMetadata = new Metadata(); try { final String name = request.getName(); final String operation = request.getOperation(); @@ -377,10 +383,19 @@ public Mono invokeBinding(InvokeBindingRequest request, TypeRef type) return Mono.deferContextual( context -> this.createMono( - it -> intercept(context, asyncStub).invokeBinding(envelope, it) + responseMetadata, + it -> intercept(context, asyncStub, m -> responseMetadata.merge(m)).invokeBinding(envelope, it) ) ).flatMap( it -> { + int httpStatusCode = + parseHttpStatusCode(it.getMetadataMap().getOrDefault("statusCode", "")); + if (isValidHttpStatusCode(httpStatusCode) && !isSuccessfulHttpStatusCode(httpStatusCode)) { + // Exception condition in a successful request. + // This is useful to send an exception due to an error from the HTTP binding component. + throw DaprException.propagate(new DaprHttpException(httpStatusCode, it.getData().toByteArray())); + } + try { return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type)); } catch (IOException e) { @@ -1155,21 +1170,74 @@ public void start(final Listener responseListener, final Metadata metadat * @param client GRPC client for Dapr. * @return Client after adding interceptors. */ - private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) { - return GrpcWrapper.intercept(context, client); + private static DaprGrpc.DaprStub intercept( + ContextView context, + DaprGrpc.DaprStub client) { + return intercept(context, client, null); + } + + /** + * Populates GRPC client with interceptors for telemetry - internal use only. + * + * @param context Reactor's context. + * @param client GRPC client for Dapr. + * @param metadataConsumer Handles metadata result. + * @return Client after adding interceptors. + */ + public static DaprGrpc.DaprStub intercept( + ContextView context, + DaprGrpc.DaprStub client, + Consumer metadataConsumer) { + ClientInterceptor interceptor = new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor methodDescriptor, + CallOptions options, + Channel channel) { + ClientCall clientCall = channel.newCall(methodDescriptor, options); + return new ForwardingClientCall.SimpleForwardingClientCall<>(clientCall) { + @Override + public void start(final Listener responseListener, final Metadata metadata) { + appendTracingToMetadata(context, metadata); + + final ClientCall.Listener headerListener = + new ForwardingClientCallListener.SimpleForwardingClientCallListener<>(responseListener) { + @Override + public void onHeaders(Metadata headers) { + responseListener.onHeaders(headers); + if (metadataConsumer != null) { + metadataConsumer.accept(headers); + } + } + }; + super.start(headerListener, metadata); + } + }; + } + }; + return client.withInterceptors(interceptor); } private Mono createMono(Consumer> consumer) { + return this.createMono(null, consumer); + } + + private Mono createMono(Metadata metadata, Consumer> consumer) { return retryPolicy.apply( - Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run())); + Mono.create(sink -> DaprException.wrap(() -> consumer.accept( + createStreamObserver(sink, metadata))).run())); } private Flux createFlux(Consumer> consumer) { + return this.createFlux(null, consumer); + } + + private Flux createFlux(Metadata metadata, Consumer> consumer) { return retryPolicy.apply( - Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run())); + Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink, metadata))).run())); } - private StreamObserver createStreamObserver(MonoSink sink) { + private StreamObserver createStreamObserver(MonoSink sink, Metadata grpcMetadata) { return new StreamObserver() { @Override public void onNext(T value) { @@ -1178,7 +1246,7 @@ public void onNext(T value) { @Override public void onError(Throwable t) { - sink.error(DaprException.propagate(new ExecutionException(t))); + sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t))); } @Override @@ -1188,7 +1256,7 @@ public void onCompleted() { }; } - private StreamObserver createStreamObserver(FluxSink sink) { + private StreamObserver createStreamObserver(FluxSink sink, final Metadata grpcMetadata) { return new StreamObserver() { @Override public void onNext(T value) { @@ -1197,7 +1265,7 @@ public void onNext(T value) { @Override public void onError(Throwable t) { - sink.error(DaprException.propagate(new ExecutionException(t))); + sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t))); } @Override diff --git a/sdk/src/main/java/io/dapr/client/DaprHttp.java b/sdk/src/main/java/io/dapr/client/DaprHttp.java index 05da6efd5..0d979b70d 100644 --- a/sdk/src/main/java/io/dapr/client/DaprHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprHttp.java @@ -18,6 +18,7 @@ import io.dapr.config.Properties; import io.dapr.exceptions.DaprError; import io.dapr.exceptions.DaprException; +import io.dapr.internal.exceptions.DaprHttpException; import io.dapr.utils.Version; import okhttp3.Call; import okhttp3.Callback; @@ -381,17 +382,19 @@ public void onFailure(Call call, IOException e) { @Override public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) throws IOException { - if (!response.isSuccessful()) { + int httpStatusCode = parseHttpStatusCode(response.header("Metadata.statuscode"), response.code()); + if (!DaprHttpException.isSuccessfulHttpStatusCode(httpStatusCode)) { try { byte[] payload = getBodyBytesOrEmptyArray(response); DaprError error = parseDaprError(payload); + if (error != null) { - future.completeExceptionally(new DaprException(error, payload, response.code())); + future.completeExceptionally(new DaprException(error, payload, httpStatusCode)); return; } future.completeExceptionally( - new DaprException("UNKNOWN", "", payload, response.code())); + new DaprException("UNKNOWN", "", payload, httpStatusCode)); return; } catch (DaprException e) { future.completeExceptionally(e); @@ -404,8 +407,24 @@ public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) t response.headers().forEach(pair -> { mapHeaders.put(pair.getFirst(), pair.getSecond()); }); - future.complete(new Response(result, mapHeaders, response.code())); + future.complete(new Response(result, mapHeaders, httpStatusCode)); } } + private static int parseHttpStatusCode(String headerValue, int defaultStatusCode) { + if ((headerValue == null) || headerValue.isEmpty()) { + return defaultStatusCode; + } + + // Metadata used to override status code with code received from HTTP binding. + try { + int httpStatusCode = Integer.parseInt(headerValue); + if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) { + return httpStatusCode; + } + return defaultStatusCode; + } catch (NumberFormatException nfe) { + return defaultStatusCode; + } + } } diff --git a/sdk/src/main/java/io/dapr/exceptions/DaprException.java b/sdk/src/main/java/io/dapr/exceptions/DaprException.java index 0801069e8..6d1ccc799 100644 --- a/sdk/src/main/java/io/dapr/exceptions/DaprException.java +++ b/sdk/src/main/java/io/dapr/exceptions/DaprException.java @@ -14,6 +14,8 @@ package io.dapr.exceptions; import com.google.rpc.Status; +import io.dapr.config.Properties; +import io.dapr.internal.exceptions.DaprHttpException; import io.grpc.StatusRuntimeException; import io.grpc.protobuf.StatusProto; import reactor.core.Exceptions; @@ -170,11 +172,33 @@ public DaprException(String errorCode, String message, Throwable cause) { */ public DaprException( String errorCode, String message, Throwable cause, DaprErrorDetails errorDetails, byte[] payload) { - super(buildErrorMessage(errorCode, 0, message), cause); - this.httpStatusCode = 0; + this(errorCode, message, cause, errorDetails, payload, 0); + } + + /** + * New exception from a server-side generated error code and message. + * @param errorCode Client-side error code. + * @param message Client-side error message. + * @param cause the cause (which is saved for later retrieval by the + * {@link #getCause()} method). (A {@code null} value is + * permitted, and indicates that the cause is nonexistent or + * unknown.) + * @param errorDetails the status details for the error. + * @param payload Raw error payload. + * @param httpStatusCode Optional HTTP Status code for the error, 0 if not applicable. + */ + private DaprException( + String errorCode, + String message, + Throwable cause, + DaprErrorDetails errorDetails, + byte[] payload, + int httpStatusCode) { + super(buildErrorMessage(errorCode, httpStatusCode, message), cause); this.errorCode = errorCode; this.errorDetails = errorDetails == null ? DaprErrorDetails.EMPTY_INSTANCE : errorDetails; this.payload = payload; + this.httpStatusCode = httpStatusCode; } /** @@ -305,21 +329,27 @@ public static RuntimeException propagate(Throwable exception) { return (DaprException) exception; } + int httpStatusCode = 0; + byte[] httpPayload = null; Throwable e = exception; while (e != null) { - if (e instanceof StatusRuntimeException) { + if (e instanceof DaprHttpException) { + DaprHttpException daprHttpException = (DaprHttpException) e; + httpStatusCode = daprHttpException.getStatusCode(); + httpPayload = daprHttpException.getPayload(); + } else if (e instanceof StatusRuntimeException) { StatusRuntimeException statusRuntimeException = (StatusRuntimeException) e; Status status = StatusProto.fromThrowable(statusRuntimeException); DaprErrorDetails errorDetails = new DaprErrorDetails(status); return new DaprException( - statusRuntimeException.getStatus().getCode().toString(), - statusRuntimeException.getStatus().getDescription(), - exception, - errorDetails, - status.toByteArray()); - + statusRuntimeException.getStatus().getCode().toString(), + statusRuntimeException.getStatus().getDescription(), + exception, + errorDetails, + httpPayload != null ? httpPayload : status.getMessage().getBytes(Properties.STRING_CHARSET.get()), + httpStatusCode); } e = e.getCause(); @@ -329,19 +359,30 @@ public static RuntimeException propagate(Throwable exception) { return (IllegalArgumentException) exception; } + if (exception instanceof DaprHttpException) { + DaprHttpException daprHttpException = (DaprHttpException)exception; + return new DaprException( + io.grpc.Status.UNKNOWN.toString(), + null, + exception, + null, + daprHttpException.getPayload(), + daprHttpException.getStatusCode()); + } + return new DaprException(exception); } private static String buildErrorMessage(String errorCode, int httpStatusCode, String message) { String result = ((errorCode == null) || errorCode.isEmpty()) ? "UNKNOWN: " : errorCode + ": "; if ((message == null) || message.isEmpty()) { - if (httpStatusCode > 0) { + if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) { return result + "HTTP status code: " + httpStatusCode; } return result; } - if (httpStatusCode > 0) { + if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) { return result + message + " (HTTP status code: " + httpStatusCode + ")"; } return result + message; diff --git a/sdk/src/main/java/io/dapr/internal/exceptions/DaprHttpException.java b/sdk/src/main/java/io/dapr/internal/exceptions/DaprHttpException.java new file mode 100644 index 000000000..03b57f77c --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/exceptions/DaprHttpException.java @@ -0,0 +1,126 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.internal.exceptions; + +import io.grpc.Metadata; + +import java.util.concurrent.ExecutionException; + +/** + * Internal exception for propagating HTTP status code. + */ +public class DaprHttpException extends ExecutionException { + + /** + * This is the metadata from HTTP binding that we want to automatically parse and make it as part of the exception. + */ + private static final Metadata.Key GRPC_METADATA_KEY_HTTP_STATUS_CODE = + Metadata.Key.of("metadata.statuscode", Metadata.ASCII_STRING_MARSHALLER); + + private final int statusCode; + + private final byte[] payload; + + /** + * Instantiates a new DaprHttpException, without http body. + * @param statusCode HTTP status code. + * @param cause Exception thrown. + */ + private DaprHttpException(int statusCode, Throwable cause) { + super(cause); + this.statusCode = statusCode; + this.payload = null; + } + + /** + * Instantiates a new DaprHttpException with a given HTTP payload. + * @param statusCode HTTP status code. + * @param payload HTTP payload. + */ + public DaprHttpException(int statusCode, byte[] payload) { + super(); + this.statusCode = statusCode; + this.payload = payload; + } + + /** + * Creates an ExecutionException (can also be HttpException, if applicable). + * @param grpcMetadata Optional gRPC metadata. + * @param cause Exception triggered during execution. + * @return ExecutionException + */ + public static ExecutionException fromGrpcExecutionException(Metadata grpcMetadata, Throwable cause) { + int httpStatusCode = parseHttpStatusCode(grpcMetadata); + if (!isValidHttpStatusCode(httpStatusCode)) { + return new ExecutionException(cause); + } + + return new DaprHttpException(httpStatusCode, cause); + } + + public static boolean isValidHttpStatusCode(int statusCode) { + return statusCode >= 100 && statusCode <= 599; // Status codes range from 100 to 599 + } + + public static boolean isSuccessfulHttpStatusCode(int statusCode) { + return statusCode >= 200 && statusCode < 300; + } + + private static int parseHttpStatusCode(Metadata grpcMetadata) { + if (grpcMetadata == null) { + return 0; + } + + return parseHttpStatusCode(grpcMetadata.get(GRPC_METADATA_KEY_HTTP_STATUS_CODE)); + } + + /** + * Parses a given string value into an HTTP status code, 0 if invalid. + * @param value String value to be parsed. + * @return HTTP status code, 0 if not valid. + */ + public static int parseHttpStatusCode(String value) { + if ((value == null) || value.isEmpty()) { + return 0; + } + + try { + int httpStatusCode = Integer.parseInt(value); + if (!isValidHttpStatusCode(httpStatusCode)) { + return 0; + } + + return httpStatusCode; + } catch (NumberFormatException nfe) { + return 0; + } + } + + /** + * Returns the HTTP Status code for the exception. + * @return HTTP Status code for the exception, 0 if not applicable. + */ + public int getStatusCode() { + return this.statusCode; + } + + /** + * Returns the HTTP payload for the exception. + * @return HTTP payload, null if not present. + */ + public byte[] getPayload() { + return this.payload; + } + +} diff --git a/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java b/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java index 61db83627..f25be1f96 100644 --- a/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java +++ b/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java @@ -14,14 +14,7 @@ package io.dapr.internal.opencensus; import io.dapr.config.Property; -import io.dapr.v1.DaprGrpc; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; -import io.grpc.ForwardingClientCall; import io.grpc.Metadata; -import io.grpc.MethodDescriptor; import reactor.util.context.Context; import reactor.util.context.ContextView; @@ -55,55 +48,37 @@ private GrpcWrapper() { } /** - * Populates GRPC client with interceptors. + * Populates metadata with tracing headers. * - * @param context Reactor's context. - * @param client GRPC client for Dapr. - * @return Client after adding interceptors. + * @param context Context containing tracing information. + * @param metadata Metadata where tracing values will be added to. */ - public static DaprGrpc.DaprStub intercept(final ContextView context, DaprGrpc.DaprStub client) { - ClientInterceptor interceptor = new ClientInterceptor() { - @Override - public ClientCall interceptCall( - MethodDescriptor methodDescriptor, - CallOptions callOptions, - Channel channel) { - ClientCall clientCall = channel.newCall(methodDescriptor, callOptions); - return new ForwardingClientCall.SimpleForwardingClientCall(clientCall) { - @Override - public void start(final Listener responseListener, final Metadata metadata) { - Map map = (context == null ? Context.empty() : context) - .stream() - .filter(e -> (e.getKey() != null) && (e.getValue() != null)) - .collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue())); - if (map.containsKey(GRPC_TRACE_BIN_KEY.name())) { - byte[] value = (byte[]) map.get(GRPC_TRACE_BIN_KEY.name()); - metadata.put(GRPC_TRACE_BIN_KEY, value); - } - if (map.containsKey(TRACEPARENT_KEY.name())) { - String value = map.get(TRACEPARENT_KEY.name()).toString(); - metadata.put(TRACEPARENT_KEY, value); - } - if (map.containsKey(TRACESTATE_KEY.name())) { - String value = map.get(TRACESTATE_KEY.name()).toString(); - metadata.put(TRACESTATE_KEY, value); - } - - // Dapr only supports "grpc-trace-bin" for GRPC and OpenTelemetry SDK does not support that yet: - // https://github.com/open-telemetry/opentelemetry-specification/issues/639 - // This should be the only use of OpenCensus SDK: populate "grpc-trace-bin". - SpanContext opencensusSpanContext = extractOpenCensusSpanContext(metadata); - if (opencensusSpanContext != null) { - byte[] grpcTraceBin = OPENCENSUS_BINARY_FORMAT.toByteArray(opencensusSpanContext); - metadata.put(GRPC_TRACE_BIN_KEY, grpcTraceBin); - } + public static void appendTracingToMetadata(ContextView context, Metadata metadata) { + Map map = (context == null ? Context.empty() : context) + .stream() + .filter(e -> (e.getKey() != null) && (e.getValue() != null)) + .collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue())); + if (map.containsKey(GRPC_TRACE_BIN_KEY.name())) { + byte[] value = (byte[]) map.get(GRPC_TRACE_BIN_KEY.name()); + metadata.put(GRPC_TRACE_BIN_KEY, value); + } + if (map.containsKey(TRACEPARENT_KEY.name())) { + String value = map.get(TRACEPARENT_KEY.name()).toString(); + metadata.put(TRACEPARENT_KEY, value); + } + if (map.containsKey(TRACESTATE_KEY.name())) { + String value = map.get(TRACESTATE_KEY.name()).toString(); + metadata.put(TRACESTATE_KEY, value); + } - super.start(responseListener, metadata); - } - }; - } - }; - return client.withInterceptors(interceptor); + // Dapr only supports "grpc-trace-bin" for GRPC and OpenTelemetry SDK does not support that yet: + // https://github.com/open-telemetry/opentelemetry-specification/issues/639 + // This should be the only use of OpenCensus SDK: populate "grpc-trace-bin". + SpanContext opencensusSpanContext = extractOpenCensusSpanContext(metadata); + if (opencensusSpanContext != null) { + byte[] grpcTraceBin = OPENCENSUS_BINARY_FORMAT.toByteArray(opencensusSpanContext); + metadata.put(GRPC_TRACE_BIN_KEY, grpcTraceBin); + } } private static SpanContext extractOpenCensusSpanContext(Metadata metadata) {