diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcBaseClientCall.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcBaseClientCall.java index 06aa4d188e2..37b79065523 100644 --- a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcBaseClientCall.java +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcBaseClientCall.java @@ -16,7 +16,10 @@ package io.helidon.webclient.grpc; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; import java.time.Duration; import java.util.Collections; import java.util.concurrent.Executor; @@ -214,4 +217,14 @@ public int read() { } }); } + + protected byte[] serializeMessage(ReqT message) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(BUFFER_SIZE_BYTES); + try (InputStream is = requestMarshaller().stream(message)) { + is.transferTo(baos); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return baos.toByteArray(); + } } diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java index 13549c464cd..56a836e22b9 100644 --- a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java @@ -16,10 +16,6 @@ package io.helidon.webclient.grpc; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.UncheckedIOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -93,16 +89,8 @@ public void halfClose() { public void sendMessage(ReqT message) { socket().log(LOGGER, DEBUG, "sendMessage called"); - // serialize message using a marshaller - ByteArrayOutputStream baos = new ByteArrayOutputStream(BUFFER_SIZE_BYTES); - try (InputStream is = requestMarshaller().stream(message)) { - is.transferTo(baos); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - byte[] serialized = baos.toByteArray(); - - // queue data message and start writer + // serialize and queue message for writing + byte[] serialized = serializeMessage(message); BufferData messageData = BufferData.createReadOnly(serialized, 0, serialized.length); BufferData headerData = BufferData.create(5); headerData.writeInt8(0); // no compression diff --git a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcUnaryClientCall.java b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcUnaryClientCall.java index e2c9a87fd81..ac3291bd248 100644 --- a/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcUnaryClientCall.java +++ b/webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcUnaryClientCall.java @@ -77,8 +77,9 @@ public void sendMessage(ReqT message) { return; } - BufferData messageData = BufferData.growing(BUFFER_SIZE_BYTES); - messageData.readFrom(requestMarshaller().stream(message)); + // serialize and write message + byte[] serialized = serializeMessage(message); + BufferData messageData = BufferData.createReadOnly(serialized, 0, serialized.length); BufferData headerData = BufferData.create(5); headerData.writeInt8(0); // no compression headerData.writeUnsignedInt32(messageData.available()); // length prefixed diff --git a/webclient/tests/grpc/src/test/java/io/helidon/webclient/grpc/tests/GrpcStubTest.java b/webclient/tests/grpc/src/test/java/io/helidon/webclient/grpc/tests/GrpcStubTest.java index 935bb22aca7..e1701dca81e 100644 --- a/webclient/tests/grpc/src/test/java/io/helidon/webclient/grpc/tests/GrpcStubTest.java +++ b/webclient/tests/grpc/src/test/java/io/helidon/webclient/grpc/tests/GrpcStubTest.java @@ -16,6 +16,7 @@ package io.helidon.webclient.grpc.tests; +import java.nio.CharBuffer; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.Random; @@ -23,7 +24,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.stream.IntStream; import io.helidon.common.configurable.Resource; import io.helidon.common.tls.Tls; @@ -70,6 +70,15 @@ void testUnaryUpper() { assertThat(res.getText(), is("HELLO")); } + @Test + void tesUnaryUpperLongString() { + GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL); + StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(grpcClient.channel()); + String s = CharBuffer.allocate(2000).toString().replace('\0', 'a'); + Strings.StringMessage res = service.upper(newStringMessage(s)); + assertThat(res.getText(), is(s.toUpperCase())); + } + @Test void testUnaryUpperAsync() throws ExecutionException, InterruptedException, TimeoutException { GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);