Skip to content

Commit

Permalink
Handle HTTP binding error. (#1024) (#1130)
Browse files Browse the repository at this point in the history
Update binding http IT

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: salaboy <Salaboy@gmail.com>
  • Loading branch information
artursouza authored and salaboy committed Sep 27, 2024
1 parent 570a311 commit 711a124
Show file tree
Hide file tree
Showing 9 changed files with 432 additions and 44 deletions.
27 changes: 27 additions & 0 deletions sdk-tests/components/http_binding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: github-http-binding-404
spec:
type: bindings.http
version: v1
metadata:
- name: url
value: https://api.github.com/unknown_path
scopes:
- bindingit-httpoutputbinding-exception
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: github-http-binding-404-success
spec:
type: bindings.http
version: v1
metadata:
- name: url
value: https://api.github.com/unknown_path
- name: errorIfNot2XX
value: "false"
scopes:
- bindingit-httpoutputbinding-ignore-error
4 changes: 2 additions & 2 deletions sdk-tests/deploy/local-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ services:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
- 2181:2181

kafka:
image: confluentinc/cp-kafka:7.4.4
depends_on:
- zookeeper
ports:
- 9092:9092
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
Expand Down
68 changes: 57 additions & 11 deletions sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 The Dapr Authors
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -17,6 +17,7 @@
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.HttpExtension;
import io.dapr.exceptions.DaprException;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import org.junit.jupiter.api.Test;
Expand All @@ -26,22 +27,58 @@

import static io.dapr.it.Retry.callWithRetry;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

/**
* Service for input and output binding example.
*/
public class BindingIT extends BaseIT {

private static final String BINDING_NAME = "sample123";

private static final String BINDING_OPERATION = "create";

public static class MyClass {
public MyClass() {
@Test
public void httpOutputBindingError() throws Exception {
startDaprApp(
this.getClass().getSimpleName() + "-httpoutputbinding-exception",
60000);
try(DaprClient client = new DaprClientBuilder().build()) {
// Validate error message
callWithRetry(() -> {
System.out.println("Checking exception handling for output binding ...");
try {
client.invokeBinding("github-http-binding-404", "get", "").block();
fail("Should throw an exception");
} catch (DaprException e) {
assertEquals(404, e.getHttpStatusCode());
// This HTTP binding did not set `errorIfNot2XX` to false in component metadata, so the error payload is not
// consistent between HTTP and gRPC.
assertTrue(new String(e.getPayload()).contains(
"error invoking output binding github-http-binding-404: received status code 404"));
}
}, 10000);
}
}

public String message;
@Test
public void httpOutputBindingErrorIgnoredByComponent() throws Exception {
startDaprApp(
this.getClass().getSimpleName() + "-httpoutputbinding-ignore-error",
60000);
try(DaprClient client = new DaprClientBuilder().build()) {
// Validate error message
callWithRetry(() -> {
System.out.println("Checking exception handling for output binding ...");
try {
client.invokeBinding("github-http-binding-404-success", "get", "").block();
fail("Should throw an exception");
} catch (DaprException e) {
assertEquals(404, e.getHttpStatusCode());
// The HTTP binding must set `errorIfNot2XX` to false in component metadata for the error payload to be
// consistent between HTTP and gRPC.
assertTrue(new String(e.getPayload()).contains("\"message\":\"Not Found\""));
assertTrue(new String(e.getPayload()).contains("\"documentation_url\":\"https://docs.github.com/rest\""));
}
}, 10000);
}
}

@Test
Expand All @@ -53,11 +90,13 @@ public void inputOutputBinding() throws Exception {
true,
60000);

var bidingName = "sample123";

try(DaprClient client = new DaprClientBuilder().build()) {
callWithRetry(() -> {
System.out.println("Checking if input binding is up before publishing events ...");
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, "ping").block();
bidingName, "create", "ping").block();

try {
Thread.sleep(1000);
Expand All @@ -76,14 +115,14 @@ public void inputOutputBinding() throws Exception {

System.out.println("sending first message");
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
bidingName, "create", myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();

// This is an example of sending a plain string. The input binding will receive
// cat
final String m = "cat";
System.out.println("sending " + m);
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
bidingName, "create", m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();

// Metadata is not used by Kafka component, so it is not possible to validate.
callWithRetry(() -> {
Expand Down Expand Up @@ -115,4 +154,11 @@ public void inputOutputBinding() throws Exception {
}, 8000);
}
}

public static class MyClass {
public MyClass() {
}

public String message;
}
}
52 changes: 45 additions & 7 deletions sdk/src/main/java/io/dapr/client/DaprClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.exceptions.DaprHttpException;
import io.dapr.internal.grpc.DaprClientGrpcInterceptors;
import io.dapr.internal.resiliency.RetryPolicy;
import io.dapr.internal.resiliency.TimeoutPolicy;
Expand All @@ -76,6 +77,7 @@
import io.dapr.v1.DaprProtos.RegisteredComponents;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.Metadata;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.StreamObserver;
import reactor.core.publisher.Flux;
Expand All @@ -99,6 +101,10 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.dapr.internal.exceptions.DaprHttpException.isSuccessfulHttpStatusCode;
import static io.dapr.internal.exceptions.DaprHttpException.isValidHttpStatusCode;
import static io.dapr.internal.exceptions.DaprHttpException.parseHttpStatusCode;

/**
* Implementation of the Dapr client combining gRPC and HTTP (when applicable).
*
Expand Down Expand Up @@ -486,12 +492,22 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
}
DaprProtos.InvokeBindingRequest envelope = builder.build();

Metadata responseMetadata = new Metadata();
return Mono.deferContextual(
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
responseMetadata,
it -> intercept(context, asyncStub, m -> responseMetadata.merge(m)).invokeBinding(envelope, it)
)
).flatMap(
it -> {
int httpStatusCode =
parseHttpStatusCode(it.getMetadataMap().getOrDefault("statusCode", ""));
if (isValidHttpStatusCode(httpStatusCode) && !isSuccessfulHttpStatusCode(httpStatusCode)) {
// Exception condition in a successful request.
// This is useful to send an exception due to an error from the HTTP binding component.
throw DaprException.propagate(new DaprHttpException(httpStatusCode, it.getData().toByteArray()));
}

try {
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type));
} catch (IOException e) {
Expand Down Expand Up @@ -1201,17 +1217,39 @@ private DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub clien
return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context);
}

/**
* Populates GRPC client with interceptors for telemetry.
*
* @param context Reactor's context.
* @param client GRPC client for Dapr.
* @param metadataConsumer Consumer of gRPC metadata.
* @return Client after adding interceptors.
*/
private DaprGrpc.DaprStub intercept(
ContextView context, DaprGrpc.DaprStub client, Consumer<Metadata> metadataConsumer) {
return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context, metadataConsumer);
}

private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
return this.createMono(null, consumer);
}

private <T> Mono<T> createMono(Metadata metadata, Consumer<StreamObserver<T>> consumer) {
return retryPolicy.apply(
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(
createStreamObserver(sink, metadata))).run()));
}

private <T> Flux<T> createFlux(Consumer<StreamObserver<T>> consumer) {
return this.createFlux(null, consumer);
}

private <T> Flux<T> createFlux(Metadata metadata, Consumer<StreamObserver<T>> consumer) {
return retryPolicy.apply(
Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink, metadata))).run()));
}

