Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle HTTP binding error. #1024

Merged
merged 1 commit into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
GOPROXY: https://proxy.golang.org
JDK_VER: ${{ matrix.java }}
DAPR_CLI_VER: 1.13.0-rc.1
DAPR_RUNTIME_VER: 1.13.0-rc.2
DAPR_RUNTIME_VER: 1.13.0-rc.10
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.13.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
GOPROXY: https://proxy.golang.org
JDK_VER: ${{ matrix.java }}
DAPR_CLI_VER: 1.13.0-rc.1
DAPR_RUNTIME_VER: 1.13.0-rc.5
DAPR_RUNTIME_VER: 1.13.0-rc.10
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.13.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.dapr.actors.client;

import com.google.protobuf.ByteString;
import io.dapr.client.DaprClientGrpc;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
Expand Down Expand Up @@ -128,7 +129,7 @@ public void start(final Listener<RespT> responseListener, final Metadata metadat
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
return GrpcWrapper.intercept(context, client);
return DaprClientGrpc.intercept(context, client, null);
}

private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
Expand Down
27 changes: 27 additions & 0 deletions sdk-tests/components/http_binding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: github-http-binding-404
spec:
type: bindings.http
version: v1
metadata:
- name: url
value: https://api.github.com/unknown_path
scopes:
- bindingit-httpoutputbinding-exception
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: github-http-binding-404-success
spec:
type: bindings.http
version: v1
metadata:
- name: url
value: https://api.github.com/unknown_path
- name: errorIfNot2XX
value: "false"
scopes:
- bindingit-httpoutputbinding-ignore-error
82 changes: 71 additions & 11 deletions sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 The Dapr Authors
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -17,9 +17,9 @@
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.HttpExtension;
import io.dapr.exceptions.DaprException;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

Expand All @@ -28,28 +28,81 @@

import static io.dapr.it.Retry.callWithRetry;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

