diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcHeaderNames.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcHeaderNames.java index 7a5304f18e..962ab721b8 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcHeaderNames.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcHeaderNames.java @@ -15,6 +15,7 @@ */ package io.servicetalk.grpc.api; +import io.servicetalk.grpc.internal.GrpcStatusUtils; import io.servicetalk.http.api.HttpHeaderNames; import static io.servicetalk.buffer.api.CharSequences.newAsciiString; @@ -59,7 +60,7 @@ public final class GrpcHeaderNames { * * @see gRPC over HTTP2 */ - public static final CharSequence GRPC_STATUS_MESSAGE = newAsciiString("grpc-message"); + public static final CharSequence GRPC_STATUS_MESSAGE = GrpcStatusUtils.GRPC_STATUS_MESSAGE; /** * Message-Type → {@code grpc-message-type} diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java index aac865bf92..329a571643 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java @@ -29,6 +29,7 @@ import io.servicetalk.encoding.api.internal.ContentCodecToBufferEncoder; import io.servicetalk.encoding.api.internal.HeaderUtils; import io.servicetalk.grpc.api.DefaultGrpcMetadata.LazyContextMapSupplier; +import io.servicetalk.grpc.internal.GrpcStatusUtils; import io.servicetalk.http.api.DefaultHttpHeadersFactory; import io.servicetalk.http.api.HttpDeserializer; import io.servicetalk.http.api.HttpHeaders; @@ -70,7 +71,6 @@ import static io.servicetalk.grpc.api.GrpcHeaderNames.GRPC_MESSAGE_ENCODING; import static io.servicetalk.grpc.api.GrpcHeaderNames.GRPC_STATUS; import static io.servicetalk.grpc.api.GrpcHeaderNames.GRPC_STATUS_DETAILS_BIN; -import static io.servicetalk.grpc.api.GrpcHeaderNames.GRPC_STATUS_MESSAGE; import static io.servicetalk.grpc.api.GrpcHeaderValues.APPLICATION_GRPC; import static io.servicetalk.grpc.api.GrpcHeaderValues.APPLICATION_GRPC_PROTO; import static io.servicetalk.grpc.api.GrpcHeaderValues.GRPC_CONTENT_TYPE_PREFIX; @@ -257,7 +257,7 @@ static void setStatus(final HttpHeaders trailers, final GrpcStatus status, @Null @Nullable final BufferAllocator allocator) { trailers.set(GRPC_STATUS, valueOf(status.code().value())); if (status.description() != null) { - trailers.set(GRPC_STATUS_MESSAGE, status.description()); + GrpcStatusUtils.setStatusMessage(trailers, status.description()); } if (details != null) { assert allocator != null; @@ -470,7 +470,7 @@ private static GrpcStatusException convertToGrpcStatusException(final GrpcStatus if (grpcStatusCode.value() == GrpcStatusCode.OK.value()) { return null; } - final CharSequence statusMsg = headers.get(GRPC_STATUS_MESSAGE); + final CharSequence statusMsg = GrpcStatusUtils.getStatusMessage(headers); final GrpcStatus grpcStatus = new GrpcStatus(grpcStatusCode, statusMsg == null ? null : statusMsg.toString()); return new GrpcStatusException(grpcStatus, new StatusSupplier(headers, grpcStatus)); } diff --git a/servicetalk-grpc-internal/gradle/checkstyle/suppressions.xml b/servicetalk-grpc-internal/gradle/checkstyle/suppressions.xml index 892f88b4a7..c1029512c2 100644 --- a/servicetalk-grpc-internal/gradle/checkstyle/suppressions.xml +++ b/servicetalk-grpc-internal/gradle/checkstyle/suppressions.xml @@ -19,4 +19,6 @@ "https://checkstyle.org/dtds/suppressions_1_2.dtd"> + diff --git a/servicetalk-grpc-internal/license/LICENSE.grpc.txt b/servicetalk-grpc-internal/license/LICENSE.grpc.txt new file mode 100644 index 0000000000..7a4a3ea242 --- /dev/null +++ b/servicetalk-grpc-internal/license/LICENSE.grpc.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/servicetalk-grpc-internal/src/main/java/io/servicetalk/grpc/internal/GrpcStatusUtils.java b/servicetalk-grpc-internal/src/main/java/io/servicetalk/grpc/internal/GrpcStatusUtils.java new file mode 100644 index 0000000000..54bb0b755f --- /dev/null +++ b/servicetalk-grpc-internal/src/main/java/io/servicetalk/grpc/internal/GrpcStatusUtils.java @@ -0,0 +1,176 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.grpc.internal; + +import io.servicetalk.http.api.HttpHeaders; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import javax.annotation.Nullable; + +import static io.servicetalk.buffer.api.CharSequences.newAsciiString; + +/** + * Provides utilities around percent-encoding and decoding the GRPC status message. + *

+ * Note that much of the actual encoding and decoding logic is borrowed from the {@code io.grpc.Status} class, + * specifically the {@code io.grpc.Status#StatusMessageMarshaller}. + */ +public final class GrpcStatusUtils { + + public static final CharSequence GRPC_STATUS_MESSAGE = newAsciiString("grpc-message"); + + private static final byte[] HEX = + {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; + + private GrpcStatusUtils() { + // singleton + } + + /** + * Sets and potentially encodes the given status message. + * + * @param headers the headers on where to set the message. + * @param message the message to set. + */ + public static void setStatusMessage(final HttpHeaders headers, final CharSequence message) { + for (int i = 0; i < message.length(); i++) { + char c = message.charAt(i); + // If there are only ASCII non-escaping characters we can skip the slower paths. + if (c > 127 || isEscapingChar((byte) c)) { + // We have non ASCII compatible characters. We must encode the string as UTF-8 and then + // possibly percent encode. + final byte[] messageBytes = message.toString().getBytes(StandardCharsets.UTF_8); + + // Next we need to see if we have to percent encode the bytes form. + for (int j = i; j < messageBytes.length; j++) { + if (isEscapingChar(messageBytes[j])) { + // Character that needs escaping found, continue on slow path with encoding. + headers.set(GRPC_STATUS_MESSAGE, encodeMessage(messageBytes, j)); + return; + } + } + + // The UTF-8 encoded form doesn't require percent encoding. Use the ISO-8859-1 charset to preserve + // the byte representation of the string. + headers.set(GRPC_STATUS_MESSAGE, new String(messageBytes, StandardCharsets.ISO_8859_1)); + return; + } + } + headers.set(GRPC_STATUS_MESSAGE, message); + } + + /** + * Tries to read the status message from the {@link HttpHeaders} and percent-decode if necessary. + * + * @param headers the headers to load and decode the status message from. + * @return the decoded status message, or null of no message found in the header provided. + */ + @Nullable + public static CharSequence getStatusMessage(final HttpHeaders headers) { + final CharSequence message = headers.get(GRPC_STATUS_MESSAGE); + if (message == null) { + return null; + } + + final byte[] messageBytes = message.toString().getBytes(StandardCharsets.UTF_8); + for (int i = 0; i < messageBytes.length; i++) { + byte b = messageBytes[i]; + if (b < ' ' || b >= '~' || (b == '%' && i + 2 < messageBytes.length)) { + return decodeMessage(messageBytes); + } + } + return message; + } + + /** + * Decodes the {@link CharSequence} removing the percent-encoding where needed. + * + * @param messageBytes the message to decode. + * @return the deocded message. + */ + private static CharSequence decodeMessage(final byte[] messageBytes) { + ByteBuffer buf = ByteBuffer.allocate(messageBytes.length); + for (int i = 0; i < messageBytes.length;) { + if (messageBytes[i] == '%' && i + 2 < messageBytes.length) { + try { + buf.put((byte) Integer.parseInt(new String(messageBytes, i + 1, 2, StandardCharsets.US_ASCII), 16)); + i += 3; + continue; + } catch (NumberFormatException e) { + // ignore, fall through, just push the bytes. + } + } + buf.put(messageBytes[i]); + i += 1; + } + return new String(buf.array(), 0, buf.position(), StandardCharsets.UTF_8); + } + + /** + * Describes the character ranges which need escaping (essentially non-printable characters). + * + * @param b the character to check. + * @return true if it needs escaping, false otherwise. + */ + private static boolean isEscapingChar(byte b) { + return b < ' ' || b >= '~' || b == '%'; + } + + /** + * Performs encoding of the message by using a percent-encoding scheme. + * + * @param msgBytes the encoded message. + * @param ri the reader index previously iterated to. + * @return the encoded message. + */ + private static CharSequence encodeMessage(byte[] msgBytes, int ri) { + byte[] escapedBytes = new byte[ri + (msgBytes.length - ri) * 3]; + // copy over the good bytes + if (ri != 0) { + System.arraycopy(msgBytes, 0, escapedBytes, 0, ri); + } + int wi = ri; + for (; ri < msgBytes.length; ri++) { + byte b = msgBytes[ri]; + // Manually implement URL encoding, per the gRPC spec. + if (isEscapingChar(b)) { + escapedBytes[wi] = '%'; + escapedBytes[wi + 1] = HEX[(b >> 4) & 0xF]; + escapedBytes[wi + 2] = HEX[b & 0xF]; + wi += 3; + continue; + } + escapedBytes[wi++] = b; + } + return new String(escapedBytes, 0, wi, StandardCharsets.ISO_8859_1); + } +} diff --git a/servicetalk-grpc-internal/src/test/java/io/servicetalk/grpc/internal/GrpcStatusUtilsTest.java b/servicetalk-grpc-internal/src/test/java/io/servicetalk/grpc/internal/GrpcStatusUtilsTest.java new file mode 100644 index 0000000000..31f078d1ae --- /dev/null +++ b/servicetalk-grpc-internal/src/test/java/io/servicetalk/grpc/internal/GrpcStatusUtilsTest.java @@ -0,0 +1,96 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.grpc.internal; + +import io.servicetalk.http.api.DefaultHttpHeadersFactory; +import io.servicetalk.http.api.HttpHeaders; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +final class GrpcStatusUtilsTest { + + private final HttpHeaders headers = DefaultHttpHeadersFactory.INSTANCE.newHeaders(); + + @ParameterizedTest(name = "{displayName} [{index}]: decoded={0} encoded={1}") + @MethodSource("messageSamples") + void testMessageEncoding(String decoded, String encoded) { + GrpcStatusUtils.setStatusMessage(headers, decoded); + assertEquals(encoded, headers.get(GrpcStatusUtils.GRPC_STATUS_MESSAGE)); + } + + @ParameterizedTest(name = "{displayName} [{index}]: decoded={0} encoded={1}") + @MethodSource("messageSamples") + void testMessageDecoding(String decoded, String encoded) { + headers.set(GrpcStatusUtils.GRPC_STATUS_MESSAGE, encoded); + assertEquals(decoded, GrpcStatusUtils.getStatusMessage(headers)); + } + + @Test + void testNullMessageDecoding() { + assertNull(GrpcStatusUtils.getStatusMessage(headers)); + } + + /** + * With a proper encoder this should not happen, but similar to io.grpc if there is a % at the end + * it is not considered encoded and returned as-is (also see {@link #testInvalidNumberDecoding()} for + * similar behavior). + */ + @Test + void testPercentAtEndMessageDecoding() { + headers.set(GrpcStatusUtils.GRPC_STATUS_MESSAGE, "aa%"); + assertEquals("aa%", GrpcStatusUtils.getStatusMessage(headers)); + + headers.set(GrpcStatusUtils.GRPC_STATUS_MESSAGE, "aa% "); + assertEquals("aa% ", GrpcStatusUtils.getStatusMessage(headers)); + } + + /** + * According to the spec, invalid values are not discarded: + *

+ * "When decoding invalid values, implementations MUST NOT error or throw away the message. At worst, the + * implementation can abort decoding the status message altogether such that the user would received the + * raw percent-encoded form." + */ + @Test + void testInvalidNumberDecoding() { + headers.set(GrpcStatusUtils.GRPC_STATUS_MESSAGE, "%z0"); + assertEquals("%z0", GrpcStatusUtils.getStatusMessage(headers)); + + headers.set(GrpcStatusUtils.GRPC_STATUS_MESSAGE, "%7E%z0%7e"); + assertEquals("~%z0~", GrpcStatusUtils.getStatusMessage(headers)); + } + + static Stream messageSamples() { + return Stream.of( + Arguments.of("abc", "abc"), + Arguments.of("Hello, World!", "Hello, World!"), + Arguments.of("a\r\nbc", "a%0D%0Abc"), + Arguments.of("a%bc", "a%25bc"), + Arguments.of("~ what? ~", "%7E what? %7E"), + Arguments.of("фяї", "%D1%84%D1%8F%D1%97"), + Arguments.of("üñö", "%C3%BC%C3%B1%C3%B6"), + Arguments.of("非常感謝", "%E9%9D%9E%E5%B8%B8%E6%84%9F%E8%AC%9D") + ); + } +} diff --git a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcToHttpLifecycleObserverBridge.java b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcToHttpLifecycleObserverBridge.java index 23abb4473b..3fdc365ccc 100644 --- a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcToHttpLifecycleObserverBridge.java +++ b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/GrpcToHttpLifecycleObserverBridge.java @@ -21,6 +21,7 @@ import io.servicetalk.grpc.api.GrpcLifecycleObserver.GrpcRequestObserver; import io.servicetalk.grpc.api.GrpcLifecycleObserver.GrpcResponseObserver; import io.servicetalk.grpc.api.GrpcStatus; +import io.servicetalk.grpc.internal.GrpcStatusUtils; import io.servicetalk.http.api.HttpHeaders; import io.servicetalk.http.api.HttpLifecycleObserver; import io.servicetalk.http.api.HttpRequestMetaData; @@ -30,7 +31,6 @@ import javax.annotation.Nullable; import static io.servicetalk.grpc.api.GrpcHeaderNames.GRPC_STATUS; -import static io.servicetalk.grpc.api.GrpcHeaderNames.GRPC_STATUS_MESSAGE; import static io.servicetalk.grpc.api.GrpcStatusCode.fromCodeValue; import static java.util.Objects.requireNonNull; @@ -180,7 +180,7 @@ static GrpcStatus status(@Nullable final HttpHeaders headers) { if (statusStr == null) { return null; } - final CharSequence statusMsg = headers.get(GRPC_STATUS_MESSAGE); + final CharSequence statusMsg = GrpcStatusUtils.getStatusMessage(headers); return new GrpcStatus(fromCodeValue(statusStr), statusMsg == null ? null : statusMsg.toString()); } } diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java index 3425ced858..7d5601d2c2 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java @@ -247,12 +247,26 @@ private static Collection sslAndCompressionParams() { return args; } + private static Collection statusMessageParams() { + final String[] messages = { + "abc", "Hello, World!", "a\r\nbc", "a%bc", "~ what? ~", "фяї", "üñö", "非常感謝", + }; + + List args = new ArrayList<>(); + for (boolean streaming : STREAMING) { + for (String message : messages) { + args.add(Arguments.of(streaming, message)); + } + } + return args; + } + @ParameterizedTest @MethodSource("sslStreamingAndCompressionParams") void grpcJavaToGrpcJava(final boolean ssl, final boolean streaming, final String compression) throws Exception { - final TestServerContext server = grpcJavaServer(ErrorMode.NONE, ssl, compression); + final TestServerContext server = grpcJavaServer(ErrorMode.NONE, ssl, compression, null); final CompatClient client = grpcJavaClient(server.listenAddress(), compression, ssl, null); testRequestResponse(client, server, streaming, compression); } @@ -262,7 +276,7 @@ void grpcJavaToGrpcJava(final boolean ssl, void serviceTalkToGrpcJava(final boolean ssl, final boolean streaming, final String compression) throws Exception { - final TestServerContext server = grpcJavaServer(ErrorMode.NONE, ssl, compression); + final TestServerContext server = grpcJavaServer(ErrorMode.NONE, ssl, compression, null); final CompatClient client = serviceTalkClient(server.listenAddress(), ssl, compression, null); testRequestResponse(client, server, streaming, compression); } @@ -293,19 +307,53 @@ void serviceTalkBlockingToServiceTalkBlocking(final boolean ssl, final boolean streaming, final String compression) throws Exception { - final TestServerContext server = serviceTalkServerBlocking(ErrorMode.NONE, ssl, compression); + final TestServerContext server = serviceTalkServerBlocking(ErrorMode.NONE, ssl, compression, null); final BlockingCompatClient client = serviceTalkClient(server.listenAddress(), ssl, compression, null) .asBlockingClient(); testBlockingRequestResponse(client, server, streaming, compression); } + @ParameterizedTest(name = "{displayName} [{index}]: streaming={0} message={1}") + @MethodSource("statusMessageParams") + void serviceTalkToServiceTalkStatusMessage(final boolean streaming, final String message) throws Exception { + final TestServerContext server = serviceTalkServer(ErrorMode.STATUS, false, defaultStrategy(), null, null, + new ArrayDeque<>(), message); + final CompatClient client = serviceTalkClient(server.listenAddress(), false, null, null); + testGrpcError(client, server, true, streaming, "identity", INVALID_ARGUMENT, message); + } + + @ParameterizedTest(name = "{displayName} [{index}]: streaming={0} message={1}") + @MethodSource("statusMessageParams") + void grpcJavaToGrpcJavaStatusMessage(final boolean streaming, final String message) throws Exception { + final TestServerContext server = grpcJavaServer(ErrorMode.STATUS, false, null, message); + final CompatClient client = grpcJavaClient(server.listenAddress(), null, false, null); + testGrpcError(client, server, true, streaming, "identity", INVALID_ARGUMENT, message); + } + + @ParameterizedTest(name = "{displayName} [{index}]: streaming={0} message={1}") + @MethodSource("statusMessageParams") + void grpcJavaToServiceTalkStatusMessage(final boolean streaming, final String message) throws Exception { + final TestServerContext server = serviceTalkServer(ErrorMode.STATUS, false, defaultStrategy(), null, null, + new ArrayDeque<>(), message); + final CompatClient client = grpcJavaClient(server.listenAddress(), null, false, null); + testGrpcError(client, server, true, streaming, "identity", INVALID_ARGUMENT, message); + } + + @ParameterizedTest(name = "{displayName} [{index}]: streaming={0} message={1}") + @MethodSource("statusMessageParams") + void serviceTalkToGrpcJavaStatusMessage(final boolean streaming, final String message) throws Exception { + final TestServerContext server = grpcJavaServer(ErrorMode.STATUS, false, null, message); + final CompatClient client = serviceTalkClient(server.listenAddress(), false, null, null); + testGrpcError(client, server, true, streaming, "identity", INVALID_ARGUMENT, message); + } + @ParameterizedTest @MethodSource("sslAndStreamingParams") void grpcJavaToGrpcJavaCompressionError(final boolean ssl, final boolean streaming) throws Exception { final String clientCompression = "gzip"; - final TestServerContext server = grpcJavaServer(ErrorMode.NONE, ssl, null); + final TestServerContext server = grpcJavaServer(ErrorMode.NONE, ssl, null, null); final CompatClient client = grpcJavaClient(server.listenAddress(), clientCompression, ssl, null); testGrpcError(client, server, false, streaming, clientCompression, GrpcStatusCode.UNIMPLEMENTED, null); } @@ -327,7 +375,7 @@ void serviceTalkToGrpcJavaCompressionError(final boolean ssl, final boolean streaming) throws Exception { final String clientCompression = "gzip"; - final TestServerContext server = grpcJavaServer(ErrorMode.NONE, ssl, null); + final TestServerContext server = grpcJavaServer(ErrorMode.NONE, ssl, null, null); final CompatClient client = serviceTalkClient(server.listenAddress(), ssl, clientCompression, null); testGrpcError(client, server, false, streaming, clientCompression, GrpcStatusCode.UNIMPLEMENTED, null); } @@ -348,7 +396,7 @@ void serviceTalkToServiceTalkCompressionError(final boolean ssl, void grpcJavaToGrpcJavaError(final boolean ssl, final boolean streaming, final String compression) throws Exception { - final TestServerContext server = grpcJavaServer(ErrorMode.SIMPLE, ssl, compression); + final TestServerContext server = grpcJavaServer(ErrorMode.SIMPLE, ssl, compression, null); final CompatClient client = grpcJavaClient(server.listenAddress(), compression, ssl, null); testGrpcError(client, server, false, streaming, compression); } @@ -359,7 +407,7 @@ void grpcJavaToGrpcJavaErrorWithStatus(final boolean ssl, final boolean streaming, final String compression) throws Exception { - final TestServerContext server = grpcJavaServer(ErrorMode.STATUS, ssl, compression); + final TestServerContext server = grpcJavaServer(ErrorMode.STATUS, ssl, compression, null); final CompatClient client = grpcJavaClient(server.listenAddress(), compression, ssl, null); testGrpcError(client, server, true, streaming, compression); } @@ -369,7 +417,7 @@ void grpcJavaToGrpcJavaErrorWithStatus(final boolean ssl, void serviceTalkToGrpcJavaError(final boolean ssl, final boolean streaming, final String compression) throws Exception { - final TestServerContext server = grpcJavaServer(ErrorMode.SIMPLE, ssl, compression); + final TestServerContext server = grpcJavaServer(ErrorMode.SIMPLE, ssl, compression, null); final CompatClient client = serviceTalkClient(server.listenAddress(), ssl, compression, null); testGrpcError(client, server, false, streaming, compression); } @@ -380,7 +428,7 @@ void serviceTalkToGrpcJavaErrorWithStatus(final boolean ssl, final boolean streaming, final String compression) throws Exception { - final TestServerContext server = grpcJavaServer(ErrorMode.STATUS, ssl, compression); + final TestServerContext server = grpcJavaServer(ErrorMode.STATUS, ssl, compression, null); final CompatClient client = serviceTalkClient(server.listenAddress(), ssl, compression, null); testGrpcError(client, server, true, streaming, compression); } @@ -498,7 +546,7 @@ void grpcJavaToServiceTalkBlocking( final boolean ssl, final boolean streaming, final String compression) throws Exception { - final TestServerContext server = serviceTalkServerBlocking(ErrorMode.NONE, ssl, compression); + final TestServerContext server = serviceTalkServerBlocking(ErrorMode.NONE, ssl, compression, null); final CompatClient client = grpcJavaClient(server.listenAddress(), compression, ssl, null); testRequestResponse(client, server, streaming, compression); } @@ -568,7 +616,7 @@ void grpcJavaToServiceTalkBlockingError(final boolean ssl, final boolean streaming, final String compression) throws Exception { - final TestServerContext server = serviceTalkServerBlocking(ErrorMode.SIMPLE, ssl, compression); + final TestServerContext server = serviceTalkServerBlocking(ErrorMode.SIMPLE, ssl, compression, null); final CompatClient client = grpcJavaClient(server.listenAddress(), compression, ssl, null); testGrpcError(client, server, false, streaming, compression); } @@ -579,7 +627,7 @@ void grpcJavaToServiceTalkBlockingErrorWithStatus(final boolean ssl, final boolean streaming, final String compression) throws Exception { - final TestServerContext server = serviceTalkServerBlocking(ErrorMode.STATUS, ssl, compression); + final TestServerContext server = serviceTalkServerBlocking(ErrorMode.STATUS, ssl, compression, null); final CompatClient client = grpcJavaClient(server.listenAddress(), compression, ssl, null); testGrpcError(client, server, true, streaming, compression); } @@ -632,7 +680,7 @@ void serviceTalkToServiceTalkErrorWithStatusViaServerFilter( @MethodSource("sslStreamingAndCompressionParams") void grpcJavaToGrpcJavaClientTimeout(final boolean ssl, final boolean streaming, final String compression) throws Exception { - final TestServerContext server = grpcJavaServer(ErrorMode.NONE, ssl, compression); + final TestServerContext server = grpcJavaServer(ErrorMode.NONE, ssl, compression, null); try (ServerContext proxyCtx = buildTimeoutProxy(server.listenAddress(), null, ssl)) { final CompatClient client = grpcJavaClient(proxyCtx.listenAddress(), compression, ssl, DEFAULT_DEADLINE); testGrpcError(client, server, false, streaming, compression, DEADLINE_EXCEEDED, null); @@ -643,7 +691,7 @@ void grpcJavaToGrpcJavaClientTimeout(final boolean ssl, final boolean streaming, @MethodSource("sslStreamingAndCompressionParams") void serviceTalkToGrpcJavaClientTimeout(final boolean ssl, final boolean streaming, final String compression) throws Exception { - final TestServerContext server = grpcJavaServer(ErrorMode.NONE, ssl, compression); + final TestServerContext server = grpcJavaServer(ErrorMode.NONE, ssl, compression, null); try (ServerContext proxyCtx = buildTimeoutProxy(server.listenAddress(), null, ssl)) { final CompatClient client = serviceTalkClient(proxyCtx.listenAddress(), ssl, compression, DEFAULT_DEADLINE); testGrpcError(client, server, false, streaming, compression, DEADLINE_EXCEEDED, null); @@ -680,8 +728,8 @@ void timeoutMidRequest(boolean stClient, boolean stServer, boolean clientInitiat Duration serverTimeout = clientInitiatedTimeout ? null : DEFAULT_DEADLINE; BlockingQueue serverErrorQueue = new ArrayBlockingQueue<>(16); final TestServerContext server = stServer ? - serviceTalkServer(ErrorMode.NONE, false, from(offloadNone()), null, null, serverErrorQueue) : - grpcJavaServer(ErrorMode.NONE, false, null); + serviceTalkServer(ErrorMode.NONE, false, from(offloadNone()), null, null, serverErrorQueue, null) : + grpcJavaServer(ErrorMode.NONE, false, null, null); try (ServerContext proxyCtx = buildTimeoutProxy(server.listenAddress(), serverTimeout, false)) { final CompatClient client = stClient ? serviceTalkClient(proxyCtx.listenAddress(), false, null, clientTimeout) : @@ -1037,11 +1085,11 @@ private static void assertFallbackStatus(final com.google.rpc.Status status, fin assertEquals(0, anyList.size()); } - private static com.google.rpc.Status newStatus() { + private static com.google.rpc.Status newStatus(final String message) { // We just use CompatResponse as part of the status to keep it simple. return com.google.rpc.Status.newBuilder() .setCode(GrpcStatusCode.INVALID_ARGUMENT.value()) - .setMessage(CUSTOM_ERROR_MESSAGE) + .setMessage(message) .addDetails(pack(computeResponse(999))) .build(); } @@ -1110,9 +1158,9 @@ public Single handle(final HttpServiceContext ctx, final StreamingHttpRequest req, final StreamingHttpResponseFactory resFactory) { if (errorMode == ErrorMode.SIMPLE_IN_SERVER_FILTER) { - throwGrpcStatusException(); + throwGrpcStatusException(CUSTOM_ERROR_MESSAGE); } else if (errorMode == ErrorMode.STATUS_IN_SERVER_FILTER) { - throwGrpcStatusExceptionWithStatus(); + throwGrpcStatusExceptionWithStatus(CUSTOM_ERROR_MESSAGE); } return delegate().handle(ctx, req, resFactory); } @@ -1129,7 +1177,8 @@ public Single handle(final HttpServiceContext ctx, } private static TestServerContext serviceTalkServerBlocking(final ErrorMode errorMode, final boolean ssl, - @Nullable final String compression) throws Exception { + @Nullable final String compression, + @Nullable final String statusMessage) throws Exception { final ServerContext serverContext = serviceTalkServerBuilder(ErrorMode.NONE, ssl, null) .listenAndAwait(new ServiceFactory.Builder() .bufferDecoderGroup(serviceTalkDecompression(compression)) @@ -1139,7 +1188,7 @@ private static TestServerContext serviceTalkServerBlocking(final ErrorMode error public void bidirectionalStreamingCall( final GrpcServiceContext ctx, final BlockingIterable request, final GrpcPayloadWriter responseWriter) throws Exception { - maybeThrowFromRpc(errorMode); + maybeThrowFromRpc(errorMode, statusMessage); for (CompatRequest requestItem : request) { responseWriter.write(computeResponse(requestItem.getId())); } @@ -1149,7 +1198,7 @@ public void bidirectionalStreamingCall( @Override public CompatResponse clientStreamingCall(final GrpcServiceContext ctx, final BlockingIterable request) { - maybeThrowFromRpc(errorMode); + maybeThrowFromRpc(errorMode, statusMessage); int sum = 0; for (CompatRequest requestItem : request) { sum += requestItem.getId(); @@ -1160,7 +1209,7 @@ public CompatResponse clientStreamingCall(final GrpcServiceContext ctx, @Override public CompatResponse scalarCall(final GrpcServiceContext ctx, final CompatRequest request) { - maybeThrowFromRpc(errorMode); + maybeThrowFromRpc(errorMode, statusMessage); return computeResponse(request.getId()); } @@ -1168,7 +1217,7 @@ public CompatResponse scalarCall(final GrpcServiceContext ctx, public void serverStreamingCall(final GrpcServiceContext ctx, final CompatRequest request, final GrpcPayloadWriter responseWriter) throws Exception { - maybeThrowFromRpc(errorMode); + maybeThrowFromRpc(errorMode, statusMessage); for (int i = 0; i < request.getId(); i++) { responseWriter.write(computeResponse(i)); } @@ -1218,21 +1267,22 @@ private static List serviceTalkCompressions(@Nullable final Strin return encoders; } - private static void maybeThrowFromRpc(final ErrorMode errorMode) { + private static void maybeThrowFromRpc(final ErrorMode errorMode, @Nullable final String statusMessage) { + final String message = statusMessage == null ? CUSTOM_ERROR_MESSAGE : statusMessage; if (errorMode == ErrorMode.SIMPLE) { - throwGrpcStatusException(); + throwGrpcStatusException(message); } else if (errorMode == ErrorMode.STATUS) { - throwGrpcStatusExceptionWithStatus(); + throwGrpcStatusExceptionWithStatus(message); } } - private static void throwGrpcStatusException() { + private static void throwGrpcStatusException(final String message) { // INVALID_ARGUMENT is used because it can only be generated by application. ie. not generated by gRPC library - throw new GrpcStatusException(new GrpcStatus(GrpcStatusCode.INVALID_ARGUMENT, CUSTOM_ERROR_MESSAGE)); + throw new GrpcStatusException(new GrpcStatus(GrpcStatusCode.INVALID_ARGUMENT, message)); } - private static void throwGrpcStatusExceptionWithStatus() { - throw GrpcStatusException.of(newStatus()); + private static void throwGrpcStatusExceptionWithStatus(final String message) { + throw GrpcStatusException.of(newStatus(message)); } private static TestServerContext serviceTalkServer(final ErrorMode errorMode, final boolean ssl, @@ -1245,19 +1295,19 @@ private static TestServerContext serviceTalkServer(final ErrorMode errorMode, fi final GrpcExecutionStrategy strategy, @Nullable final String compression, @Nullable final Duration timeout) throws Exception { - return serviceTalkServer(errorMode, ssl, strategy, compression, timeout, new ArrayDeque<>()); + return serviceTalkServer(errorMode, ssl, strategy, compression, timeout, new ArrayDeque<>(), null); } private static TestServerContext serviceTalkServer( final ErrorMode errorMode, final boolean ssl, final GrpcExecutionStrategy strategy, @Nullable final String compression, @Nullable final Duration timeout, - Queue reqStreamError) throws Exception { + Queue reqStreamError, @Nullable final String statusMessage) throws Exception { final Compat.CompatService compatService = new Compat.CompatService() { @Override public Publisher bidirectionalStreamingCall(final GrpcServiceContext ctx, final Publisher pub) { reqStreamError.add(SERVER_PROCESSED_TOKEN); - maybeThrowFromRpc(errorMode); + maybeThrowFromRpc(errorMode, statusMessage); return pub.map(req -> response(req.getId())).beforeFinally(errorConsumer()); } @@ -1265,29 +1315,30 @@ public Publisher bidirectionalStreamingCall(final GrpcServiceCon public Single clientStreamingCall(final GrpcServiceContext ctx, final Publisher pub) { reqStreamError.add(SERVER_PROCESSED_TOKEN); - maybeThrowFromRpc(errorMode); + maybeThrowFromRpc(errorMode, statusMessage); return pub.collect(() -> 0, (sum, req) -> sum + req.getId()).map(this::response) .beforeFinally(errorConsumer()); } @Override public Single scalarCall(final GrpcServiceContext ctx, final CompatRequest req) { - maybeThrowFromRpc(errorMode); + maybeThrowFromRpc(errorMode, statusMessage); return succeeded(response(req.getId())); } @Override public Publisher serverStreamingCall(final GrpcServiceContext ctx, final CompatRequest req) { - maybeThrowFromRpc(errorMode); + maybeThrowFromRpc(errorMode, statusMessage); return Publisher.fromIterable(() -> IntStream.range(0, req.getId()).iterator()).map(this::response); } private CompatResponse response(final int value) { + final String message = statusMessage == null ? CUSTOM_ERROR_MESSAGE : statusMessage; if (errorMode == ErrorMode.SIMPLE_IN_RESPONSE) { - throwGrpcStatusException(); + throwGrpcStatusException(message); } else if (errorMode == ErrorMode.STATUS_IN_RESPONSE) { - throwGrpcStatusExceptionWithStatus(); + throwGrpcStatusExceptionWithStatus(message); } return computeResponse(value); } @@ -1515,7 +1566,8 @@ public void onCompleted() { } private static TestServerContext grpcJavaServer(final ErrorMode errorMode, final boolean ssl, - @Nullable final String compression) throws Exception { + @Nullable final String compression, + @Nullable final String statusMessage) throws Exception { final NettyServerBuilder builder = NettyServerBuilder.forAddress(localAddress(0)); if (ssl) { builder.useTransportSecurity(loadServerPem(), loadServerKey()); @@ -1632,11 +1684,12 @@ public void onCompleted() { } private CompatResponse response(final int value) throws Exception { + final String description = statusMessage == null ? CUSTOM_ERROR_MESSAGE : statusMessage; if (errorMode == ErrorMode.SIMPLE) { - throw Status.INVALID_ARGUMENT.augmentDescription(CUSTOM_ERROR_MESSAGE).asException(); + throw Status.INVALID_ARGUMENT.augmentDescription(description).asException(); } if (errorMode == ErrorMode.STATUS) { - throw StatusProto.toStatusException(newStatus()); + throw StatusProto.toStatusException(newStatus(description)); } return computeResponse(value); }