Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom non-2xx response handling - DaprErrorResponseParser - Issue 783 #842

Closed
wants to merge 13 commits into from
Closed
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.6</version>
<version>0.8.10</version>
<executions>
<execution>
<goals>
Expand Down
2 changes: 1 addition & 1 deletion sdk-actors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.6</version>
<version>0.8.10</version>
<executions>
<execution>
<id>default-prepare-agent</id>
Expand Down
2 changes: 1 addition & 1 deletion sdk-springboot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.6</version>
<version>0.8.10</version>
<executions>
<execution>
<id>default-prepare-agent</id>
Expand Down
2 changes: 1 addition & 1 deletion sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.6</version>
<version>0.8.10</version>
<executions>
<execution>
<id>default-prepare-agent</id>
Expand Down
39 changes: 33 additions & 6 deletions sdk/src/main/java/io/dapr/client/DaprClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.dapr.client;

import io.dapr.config.Properties;
import io.dapr.exceptions.DaprError;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.Version;
Expand Down Expand Up @@ -53,6 +54,11 @@ public class DaprClientBuilder {
*/
private DaprObjectSerializer objectSerializer;

/**
* Response parser used for custom handling of error responses from Dapr.
*/
private DaprErrorResponseParser errorParser;

/**
* Serializer used for state objects in DaprClient.
*/
Expand All @@ -61,7 +67,7 @@ public class DaprClientBuilder {
/**
* Creates a constructor for DaprClient.
*
* {@link DefaultObjectSerializer} is used for object and state serializers by defaul but is not recommended
* <p>{@link DefaultObjectSerializer} is used for object and state serializers by defaul but is not recommended
* for production scenarios.
*/
public DaprClientBuilder() {
Expand Down Expand Up @@ -92,6 +98,22 @@ public DaprClientBuilder withObjectSerializer(DaprObjectSerializer objectSeriali
return this;
}

/**
* Sets the error parser for objects to received from Dapr.
* See {@link DefaultDaprHttpErrorResponseParser} as a default parser for {@link DaprError}.
*
* @param errorParser Parser for objects received from Dapr.
* @return This instance.
*/
public DaprClientBuilder withErrorParser(DaprErrorResponseParser errorParser) {
if (errorParser == null) {
throw new IllegalArgumentException("Response parser is required");
}

this.errorParser = errorParser;
return this;
}

/**
* Sets the serializer for objects to be persisted.
* See {@link DefaultObjectSerializer} as possible serializer for non-production scenarios.
Expand Down Expand Up @@ -149,9 +171,12 @@ private DaprClient buildDaprClient(DaprApiProtocol protocol) {
}

switch (protocol) {
case GRPC: return buildDaprClientGrpc();
case HTTP: return buildDaprClientHttp();
default: throw new IllegalStateException("Unsupported protocol: " + protocol.name());
case GRPC:
return buildDaprClientGrpc();
case HTTP:
return buildDaprClientHttp();
default:
throw new IllegalStateException("Unsupported protocol: " + protocol.name());
}
}

Expand All @@ -174,7 +199,9 @@ private DaprClient buildDaprClientGrpc() {
}
};
DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel);
return new DaprClientGrpc(closeableChannel, asyncStub, this.objectSerializer, this.stateSerializer);

return new DaprClientGrpc(
closeableChannel, asyncStub, this.objectSerializer, this.stateSerializer, this.errorParser);
}

/**
Expand All @@ -183,7 +210,7 @@ private DaprClient buildDaprClientGrpc() {
* @return DaprClient over HTTP.
*/
private DaprClient buildDaprClientHttp() {
return new DaprClientHttp(this.daprHttpBuilder.build(), this.objectSerializer, this.stateSerializer);
return new DaprClientHttp(this.daprHttpBuilder.build(errorParser), this.objectSerializer, this.stateSerializer);
}

}
73 changes: 65 additions & 8 deletions sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
Expand All @@ -71,6 +72,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -81,6 +83,8 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static io.dapr.config.Properties.STRING_CHARSET;

/**
* An adapter for the GRPC Client.
*
Expand All @@ -89,6 +93,11 @@
*/
public class DaprClientGrpc extends AbstractDaprClient {

/**
* Default error parser for gRPC.
*/
private static final DaprErrorResponseParser DEFAULT_ERROR_PARSER = new DefaultDaprGrpcErrorResponseParser();

/**
* The GRPC managed channel to be used.
*/
Expand All @@ -99,6 +108,11 @@ public class DaprClientGrpc extends AbstractDaprClient {
*/
private DaprGrpc.DaprStub asyncStub;

/**
* Error parser.
*/
private DaprErrorResponseParser errorResponseParser;

/**
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
*
Expand All @@ -112,10 +126,13 @@ public class DaprClientGrpc extends AbstractDaprClient {
Closeable closeableChannel,
DaprGrpc.DaprStub asyncStub,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
DaprObjectSerializer stateSerializer,
DaprErrorResponseParser errorResponseParser
) {
super(objectSerializer, stateSerializer);
this.channel = closeableChannel;
this.asyncStub = intercept(asyncStub);
this.errorResponseParser = errorResponseParser == null ? DEFAULT_ERROR_PARSER : errorResponseParser;
}

private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) {
Expand Down Expand Up @@ -299,15 +316,16 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
// gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342

return Mono.deferContextual(
context -> this.<CommonProtos.InvokeResponse>createMono(
it -> intercept(context, asyncStub).invokeService(envelope, it)
context -> this.<CommonProtos.InvokeResponse>createMonoWithErrorHandling(
it -> intercept(context, asyncStub).invokeService(envelope, it),
errorResponseParser
)
).flatMap(
).flatMap(
it -> {
try {
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().getValue().toByteArray(), type));
} catch (IOException e) {
throw DaprException.propagate(e);
} catch (IOException e) {
return Mono.error(DaprException.propagate(e));
}
}
);
Expand All @@ -316,6 +334,45 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
}
}

private <T> Mono<T> createMonoWithErrorHandling(
Consumer<StreamObserver<T>> consumer,
DaprErrorResponseParser errorResponseParser) {
return Mono.create(sink -> {
StreamObserver<T> streamObserver = new StreamObserver<T>() {
@Override
public void onNext(T value) {
sink.success(value);
}

@Override
public void onError(Throwable t) {
if (t instanceof StatusRuntimeException) {
StatusRuntimeException statusException = (StatusRuntimeException) t;
int statusCode = statusException.getStatus().getCode().value();
byte[] errorDetails = statusException.getStatus().getDescription() != null
? statusException.getStatus().getDescription().getBytes(STRING_CHARSET.get())
: new byte[0];
try {
DaprException exception = errorResponseParser.parse(statusCode, errorDetails);
sink.error(new ExecutionException(exception));
} catch (IOException e) {
sink.error(new ExecutionException(new DaprException("Error parsing error response", e.toString())));
}
} else {
sink.error(DaprException.propagate(new ExecutionException(t)));
}
}

@Override
public void onCompleted() {
sink.success();
}
};

DaprException.wrap(() -> consumer.accept(streamObserver)).run();
});
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -900,7 +957,7 @@ private Mono<Map<String, ConfigurationItem>> getConfiguration(DaprProtos.GetConf
Iterator<Map.Entry<String, CommonProtos.ConfigurationItem>> itr = it.getItems().entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, CommonProtos.ConfigurationItem> entry = itr.next();
configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey()));
configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey()));
}
return Collections.unmodifiableMap(configMap);
}
Expand Down Expand Up @@ -939,7 +996,7 @@ public Flux<SubscribeConfigurationResponse> subscribeConfiguration(SubscribeConf
Iterator<Map.Entry<String, CommonProtos.ConfigurationItem>> itr = it.getItemsMap().entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, CommonProtos.ConfigurationItem> entry = itr.next();
configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey()));
configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey()));
}
return new SubscribeConfigurationResponse(it.getId(), Collections.unmodifiableMap(configMap));
}
Expand Down
34 changes: 34 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprErrorResponseParser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.client;

import io.dapr.exceptions.DaprException;

import java.io.IOException;

/**
* Parses an error response from Dapr APIs.
*/
public interface DaprErrorResponseParser {

/**
* Parses an error code and throws an Exception.
*
* @param statusCode HTTP or gRPC status code.
* @param response response payload from Dapr API.
* @return Exception parsed from payload
* @throws IOException if cannot parse error.
*/
DaprException parse(int statusCode, byte[] response) throws IOException;
}
Loading