private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink, Metadata grpcMetadata) {
return new StreamObserver<T>() {
@Override
public void onNext(T value) {
Expand All @@ -1220,7 +1258,7 @@ public void onNext(T value) {

@Override
public void onError(Throwable t) {
sink.error(DaprException.propagate(new ExecutionException(t)));
sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t)));
}

@Override
Expand All @@ -1230,7 +1268,7 @@ public void onCompleted() {
};
}

private <T> StreamObserver<T> createStreamObserver(FluxSink<T> sink) {
private <T> StreamObserver<T> createStreamObserver(FluxSink<T> sink, final Metadata grpcMetadata) {
return new StreamObserver<T>() {
@Override
public void onNext(T value) {
Expand All @@ -1239,7 +1277,7 @@ public void onNext(T value) {

@Override
public void onError(Throwable t) {
sink.error(DaprException.propagate(new ExecutionException(t)));
sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t)));
}

@Override
Expand Down
27 changes: 23 additions & 4 deletions sdk/src/main/java/io/dapr/client/DaprHttp.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprError;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.exceptions.DaprHttpException;
import io.dapr.utils.Version;
import okhttp3.Call;
import okhttp3.Callback;
Expand Down Expand Up @@ -387,17 +388,19 @@ public void onFailure(Call call, IOException e) {

@Override
public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) throws IOException {
if (!response.isSuccessful()) {
int httpStatusCode = parseHttpStatusCode(response.header("Metadata.statuscode"), response.code());
if (!DaprHttpException.isSuccessfulHttpStatusCode(httpStatusCode)) {
try {
byte[] payload = getBodyBytesOrEmptyArray(response);
DaprError error = parseDaprError(payload);

if (error != null) {
future.completeExceptionally(new DaprException(error, payload, response.code()));
future.completeExceptionally(new DaprException(error, payload, httpStatusCode));
return;
}

future.completeExceptionally(
new DaprException("UNKNOWN", "", payload, response.code()));
new DaprException("UNKNOWN", "", payload, httpStatusCode));
return;
} catch (DaprException e) {
future.completeExceptionally(e);
Expand All @@ -410,8 +413,24 @@ public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) t
response.headers().forEach(pair -> {
mapHeaders.put(pair.getFirst(), pair.getSecond());
});
future.complete(new Response(result, mapHeaders, response.code()));
future.complete(new Response(result, mapHeaders, httpStatusCode));
}
}

private static int parseHttpStatusCode(String headerValue, int defaultStatusCode) {
if ((headerValue == null) || headerValue.isEmpty()) {
return defaultStatusCode;
}

// Metadata used to override status code with code received from HTTP binding.
try {
int httpStatusCode = Integer.parseInt(headerValue);
if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) {
return httpStatusCode;
}
return defaultStatusCode;
} catch (NumberFormatException nfe) {
return defaultStatusCode;
}
}
}
Loading

0 comments on commit 711a124

Please sign in to comment.