/**
* Service for input and output binding example.
*/
public class BindingIT extends BaseIT {

private static final String BINDING_NAME = "sample123";
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void httpOutputBindingError(boolean useGrpc) throws Exception {
DaprRun daprRun = startDaprApp(
this.getClass().getSimpleName() + "-httpoutputbinding-exception",
60000);
// At this point, it is guaranteed that the service above is running and all ports being listened to.
if (useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}

private static final String BINDING_OPERATION = "create";
try(DaprClient client = new DaprClientBuilder().build()) {
// Validate error message
callWithRetry(() -> {
System.out.println("Checking exception handling for output binding ...");
try {
client.invokeBinding("github-http-binding-404", "get", "").block();
fail("Should throw an exception");
} catch (DaprException e) {
assertEquals(404, e.getHttpStatusCode());
// This HTTP binding did not set `errorIfNot2XX` to false in component metadata, so the error payload is not
// consistent between HTTP and gRPC.
assertTrue(new String(e.getPayload()).contains(
"error invoking output binding github-http-binding-404: received status code 404"));
}
}, 10000);
}
}

public static class MyClass {
public MyClass() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void httpOutputBindingErrorIgnoredByComponent(boolean useGrpc) throws Exception {
DaprRun daprRun = startDaprApp(
this.getClass().getSimpleName() + "-httpoutputbinding-ignore-error",
60000);
// At this point, it is guaranteed that the service above is running and all ports being listened to.
if (useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}

public String message;
try(DaprClient client = new DaprClientBuilder().build()) {
// Validate error message
callWithRetry(() -> {
System.out.println("Checking exception handling for output binding ...");
try {
client.invokeBinding("github-http-binding-404-success", "get", "").block();
fail("Should throw an exception");
} catch (DaprException e) {
assertEquals(404, e.getHttpStatusCode());
// The HTTP binding must set `errorIfNot2XX` to false in component metadata for the error payload to be
// consistent between HTTP and gRPC.
assertEquals(
"{\"message\":\"Not Found\",\"documentation_url\":\"https://docs.github.com/rest\"}",
new String(e.getPayload()));
}
}, 10000);
}
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void inputOutputBinding(boolean useGrpc) throws Exception {
System.out.println("Working Directory = " + System.getProperty("user.dir"));
final String bidingName = "sample123";
String serviceNameVariant = useGrpc ? "-grpc" : "-http";

DaprRun daprRun = startDaprApp(
Expand All @@ -69,7 +122,7 @@ public void inputOutputBinding(boolean useGrpc) throws Exception {
callWithRetry(() -> {
System.out.println("Checking if input binding is up before publishing events ...");
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, "ping").block();
bidingName, "create", "ping").block();

try {
Thread.sleep(1000);
Expand All @@ -88,14 +141,14 @@ public void inputOutputBinding(boolean useGrpc) throws Exception {

System.out.println("sending first message");
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
bidingName, "create", myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();

// This is an example of sending a plain string. The input binding will receive
// cat
final String m = "cat";
System.out.println("sending " + m);
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
bidingName, "create", m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();

// Metadata is not used by Kafka component, so it is not possible to validate.
callWithRetry(() -> {
Expand Down Expand Up @@ -127,4 +180,11 @@ public void inputOutputBinding(boolean useGrpc) throws Exception {
}, 8000);
}
}

public static class MyClass {
public MyClass() {
}

public String message;
}
}
90 changes: 79 additions & 11 deletions sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.opencensus.GrpcWrapper;
import io.dapr.internal.exceptions.DaprHttpException;
import io.dapr.internal.resiliency.RetryPolicy;
import io.dapr.internal.resiliency.TimeoutPolicy;
import io.dapr.serializer.DaprObjectSerializer;
Expand All @@ -65,6 +65,7 @@
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.stub.StreamObserver;
Expand All @@ -81,10 +82,14 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static io.dapr.internal.exceptions.DaprHttpException.isSuccessfulHttpStatusCode;
import static io.dapr.internal.exceptions.DaprHttpException.isValidHttpStatusCode;
import static io.dapr.internal.exceptions.DaprHttpException.parseHttpStatusCode;
import static io.dapr.internal.opencensus.GrpcWrapper.appendTracingToMetadata;

/**
* An adapter for the GRPC Client.
*
Expand Down Expand Up @@ -351,6 +356,7 @@
*/
@Override
public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type) {
Metadata responseMetadata = new Metadata();
try {
final String name = request.getName();
final String operation = request.getOperation();
Expand All @@ -377,10 +383,19 @@

return Mono.deferContextual(
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
responseMetadata,
it -> intercept(context, asyncStub, m -> responseMetadata.merge(m)).invokeBinding(envelope, it)
)
).flatMap(
it -> {
int httpStatusCode =
parseHttpStatusCode(it.getMetadataMap().getOrDefault("statusCode", ""));
if (isValidHttpStatusCode(httpStatusCode) && !isSuccessfulHttpStatusCode(httpStatusCode)) {
// Exception condition in a successful request.
// This is useful to send an exception due to an error from the HTTP binding component.
throw DaprException.propagate(new DaprHttpException(httpStatusCode, it.getData().toByteArray()));

Check warning on line 396 in sdk/src/main/java/io/dapr/client/DaprClientGrpc.java

View check run for this annotation

Codecov / codecov/patch

sdk/src/main/java/io/dapr/client/DaprClientGrpc.java#L396

Added line #L396 was not covered by tests
}

try {
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type));
} catch (IOException e) {
Expand Down Expand Up @@ -1155,21 +1170,74 @@
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
return GrpcWrapper.intercept(context, client);
private static DaprGrpc.DaprStub intercept(
ContextView context,
DaprGrpc.DaprStub client) {
return intercept(context, client, null);
}

/**
* Populates GRPC client with interceptors for telemetry - internal use only.
*
* @param context Reactor's context.
* @param client GRPC client for Dapr.
* @param metadataConsumer Handles metadata result.
* @return Client after adding interceptors.
*/
public static DaprGrpc.DaprStub intercept(
ContextView context,
DaprGrpc.DaprStub client,
Consumer<Metadata> metadataConsumer) {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions options,
Channel channel) {
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, options);
return new ForwardingClientCall.SimpleForwardingClientCall<>(clientCall) {
@Override
public void start(final Listener<RespT> responseListener, final Metadata metadata) {
appendTracingToMetadata(context, metadata);

final ClientCall.Listener<RespT> headerListener =
new ForwardingClientCallListener.SimpleForwardingClientCallListener<>(responseListener) {
@Override
public void onHeaders(Metadata headers) {
responseListener.onHeaders(headers);
if (metadataConsumer != null) {
metadataConsumer.accept(headers);

Check warning on line 1209 in sdk/src/main/java/io/dapr/client/DaprClientGrpc.java

View check run for this annotation

Codecov / codecov/patch

sdk/src/main/java/io/dapr/client/DaprClientGrpc.java#L1209

Added line #L1209 was not covered by tests
}
}
};
super.start(headerListener, metadata);
}
};
}
};
return client.withInterceptors(interceptor);
}

private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
return this.createMono(null, consumer);
}

private <T> Mono<T> createMono(Metadata metadata, Consumer<StreamObserver<T>> consumer) {
return retryPolicy.apply(
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(
createStreamObserver(sink, metadata))).run()));
}

private <T> Flux<T> createFlux(Consumer<StreamObserver<T>> consumer) {
return this.createFlux(null, consumer);
}

private <T> Flux<T> createFlux(Metadata metadata, Consumer<StreamObserver<T>> consumer) {
return retryPolicy.apply(
Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink, metadata))).run()));
}

private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink, Metadata grpcMetadata) {
return new StreamObserver<T>() {
@Override
public void onNext(T value) {
Expand All @@ -1178,7 +1246,7 @@

@Override
public void onError(Throwable t) {
sink.error(DaprException.propagate(new ExecutionException(t)));
sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t)));
}

@Override
Expand All @@ -1188,7 +1256,7 @@
};
}

private <T> StreamObserver<T> createStreamObserver(FluxSink<T> sink) {
private <T> StreamObserver<T> createStreamObserver(FluxSink<T> sink, final Metadata grpcMetadata) {
return new StreamObserver<T>() {
@Override
public void onNext(T value) {
Expand All @@ -1197,7 +1265,7 @@

@Override
public void onError(Throwable t) {
sink.error(DaprException.propagate(new ExecutionException(t)));
sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t)));
}

@Override
Expand Down
Loading
Loading