Skip to content

Commit

Permalink
Throw from onNext when unable to send.
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Bair <rbair23@users.noreply.github.com>
  • Loading branch information
rbair23 committed Nov 22, 2024
1 parent 170e5a1 commit 69a20cd
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 41 deletions.
2 changes: 1 addition & 1 deletion pbj-core/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Version number
version=0.9.5-SNAPSHOT
version=0.9.11-SNAPSHOT

# Need increased heap for running Gradle itself, or SonarQube will run the JVM out of metaspace
org.gradle.jvmargs=-Xmx2048m
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ final class PbjMethodRoute extends PbjRoute {
@NonNull private final Counter failedGrpcRequestCounter;
@NonNull private final Counter failedHttpRequestCounter;
@NonNull private final Counter failedUnknownRequestCounter;
@NonNull private final Counter failedResponseCounter;
@NonNull private final Counter deadlineExceededCounter;

/**
Expand Down Expand Up @@ -99,6 +100,14 @@ final class PbjMethodRoute extends PbjRoute {
.addTag(Tag.create(METHOD_TAG, methodName))
.addTag(Tag.create(FAILURE_TAG, "unknown-exception"))
.description("The number of failed unknown requests"));
this.failedResponseCounter =
metricRegistry.getOrCreate(
Counter.builder("pbj.grpc.failed.responses")
.scope(SCOPE)
.addTag(Tag.create(SERVICE_TAG, serviceName))
.addTag(Tag.create(METHOD_TAG, methodName))
.addTag(Tag.create(FAILURE_TAG, "response"))
.description("The number of failed responses"));
this.deadlineExceededCounter =
metricRegistry.getOrCreate(
Counter.builder("pbj.grpc.deadline.exceeded")
Expand Down Expand Up @@ -160,6 +169,11 @@ public Counter failedUnknownRequestCounter() {
return failedUnknownRequestCounter;
}

@NonNull
public Counter failedResponseCounter() {
return failedResponseCounter;
}

@NonNull
public Counter deadlineExceededCounter() {
return deadlineExceededCounter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static com.hedera.pbj.runtime.grpc.ServiceInterface.RequestOptions.APPLICATION_GRPC;
import static com.hedera.pbj.runtime.grpc.ServiceInterface.RequestOptions.APPLICATION_GRPC_JSON;
import static com.hedera.pbj.runtime.grpc.ServiceInterface.RequestOptions.APPLICATION_GRPC_PROTO;
import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.ERROR;
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -389,16 +390,11 @@ public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferDa
GrpcStatus.INVALID_ARGUMENT,
"Message size exceeds maximum allowed size");
}
// Create a buffer to hold the message. We sadly cannot reuse this
// buffer
// because once we have filled it and wrapped it in Bytes and sent
// it to the
// handler, some user code may grab and hold that Bytes object for
// an arbitrary
// amount of time, and if we were to scribble into the same byte
// array, we
// would break the application. So we need a new buffer each time
// :-(
// Create a buffer to hold the message. We sadly cannot reuse this buffer
// because once we have filled it and wrapped it in Bytes and sent it to the
// handler, some user code may grab and hold that Bytes object for an arbitrary
// amount of time, and if we were to scribble into the same byte array, we
// would break the application. So we need a new buffer each time :-(
entityBytes = new byte[(int) length];
entityBytesIndex = 0;
// done with length now, so move on to next state
Expand All @@ -408,14 +404,10 @@ public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferDa
}
case READ_ENTITY_BYTES:
{
// By the time we get here, entityBytes is no longer null. It may be
// empty, or it
// may already have been partially populated from a previous iteration.
// It may be
// that the number of bytes available to be read is larger than just
// this one
// message. So we need to be careful to read, from what is available,
// only up to
// By the time we get here, entityBytes is no longer null. It may be empty, or it
// may already have been partially populated from a previous iteration. It may be
// that the number of bytes available to be read is larger than just this one
// message. So we need to be careful to read, from what is available, only up to
// the message length, and to leave the rest for the next iteration.
final int available = data.available();
final int numBytesToRead =
Expand Down Expand Up @@ -462,10 +454,8 @@ public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferDa
* <p>May be called by different threads concurrently.
*/
private void error() {
// Canceling a future that has already completed has no effect. So by canceling here, we are
// saying:
// "If you have not yet executed, never execute. If you have already executed, then just
// ignore me".
// Canceling a future that has already completed has no effect. So by canceling here, we are saying:
// "If you have not yet executed, never execute. If you have already executed, then just ignore me".
// The "isCancelled" flag is set if the future was canceled before it was executed.

// cancel is threadsafe
Expand Down Expand Up @@ -548,8 +538,7 @@ private void sendResponseHeaders(

// Some headers are http2 specific, the rest are used for the grpc protocol
final var grpcHeaders = WritableHeaders.create();
// FUTURE: I think to support custom headers in the response, we would have to list them
// here.
// FUTURE: I think to support custom headers in the response, we would have to list them here.
// Since this has to be sent before we have any data to send, we must know ahead of time
// which custom headers are to be returned.
grpcHeaders.set(HeaderNames.TRAILER, "grpc-status, grpc-message");
Expand Down Expand Up @@ -701,23 +690,32 @@ public void onNext(@NonNull final Bytes response) {
Http2Flag.DataFlags.create(0),
streamId);

streamWriter.writeData(
new Http2FrameData(header, bufferData), flowControl);
// This method may throw an UncheckedIOException. If this happens, the connection with the client
// has been violently terminated, and we should raise the error, and we should throw an exception
// so the user knows the connection is toast.
streamWriter.writeData(new Http2FrameData(header, bufferData), flowControl);
} catch (final Exception e) {
LOGGER.log(ERROR, "Failed to respond to grpc request: " + route.method(), e);
LOGGER.log(DEBUG, "Failed to respond to grpc request: " + route.method(), e);
route.failedResponseCounter().increment();
throw new RuntimeException(e);
}
}

@Override
public void onError(@NonNull final Throwable throwable) {
if (throwable instanceof final GrpcException grpcException) {
new TrailerBuilder()
.grpcStatus(grpcException.status())
.statusMessage(grpcException.getMessage())
.send();
} else {
LOGGER.log(ERROR, "Failed to send response", throwable);
new TrailerBuilder().grpcStatus(GrpcStatus.INTERNAL).send();
try {
if (throwable instanceof final GrpcException grpcException) {
new TrailerBuilder()
.grpcStatus(grpcException.status())
.statusMessage(grpcException.getMessage())
.send();
} else {
LOGGER.log(DEBUG, "Failed to send response", throwable);
new TrailerBuilder().grpcStatus(GrpcStatus.INTERNAL).send();
}
} catch (Exception ignored) {
// If an exception is thrown trying to return headers, we're already in the error state, so
// just continue.
}
error();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.ArrayList;
import java.util.concurrent.Flow;

final class GreeterServiceImpl implements GreeterService {
class GreeterServiceImpl implements GreeterService {
GrpcStatus errorToThrow = null;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@
package com.hedera.pbj.grpc.helidon;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.entry;
import static org.assertj.core.api.Assumptions.assumeThat;

import com.hedera.pbj.grpc.helidon.config.PbjConfig;
import com.hedera.pbj.runtime.grpc.GrpcStatus;
import com.hedera.pbj.runtime.grpc.Pipeline;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
import greeter.HelloReply;
import greeter.HelloRequest;
import io.helidon.common.buffers.BufferData;
import io.helidon.common.uri.UriEncoding;
import io.helidon.http.Header;
import io.helidon.http.HeaderNames;
Expand All @@ -31,17 +37,25 @@
import io.helidon.http.http2.FlowControl;
import io.helidon.http.http2.Http2Flag;
import io.helidon.http.http2.Http2FrameData;
import io.helidon.http.http2.Http2FrameHeader;
import io.helidon.http.http2.Http2FrameTypes;
import io.helidon.http.http2.Http2Headers;
import io.helidon.http.http2.Http2StreamState;
import io.helidon.http.http2.Http2StreamWriter;
import io.netty.handler.codec.http2.Http2Flags;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Delayed;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

Expand All @@ -61,7 +75,9 @@ static void beforeAll() {

@BeforeEach
void setUp() {
headers = Http2Headers.create(WritableHeaders.create());
final var h = WritableHeaders.create();
h.add(HeaderNames.CONTENT_TYPE, "application/grpc");
headers = Http2Headers.create(h);
streamWriter = new StreamWriterStub();
streamId = 1;
flowControl = new OutboundFlowControlStub();
Expand Down Expand Up @@ -119,6 +135,74 @@ void unsupportedContentType(String contentType) {
assertThat(handler.streamState()).isEqualTo(Http2StreamState.CLOSED);
}

@Test
void errorThrownForOnNextWhenStreamIsClosed() {
// Use a custom streamWriter that will throw an exception when "streamClosed" is set to true, and it is
// asked to write something. This can be used to simulate what happens when the network connection fails.
final var streamClosed = new AtomicBoolean(false);
streamWriter = new StreamWriterStub() {
@Override
public void writeData(Http2FrameData frame, FlowControl.Outbound flowControl) {
if (streamClosed.get()) {
throw new IllegalStateException("Stream is closed");
}
}
};

// Within this test, the replyRef will be set once when the setup is complete, and then
// will be available for the test code to use to call onNext, onError, etc. as required.
final var replyRef = new AtomicReference<Pipeline<? super HelloReply>>();
route = new PbjMethodRoute(new GreeterServiceImpl() {
@Override
public void sayHelloStreamReply(HelloRequest request, Pipeline<? super HelloReply> replies) {
replyRef.set(replies);
}
}, GreeterService.GreeterMethod.sayHelloStreamReply);

final var handler = new PbjProtocolHandler(headers, streamWriter, streamId, flowControl, currentStreamState, config, route, deadlineDetector);
handler.init();
sendAllData(handler, Bytes.wrap(HelloRequest.newBuilder().setName("Alice").build().toByteArray()));

final var replies = replyRef.get();
assertThat(replies).isNotNull();

replies.onNext(HelloReply.newBuilder().setMessage("Good").build());
streamClosed.set(true);

final var failingReply = HelloReply.newBuilder().setMessage("Bad").build();
assertThatThrownBy(() -> replies.onNext(failingReply))
.isInstanceOf(Exception.class);

assertThat(route.requestCounter().count()).isEqualTo(1);
assertThat(route.failedGrpcRequestCounter().count()).isEqualTo(0);
assertThat(route.failedHttpRequestCounter().count()).isEqualTo(0);
assertThat(route.failedUnknownRequestCounter().count()).isEqualTo(0);
assertThat(route.failedResponseCounter().count()).isEqualTo(1);
}

private void sendAllData(PbjProtocolHandler handler, Bytes bytes) {
final var frameHeader = createDataFrameHeader((int) bytes.length());
final var buf = createDataFrameBytes(bytes);
handler.data(frameHeader, buf);
}

private BufferData createDataFrameBytes(Bytes data) {
try {
final var buf = new ByteArrayOutputStream((int) data.length() + 5);
final var s = new DataOutputStream(buf);
s.writeByte(0);
s.writeInt((int) data.length());
data.writeTo(s);
return BufferData.create(buf.toByteArray());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private Http2FrameHeader createDataFrameHeader(int length) {
return Http2FrameHeader.create(length + 5, Http2FrameTypes.DATA, Http2Flag.DataFlags.create(Http2Flags.END_STREAM), streamId);
}

private static final class OutboundFlowControlStub implements FlowControl.Outbound {

@Override
Expand Down Expand Up @@ -157,7 +241,7 @@ public int getRemainingWindowSize() {
}
}

private static final class StreamWriterStub implements Http2StreamWriter {
private static class StreamWriterStub implements Http2StreamWriter {
private final List<Http2FrameData> writtenDataFrames = new ArrayList<>();
private final List<Http2Headers> writtenHeaders = new ArrayList<>();

Expand Down Expand Up @@ -190,7 +274,7 @@ private static final class PbjConfigStub implements PbjConfig {

@Override
public int maxMessageSizeBytes() {
return 0;
return 100;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,12 @@ public interface Pipeline<T> extends Flow.Subscriber<T> {
/**
* Called when an END_STREAM frame is received from the client.
*/
default void clientEndStreamReceived() { };
default void clientEndStreamReceived() { }

/**
* {@inheritDoc}
* @throws RuntimeException if an error occurs while trying to write data to the pipeline
*/
@Override
default void onNext(T item) throws RuntimeException {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,12 @@ public void onNext(@NonNull final Bytes message) {
final var replyBytes = responseMapper.apply(reply);
replies.onNext(replyBytes);
onComplete();
} catch (RuntimeException e) {
replies.onError(e);
throw e;
} catch (Exception e) {
replies.onError(e);
throw new RuntimeException(e);
}
}

Expand Down Expand Up @@ -625,8 +629,12 @@ public void onNext(@NonNull final Bytes message) {
try {
final var request = requestMapper.apply(message);
incoming.onNext(request);
} catch (RuntimeException e) {
replies.onError(e);
throw e;
} catch (Exception e) {
replies.onError(e);
throw new RuntimeException(e);
}
}

Expand Down Expand Up @@ -715,8 +723,12 @@ public void onNext(@NonNull final Bytes message) {
try {
final var request = requestMapper.apply(message);
incoming.onNext(request);
} catch (RuntimeException e) {
replies.onError(e);
throw e;
} catch (Exception e) {
replies.onError(e);
throw new RuntimeException(e);
}
}

Expand Down Expand Up @@ -799,8 +811,12 @@ public void onNext(@NonNull final Bytes message) {
try {
final var request = requestMapper.apply(message);
method.apply(request, responseConverter);
} catch (RuntimeException e) {
replies.onError(e);
throw e;
} catch (Exception e) {
replies.onError(e);
throw new RuntimeException(e);
}
}

Expand Down Expand Up @@ -847,8 +863,12 @@ public void onNext(T item) {
try {
final var r = mapper.apply(item);
next.onNext(r);
} catch (RuntimeException e) {
next.onError(e);
throw e;
} catch (Throwable t) {
next.onError(t);
throw new RuntimeException(t);
}
}

Expand Down

0 comments on commit 69a20cd

Please sign in to comment.