Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick: handle HTTP binding error #1130

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions sdk-tests/components/http_binding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: github-http-binding-404
spec:
type: bindings.http
version: v1
metadata:
- name: url
value: https://api.github.com/unknown_path
scopes:
- bindingit-httpoutputbinding-exception
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: github-http-binding-404-success
spec:
type: bindings.http
version: v1
metadata:
- name: url
value: https://api.github.com/unknown_path
- name: errorIfNot2XX
value: "false"
scopes:
- bindingit-httpoutputbinding-ignore-error
4 changes: 2 additions & 2 deletions sdk-tests/deploy/local-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ services:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
- 2181:2181

kafka:
image: confluentinc/cp-kafka:7.4.4
depends_on:
- zookeeper
ports:
- 9092:9092
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
Expand Down
68 changes: 57 additions & 11 deletions sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 The Dapr Authors
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -17,6 +17,7 @@
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.HttpExtension;
import io.dapr.exceptions.DaprException;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import org.junit.jupiter.api.Test;
Expand All @@ -26,22 +27,58 @@

import static io.dapr.it.Retry.callWithRetry;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

/**
* Service for input and output binding example.
*/
public class BindingIT extends BaseIT {

private static final String BINDING_NAME = "sample123";

private static final String BINDING_OPERATION = "create";

public static class MyClass {
public MyClass() {
@Test
public void httpOutputBindingError() throws Exception {
startDaprApp(
this.getClass().getSimpleName() + "-httpoutputbinding-exception",
60000);
try(DaprClient client = new DaprClientBuilder().build()) {
// Validate error message
callWithRetry(() -> {
System.out.println("Checking exception handling for output binding ...");
try {
client.invokeBinding("github-http-binding-404", "get", "").block();
fail("Should throw an exception");
} catch (DaprException e) {
assertEquals(404, e.getHttpStatusCode());
// This HTTP binding did not set `errorIfNot2XX` to false in component metadata, so the error payload is not
// consistent between HTTP and gRPC.
assertTrue(new String(e.getPayload()).contains(
"error invoking output binding github-http-binding-404: received status code 404"));
}
}, 10000);
}
}

public String message;
@Test
public void httpOutputBindingErrorIgnoredByComponent() throws Exception {
startDaprApp(
this.getClass().getSimpleName() + "-httpoutputbinding-ignore-error",
60000);
try(DaprClient client = new DaprClientBuilder().build()) {
// Validate error message
callWithRetry(() -> {
System.out.println("Checking exception handling for output binding ...");
try {
client.invokeBinding("github-http-binding-404-success", "get", "").block();
fail("Should throw an exception");
} catch (DaprException e) {
assertEquals(404, e.getHttpStatusCode());
// The HTTP binding must set `errorIfNot2XX` to false in component metadata for the error payload to be
// consistent between HTTP and gRPC.
assertTrue(new String(e.getPayload()).contains("\"message\":\"Not Found\""));
assertTrue(new String(e.getPayload()).contains("\"documentation_url\":\"https://docs.github.com/rest\""));
}
}, 10000);
}
}

@Test
Expand All @@ -53,11 +90,13 @@ public void inputOutputBinding() throws Exception {
true,
60000);

var bidingName = "sample123";

try(DaprClient client = new DaprClientBuilder().build()) {
callWithRetry(() -> {
System.out.println("Checking if input binding is up before publishing events ...");
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, "ping").block();
bidingName, "create", "ping").block();

try {
Thread.sleep(1000);
Expand All @@ -76,14 +115,14 @@ public void inputOutputBinding() throws Exception {

System.out.println("sending first message");
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
bidingName, "create", myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();

// This is an example of sending a plain string. The input binding will receive
// cat
final String m = "cat";
System.out.println("sending " + m);
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
bidingName, "create", m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();

// Metadata is not used by Kafka component, so it is not possible to validate.
callWithRetry(() -> {
Expand Down Expand Up @@ -115,4 +154,11 @@ public void inputOutputBinding() throws Exception {
}, 8000);
}
}

public static class MyClass {
public MyClass() {
}

public String message;
}
}
52 changes: 45 additions & 7 deletions sdk/src/main/java/io/dapr/client/DaprClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.exceptions.DaprHttpException;
import io.dapr.internal.grpc.DaprClientGrpcInterceptors;
import io.dapr.internal.resiliency.RetryPolicy;
import io.dapr.internal.resiliency.TimeoutPolicy;
Expand All @@ -76,6 +77,7 @@
import io.dapr.v1.DaprProtos.RegisteredComponents;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.Metadata;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.StreamObserver;
import reactor.core.publisher.Flux;
Expand All @@ -99,6 +101,10 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.dapr.internal.exceptions.DaprHttpException.isSuccessfulHttpStatusCode;
import static io.dapr.internal.exceptions.DaprHttpException.isValidHttpStatusCode;
import static io.dapr.internal.exceptions.DaprHttpException.parseHttpStatusCode;

/**
* Implementation of the Dapr client combining gRPC and HTTP (when applicable).
*
Expand Down Expand Up @@ -486,12 +492,22 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
}
DaprProtos.InvokeBindingRequest envelope = builder.build();

Metadata responseMetadata = new Metadata();
return Mono.deferContextual(
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
responseMetadata,
it -> intercept(context, asyncStub, m -> responseMetadata.merge(m)).invokeBinding(envelope, it)
)
).flatMap(
it -> {
int httpStatusCode =
parseHttpStatusCode(it.getMetadataMap().getOrDefault("statusCode", ""));
if (isValidHttpStatusCode(httpStatusCode) && !isSuccessfulHttpStatusCode(httpStatusCode)) {
// Exception condition in a successful request.
// This is useful to send an exception due to an error from the HTTP binding component.
throw DaprException.propagate(new DaprHttpException(httpStatusCode, it.getData().toByteArray()));
}

try {
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type));
} catch (IOException e) {
Expand Down Expand Up @@ -1201,17 +1217,39 @@ private DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub clien
return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context);
}

/**
* Populates GRPC client with interceptors for telemetry.
*
* @param context Reactor's context.
* @param client GRPC client for Dapr.
* @param metadataConsumer Consumer of gRPC metadata.
* @return Client after adding interceptors.
*/
private DaprGrpc.DaprStub intercept(
ContextView context, DaprGrpc.DaprStub client, Consumer<Metadata> metadataConsumer) {
return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context, metadataConsumer);
}

private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
return this.createMono(null, consumer);
}

private <T> Mono<T> createMono(Metadata metadata, Consumer<StreamObserver<T>> consumer) {
return retryPolicy.apply(
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(
createStreamObserver(sink, metadata))).run()));
}

