From db5b7585faeb6170b4231e358005f9b3d56c36fc Mon Sep 17 00:00:00 2001 From: Santiago Pericas-Geertsen Date: Wed, 24 Apr 2024 14:55:46 -0400 Subject: [PATCH] Some new tests from JK and fixes to gRPC client code to handle: (1) exceptions thrown in gRPC methods and (2) larger payloads. --- .../webclient/grpc/GrpcClientCall.java | 18 ++++++++-- .../webclient/grpc/GrpcUnaryClientCall.java | 11 +++--- .../tests/grpc/src/main/proto/strings.proto | 2 ++ .../webclient/grpc/tests/GrpcBaseTest.java | 11 +++++- .../webclient/grpc/tests/GrpcStubTest.java | 34 +++++++++++++++++++ .../src/test/resources/logging.properties | 2 +- 6 files changed, 70 insertions(+), 8 deletions(-) diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java index 0234c26f4e6..13549c464cd 100644 --- a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java @@ -16,6 +16,10 @@ package io.helidon.webclient.grpc; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -88,8 +92,18 @@ public void halfClose() { @Override public void sendMessage(ReqT message) { socket().log(LOGGER, DEBUG, "sendMessage called"); - BufferData messageData = BufferData.growing(BUFFER_SIZE_BYTES); - messageData.readFrom(requestMarshaller().stream(message)); + + // serialize message using a marshaller + ByteArrayOutputStream baos = new ByteArrayOutputStream(BUFFER_SIZE_BYTES); + try (InputStream is = requestMarshaller().stream(message)) { + is.transferTo(baos); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + byte[] serialized = baos.toByteArray(); + + // queue data message and start writer + BufferData messageData = BufferData.createReadOnly(serialized, 0, serialized.length); BufferData headerData = BufferData.create(5); headerData.writeInt8(0); // no compression headerData.writeUnsignedInt32(messageData.available()); // length prefixed diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcUnaryClientCall.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcUnaryClientCall.java index 0c6e8017326..e2c9a87fd81 100644 --- a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcUnaryClientCall.java +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcUnaryClientCall.java @@ -85,11 +85,14 @@ public void sendMessage(ReqT message) { clientStream().writeData(BufferData.create(headerData, messageData), true); requestSent = true; + // read response headers + clientStream().readHeaders(); + while (isRemoteOpen()) { - // trailers received? - if (clientStream().trailers().isDone()) { - socket().log(LOGGER, DEBUG, "trailers received"); - return; + // trailers or eos received? + if (clientStream().trailers().isDone() || !clientStream().hasEntity()) { + socket().log(LOGGER, DEBUG, "[Reading thread] trailers or eos received"); + break; } // attempt to read and queue diff --git a/webclient/tests/grpc/src/main/proto/strings.proto b/webclient/tests/grpc/src/main/proto/strings.proto index 2da17d2f5de..f04386897b4 100644 --- a/webclient/tests/grpc/src/main/proto/strings.proto +++ b/webclient/tests/grpc/src/main/proto/strings.proto @@ -23,6 +23,8 @@ service StringService { rpc Split (StringMessage) returns (stream StringMessage) {} rpc Join (stream StringMessage) returns (StringMessage) {} rpc Echo (stream StringMessage) returns (stream StringMessage) {} + rpc BadMethod (StringMessage) returns (StringMessage) {} + rpc NotImplementedMethod (StringMessage) returns (StringMessage) {} } message StringMessage { diff --git a/webclient/tests/grpc/src/test/java/io/helidon/webclient/grpc/tests/GrpcBaseTest.java b/webclient/tests/grpc/src/test/java/io/helidon/webclient/grpc/tests/GrpcBaseTest.java index 5f31f57e05f..82897279b5a 100644 --- a/webclient/tests/grpc/src/test/java/io/helidon/webclient/grpc/tests/GrpcBaseTest.java +++ b/webclient/tests/grpc/src/test/java/io/helidon/webclient/grpc/tests/GrpcBaseTest.java @@ -27,6 +27,7 @@ import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.MethodDescriptor; +import io.grpc.Status; import io.grpc.stub.StreamObserver; import io.helidon.common.Weight; import io.helidon.common.configurable.Resource; @@ -74,7 +75,11 @@ static void setUpRoute(GrpcRouting.Builder routing) { .bidi(Strings.getDescriptor(), "StringService", "Echo", - GrpcStubTest::echo); + GrpcStubTest::echo) + .unary(Strings.getDescriptor(), + "StringService", + "BadMethod", + GrpcStubTest::badMethod); } @BeforeEach @@ -154,6 +159,10 @@ public void onCompleted() { }; } + static void badMethod(Strings.StringMessage req, StreamObserver streamObserver) { + streamObserver.onError(Status.INTERNAL.asException()); + } + Strings.StringMessage newStringMessage(String data) { return Strings.StringMessage.newBuilder().setText(data).build(); } diff --git a/webclient/tests/grpc/src/test/java/io/helidon/webclient/grpc/tests/GrpcStubTest.java b/webclient/tests/grpc/src/test/java/io/helidon/webclient/grpc/tests/GrpcStubTest.java index ca0dc399ffc..935bb22aca7 100644 --- a/webclient/tests/grpc/src/test/java/io/helidon/webclient/grpc/tests/GrpcStubTest.java +++ b/webclient/tests/grpc/src/test/java/io/helidon/webclient/grpc/tests/GrpcStubTest.java @@ -16,11 +16,14 @@ package io.helidon.webclient.grpc.tests; +import java.nio.charset.StandardCharsets; import java.util.Iterator; +import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.IntStream; import io.helidon.common.configurable.Resource; import io.helidon.common.tls.Tls; @@ -28,6 +31,7 @@ import io.helidon.webclient.grpc.GrpcClient; import io.helidon.webserver.WebServer; import io.helidon.webserver.testing.junit5.ServerTest; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import io.grpc.stub.StreamObserver; @@ -144,4 +148,34 @@ void testBidirectionalEchoAsyncEmpty() throws ExecutionException, InterruptedExc Iterator res = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); assertThat(res.hasNext(), is(false)); } + + @Test + void testBidirectionalEchoAsyncWithLargePayload() throws ExecutionException, InterruptedException, TimeoutException { + GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL); + StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(grpcClient.channel()); + CompletableFuture> future = new CompletableFuture<>(); + StreamObserver req = service.echo(multiStreamObserver(future)); + byte[] array = new byte[2000]; + new Random().nextBytes(array); + String largeString = new String(array, StandardCharsets.UTF_8); + req.onNext(newStringMessage(largeString)); + req.onCompleted(); + Iterator res = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(res.next().getText(), is(largeString)); + assertThat(res.hasNext(), is(false)); + } + + @Test + void testReceiveServerException() { + GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL); + StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(grpcClient.channel()); + Assertions.assertThrows(Throwable.class, () -> service.badMethod(newStringMessage("hello"))); + } + + @Test + void testCallingNotImplementMethodThrowsException() { + GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL); + StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(grpcClient.channel()); + Assertions.assertThrows(Throwable.class, () -> service.notImplementedMethod(newStringMessage("hello"))); + } } diff --git a/webclient/tests/grpc/src/test/resources/logging.properties b/webclient/tests/grpc/src/test/resources/logging.properties index 344ba7085fd..09e74ff2922 100644 --- a/webclient/tests/grpc/src/test/resources/logging.properties +++ b/webclient/tests/grpc/src/test/resources/logging.properties @@ -21,4 +21,4 @@ handlers=io.helidon.logging.jul.HelidonConsoleHandler java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$s %3$s !thread!: %5$s%6$s%n .level=INFO -#io.helidon.webclient.grpc.level=FINEST +io.helidon.webclient.grpc.level=FINEST