private <T> Flux<T> createFlux(Consumer<StreamObserver<T>> consumer) {
return this.createFlux(null, consumer);
}

private <T> Flux<T> createFlux(Metadata metadata, Consumer<StreamObserver<T>> consumer) {
return retryPolicy.apply(
Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink, metadata))).run()));
}

private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink, Metadata grpcMetadata) {
return new StreamObserver<T>() {
@Override
public void onNext(T value) {
Expand All @@ -1220,7 +1258,7 @@ public void onNext(T value) {

@Override
public void onError(Throwable t) {
sink.error(DaprException.propagate(new ExecutionException(t)));
sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t)));
}

@Override
Expand All @@ -1230,7 +1268,7 @@ public void onCompleted() {
};
}

private <T> StreamObserver<T> createStreamObserver(FluxSink<T> sink) {
private <T> StreamObserver<T> createStreamObserver(FluxSink<T> sink, final Metadata grpcMetadata) {
return new StreamObserver<T>() {
@Override
public void onNext(T value) {
Expand All @@ -1239,7 +1277,7 @@ public void onNext(T value) {

@Override
public void onError(Throwable t) {
sink.error(DaprException.propagate(new ExecutionException(t)));
sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t)));
}

@Override
Expand Down
27 changes: 23 additions & 4 deletions sdk/src/main/java/io/dapr/client/DaprHttp.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprError;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.exceptions.DaprHttpException;
import io.dapr.utils.Version;
import okhttp3.Call;
import okhttp3.Callback;
Expand Down Expand Up @@ -387,17 +388,19 @@ public void onFailure(Call call, IOException e) {

@Override
public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) throws IOException {
if (!response.isSuccessful()) {
int httpStatusCode = parseHttpStatusCode(response.header("Metadata.statuscode"), response.code());
if (!DaprHttpException.isSuccessfulHttpStatusCode(httpStatusCode)) {
try {
byte[] payload = getBodyBytesOrEmptyArray(response);
DaprError error = parseDaprError(payload);

if (error != null) {
future.completeExceptionally(new DaprException(error, payload, response.code()));
future.completeExceptionally(new DaprException(error, payload, httpStatusCode));
return;
}

future.completeExceptionally(
new DaprException("UNKNOWN", "", payload, response.code()));
new DaprException("UNKNOWN", "", payload, httpStatusCode));
return;
} catch (DaprException e) {
future.completeExceptionally(e);
Expand All @@ -410,8 +413,24 @@ public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) t
response.headers().forEach(pair -> {
mapHeaders.put(pair.getFirst(), pair.getSecond());
});
future.complete(new Response(result, mapHeaders, response.code()));
future.complete(new Response(result, mapHeaders, httpStatusCode));
}
}

private static int parseHttpStatusCode(String headerValue, int defaultStatusCode) {
if ((headerValue == null) || headerValue.isEmpty()) {
return defaultStatusCode;
}

// Metadata used to override status code with code received from HTTP binding.
try {
int httpStatusCode = Integer.parseInt(headerValue);
if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) {
return httpStatusCode;
}
return defaultStatusCode;
} catch (NumberFormatException nfe) {
return defaultStatusCode;
}
}
}
Loading
Loading