diff --git a/grpc-java/LICENSE b/grpc-java/LICENSE new file mode 100644 index 00000000000..d6456956733 --- /dev/null +++ b/grpc-java/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/grpc-java/NOTICE b/grpc-java/NOTICE new file mode 100644 index 00000000000..b36b8f6eaf9 --- /dev/null +++ b/grpc-java/NOTICE @@ -0,0 +1,23 @@ +Copyright 2014 The gRPC Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +----------------------------------------------------------------------- + +This product contains a modified portion of 'grpc-java', an open source +RPC library and framework, which can be obtained at + + * LICENSE: + * grpc-java/LICENSE + * HOMEPAGE: + * https://github.com/grpc/grpc-java diff --git a/grpc-java/README.md b/grpc-java/README.md new file mode 100644 index 00000000000..82012b93f82 --- /dev/null +++ b/grpc-java/README.md @@ -0,0 +1,25 @@ +# grpc-java + +This directory contains code licensed under the Apache License, Version 2.0. + +## grpc-servlet-jakarta +The grpc-servlet-jakarta directory is from https://github.com/grpc/grpc-java/pull/8596, +a pull request to add a Java Servlet based transport. This version only supports +Jakarta Servlets, and is copied from the generated sources created by that pull +request, to best support the newest versions of Jetty. There is also one small +change to allow the ServletAdapter to expose internal details to other transports, +like grpc-servlet-web or grpc-servlet-websocket, to allow avoiding using an +external proxy. + +## grpc-servlet-websocket-jakarta +The grpc-servlet-websocket-jakarta project is new, not yet submitted to grpc-java for +discussion or review. In short, it is a grpc-java transport, which uses one websocket +per stream based on the https://github.com/improbable-eng/grpc-web/ client/proxy +implementation. This enables a browser-based client to connect to a bidirectional +binary stream without SSL or an intermediate proxy. + +## grpc-servlet-web-jakarta +There will be another project here soon, for a grpc-web implementation, allowing +a browser-based client to connect to server-streaming or unary calls with text or binary +payloads, without an intermediate proxy. This will require SSL to use http2, and, until +browsers support it, will not be able to handle any form of client streaming. diff --git a/grpc-java/grpc-servlet-jakarta/build.gradle b/grpc-java/grpc-servlet-jakarta/build.gradle new file mode 100644 index 00000000000..f9aba8f3383 --- /dev/null +++ b/grpc-java/grpc-servlet-jakarta/build.gradle @@ -0,0 +1,12 @@ +plugins { + id 'java-library' +} + +dependencies { + Classpaths.inheritGrpcPlatform(project) + implementation 'io.grpc:grpc-core' + compileOnly 'jakarta.servlet:jakarta.servlet-api:5.0.0', + 'org.apache.tomcat:annotations-api:6.0.53' + + implementation 'com.google.guava:guava:19.0' +} \ No newline at end of file diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java new file mode 100644 index 00000000000..82ed23794f1 --- /dev/null +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java @@ -0,0 +1,242 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.grpc.servlet; + +import static io.grpc.servlet.ServletServerStream.toHexString; +import static java.util.logging.Level.FINE; +import static java.util.logging.Level.FINEST; + +import io.grpc.InternalLogId; +import io.grpc.Status; +import io.grpc.servlet.ServletServerStream.ServletTransportState; +import java.io.IOException; +import java.time.Duration; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; +import java.util.logging.Logger; +import javax.annotation.CheckReturnValue; +import javax.annotation.Nullable; +import jakarta.servlet.AsyncContext; +import jakarta.servlet.ServletOutputStream; + +/** Handles write actions from the container thread and the application thread. */ +final class AsyncServletOutputStreamWriter { + + private static final Logger logger = + Logger.getLogger(AsyncServletOutputStreamWriter.class.getName()); + + /** + * Memory boundary for write actions. + * + *
+     * WriteState curState = writeState.get(); // mark a boundary
+     * doSomething(); // do something within the boundary
+     * boolean successful = writeState.compareAndSet(curState, newState); // try to mark a boundary
+     * if (successful) {
+     *     // state has not changed since
+     *     return;
+     * } else {
+     *     // state is changed by another thread while doSomething(), need recompute
+     * }
+     * 
+ * + *

+ * There are two threads, the container thread (calling {@code onWritePossible()}) and the application thread + * (calling {@code runOrBufferActionItem()}) that read and update the writeState. Only onWritePossible() may turn + * readyAndEmpty from false to true, and only runOrBufferActionItem() may turn it from true to false. + */ + private final AtomicReference writeState = new AtomicReference<>(WriteState.DEFAULT); + + private final ServletOutputStream outputStream; + private final ServletTransportState transportState; + private final InternalLogId logId; + private final ActionItem flushAction; + private final ActionItem completeAction; + + /** + * New write actions will be buffered into this queue if the servlet output stream is not ready or the queue is not + * drained. + */ + // SPSC queue would do + private final Queue writeChain = new ConcurrentLinkedQueue<>(); + // for a theoretical race condition that onWritePossible() is called immediately after isReady() + // returns false and before writeState.compareAndSet() + @Nullable + private volatile Thread parkingThread; + + AsyncServletOutputStreamWriter( + AsyncContext asyncContext, + ServletOutputStream outputStream, + ServletTransportState transportState, + InternalLogId logId) { + this.outputStream = outputStream; + this.transportState = transportState; + this.logId = logId; + this.flushAction = () -> { + logger.log(FINEST, "[{0}] flushBuffer", logId); + asyncContext.getResponse().flushBuffer(); + }; + this.completeAction = () -> { + logger.log(FINE, "[{0}] call is completing", logId); + transportState.runOnTransportThread( + () -> { + transportState.complete(); + asyncContext.complete(); + logger.log(FINE, "[{0}] call completed", logId); + }); + }; + } + + /** Called from application thread. */ + void writeBytes(byte[] bytes, int numBytes) throws IOException { + runOrBufferActionItem( + // write bytes action + () -> { + outputStream.write(bytes, 0, numBytes); + transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes)); + if (logger.isLoggable(FINEST)) { + logger.log( + FINEST, + "[{0}] outbound data: length = {1}, bytes = {2}", + new Object[] {logId, numBytes, toHexString(bytes, numBytes)}); + } + }); + } + + /** Called from application thread. */ + void flush() throws IOException { + runOrBufferActionItem(flushAction); + } + + /** Called from application thread. */ + void complete() { + try { + runOrBufferActionItem(completeAction); + } catch (IOException e) { + // actually completeAction does not throw + throw Status.fromThrowable(e).asRuntimeException(); + } + } + + /** Called from the container thread {@link jakarta.servlet.WriteListener#onWritePossible()}. */ + void onWritePossible() throws IOException { + logger.log( + FINEST, "[{0}] onWritePossible: ENTRY. The servlet output stream becomes ready", logId); + assureReadyAndEmptyFalse(); + while (outputStream.isReady()) { + WriteState curState = writeState.get(); + + ActionItem actionItem = writeChain.poll(); + if (actionItem != null) { + actionItem.run(); + continue; + } + + if (writeState.compareAndSet(curState, curState.withReadyAndEmpty(true))) { + // state has not changed since. + logger.log( + FINEST, + "[{0}] onWritePossible: EXIT. All data available now is sent out and the servlet output" + + " stream is still ready", + logId); + return; + } + // else, state changed by another thread (runOrBufferActionItem), need to drain the writeChain + // again + } + logger.log( + FINEST, "[{0}] onWritePossible: EXIT. The servlet output stream becomes not ready", logId); + } + + private void runOrBufferActionItem(ActionItem actionItem) throws IOException { + WriteState curState = writeState.get(); + if (curState.readyAndEmpty) { // write to the outputStream directly + actionItem.run(); + if (!outputStream.isReady()) { + logger.log(FINEST, "[{0}] the servlet output stream becomes not ready", logId); + boolean successful = writeState.compareAndSet(curState, curState.withReadyAndEmpty(false)); + assert successful; + LockSupport.unpark(parkingThread); + } + } else { // buffer to the writeChain + writeChain.offer(actionItem); + if (!writeState.compareAndSet(curState, curState.newItemBuffered())) { + // state changed by another thread (onWritePossible) + assert writeState.get().readyAndEmpty; + ActionItem lastItem = writeChain.poll(); + if (lastItem != null) { + assert lastItem == actionItem; + runOrBufferActionItem(lastItem); + } + } // state has not changed since + } + } + + private void assureReadyAndEmptyFalse() { + // readyAndEmpty should have been set to false already or right now + // It's very very unlikely readyAndEmpty is still true due to a race condition + while (writeState.get().readyAndEmpty) { + parkingThread = Thread.currentThread(); + LockSupport.parkNanos(Duration.ofSeconds(1).toNanos()); + } + parkingThread = null; + } + + /** Write actions, e.g. writeBytes, flush, complete. */ + @FunctionalInterface + private interface ActionItem { + void run() throws IOException; + } + + private static final class WriteState { + + static final WriteState DEFAULT = new WriteState(false); + + /** + * The servlet output stream is ready and the writeChain is empty. + * + *

+ * readyAndEmpty turns from false to true when: {@code onWritePossible()} exits while currently there is no more + * data to write, but the last check of {@link jakarta.servlet.ServletOutputStream#isReady()} is true. + * + *

+ * readyAndEmpty turns from false to true when: {@code runOrBufferActionItem()} exits while either the action + * item is written directly to the servlet output stream and the check of + * {@link jakarta.servlet.ServletOutputStream#isReady()} right after that returns false, or the action item is + * buffered into the writeChain. + */ + final boolean readyAndEmpty; + + WriteState(boolean readyAndEmpty) { + this.readyAndEmpty = readyAndEmpty; + } + + /** + * Only {@code onWritePossible()} can set readyAndEmpty to true, and only {@code + * runOrBufferActionItem()} can set it to false. + */ + @CheckReturnValue + WriteState withReadyAndEmpty(boolean readyAndEmpty) { + return new WriteState(readyAndEmpty); + } + + /** Only {@code runOrBufferActionItem()} can call it, and will set readyAndEmpty to false. */ + @CheckReturnValue + WriteState newItemBuffered() { + return new WriteState(false); + } + } +} diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/GrpcServlet.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/GrpcServlet.java new file mode 100644 index 00000000000..972f27f95cd --- /dev/null +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/GrpcServlet.java @@ -0,0 +1,75 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.grpc.servlet; + +import com.google.common.annotations.VisibleForTesting; +import io.grpc.BindableService; +import io.grpc.ExperimentalApi; +import java.io.IOException; +import java.util.List; +import jakarta.servlet.http.HttpServlet; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; + +/** + * A simple servlet backed by a gRPC server. Must set {@code asyncSupported} to true. The {@code + * /contextRoot/urlPattern} must match the gRPC services' path, which is "/full-service-name/short-method-name". + * + *

+ * The API is experimental. The authors would like to know more about the real usecases. Users are welcome to provide + * feedback by commenting on the tracking issue. + */ +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5066") +public class GrpcServlet extends HttpServlet { + private static final long serialVersionUID = 1L; + + private final ServletAdapter servletAdapter; + + @VisibleForTesting + GrpcServlet(ServletAdapter servletAdapter) { + this.servletAdapter = servletAdapter; + } + + /** + * Instantiate the servlet serving the given list of gRPC services. ServerInterceptors can be added on each gRPC + * service by {@link io.grpc.ServerInterceptors#intercept(BindableService, io.grpc.ServerInterceptor...)} + */ + public GrpcServlet(List bindableServices) { + this(loadServices(bindableServices)); + } + + private static ServletAdapter loadServices(List bindableServices) { + ServletServerBuilder serverBuilder = new ServletServerBuilder(); + bindableServices.forEach(serverBuilder::addService); + return serverBuilder.buildServletAdapter(); + } + + @Override + protected final void doGet(HttpServletRequest request, HttpServletResponse response) + throws IOException { + servletAdapter.doGet(request, response); + } + + @Override + protected final void doPost(HttpServletRequest request, HttpServletResponse response) + throws IOException { + servletAdapter.doPost(request, response); + } + + @Override + public void destroy() { + servletAdapter.destroy(); + super.destroy(); + } +} diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/ServletAdapter.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/ServletAdapter.java new file mode 100644 index 00000000000..4a7cbed859e --- /dev/null +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/ServletAdapter.java @@ -0,0 +1,348 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.grpc.servlet; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; +import static java.util.logging.Level.FINE; +import static java.util.logging.Level.FINEST; + +import com.google.common.io.BaseEncoding; +import io.grpc.Attributes; +import io.grpc.ExperimentalApi; +import io.grpc.Grpc; +import io.grpc.InternalLogId; +import io.grpc.InternalMetadata; +import io.grpc.Metadata; +import io.grpc.ServerStreamTracer; +import io.grpc.Status; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.ReadableBuffers; +import io.grpc.internal.ServerTransportListener; +import io.grpc.internal.StatsTraceContext; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Enumeration; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; +import jakarta.servlet.AsyncContext; +import jakarta.servlet.AsyncEvent; +import jakarta.servlet.AsyncListener; +import jakarta.servlet.ReadListener; +import jakarta.servlet.ServletInputStream; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; + +/** + * An adapter that transforms {@link HttpServletRequest} into gRPC request and lets a gRPC server process it, and + * transforms the gRPC response into {@link HttpServletResponse}. An adapter can be instantiated by + * {@link ServletServerBuilder#buildServletAdapter()}. + * + *

+ * In a servlet, calling {@link #doPost(HttpServletRequest, HttpServletResponse)} inside + * {@link jakarta.servlet.http.HttpServlet#doPost(HttpServletRequest, HttpServletResponse)} makes the servlet backed by + * the gRPC server associated with the adapter. The servlet must support Asynchronous Processing and must be deployed to + * a container that supports servlet 4.0 and enables HTTP/2. + * + *

+ * The API is experimental. The authors would like to know more about the real usecases. Users are welcome to provide + * feedback by commenting on the tracking issue. + */ +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5066") +public final class ServletAdapter { + + static final Logger logger = Logger.getLogger(ServletAdapter.class.getName()); + + private final ServerTransportListener transportListener; + private final List streamTracerFactories; + private final int maxInboundMessageSize; + private final Attributes attributes; + + ServletAdapter( + ServerTransportListener transportListener, + List streamTracerFactories, + int maxInboundMessageSize) { + this.transportListener = transportListener; + this.streamTracerFactories = streamTracerFactories; + this.maxInboundMessageSize = maxInboundMessageSize; + attributes = transportListener.transportReady(Attributes.EMPTY); + } + + public interface AdapterConstructor { + T newInstance(ServerTransportListener sharedListener, + List streamTracerFactories, + int maxInboundMessageSize, + Attributes attributes); + } + + @ExperimentalApi("deephaven only, not submitted upstream") + public T otherAdapter(AdapterConstructor constructor) { + return constructor.newInstance(transportListener, streamTracerFactories, maxInboundMessageSize, attributes); + } + + /** + * Call this method inside {@link jakarta.servlet.http.HttpServlet#doGet(HttpServletRequest, HttpServletResponse)} + * to serve gRPC GET request. + * + *

+ * This method is currently not implemented. + * + *

+ * Note that in rare case gRPC client sends GET requests. + * + *

+ * Do not modify {@code req} and {@code resp} before or after calling this method. However, calling + * {@code resp.setBufferSize()} before invocation is allowed. + */ + public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException { + // TODO(zdapeng) + } + + /** + * Call this method inside {@link jakarta.servlet.http.HttpServlet#doPost(HttpServletRequest, HttpServletResponse)} + * to serve gRPC POST request. + * + *

+ * Do not modify {@code req} and {@code resp} before or after calling this method. However, calling + * {@code resp.setBufferSize()} before invocation is allowed. + */ + public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException { + checkArgument(req.isAsyncSupported(), "servlet does not support asynchronous operation"); + checkArgument(ServletAdapter.isGrpc(req), "the request is not a gRPC request"); + + InternalLogId logId = InternalLogId.allocate(ServletAdapter.class, null); + logger.log(FINE, "[{0}] RPC started", logId); + + AsyncContext asyncCtx = req.startAsync(req, resp); + + String method = req.getRequestURI().substring(1); // remove the leading "/" + Metadata headers = getHeaders(req); + + if (logger.isLoggable(FINEST)) { + logger.log(FINEST, "[{0}] method: {1}", new Object[] {logId, method}); + logger.log(FINEST, "[{0}] headers: {1}", new Object[] {logId, headers}); + } + + Long timeoutNanos = headers.get(TIMEOUT_KEY); + if (timeoutNanos == null) { + timeoutNanos = 0L; + } + asyncCtx.setTimeout(TimeUnit.NANOSECONDS.toMillis(timeoutNanos)); + StatsTraceContext statsTraceCtx = + StatsTraceContext.newServerContext(streamTracerFactories, method, headers); + + ServletServerStream stream = new ServletServerStream( + asyncCtx, + statsTraceCtx, + maxInboundMessageSize, + attributes.toBuilder() + .set( + Grpc.TRANSPORT_ATTR_REMOTE_ADDR, + new InetSocketAddress(req.getRemoteHost(), req.getRemotePort())) + .set( + Grpc.TRANSPORT_ATTR_LOCAL_ADDR, + new InetSocketAddress(req.getLocalAddr(), req.getLocalPort())) + .build(), + getAuthority(req), + logId); + + transportListener.streamCreated(stream, method, headers); + stream.transportState().runOnTransportThread(stream.transportState()::onStreamAllocated); + + asyncCtx.getRequest().getInputStream() + .setReadListener(new GrpcReadListener(stream, asyncCtx, logId)); + asyncCtx.addListener(new GrpcAsyncListener(stream, logId)); + } + + // This method must use Enumeration and its members, since that is the only way to read headers + // from the servlet api. + @SuppressWarnings("JdkObsolete") + private static Metadata getHeaders(HttpServletRequest req) { + Enumeration headerNames = req.getHeaderNames(); + checkNotNull( + headerNames, "Servlet container does not allow HttpServletRequest.getHeaderNames()"); + List byteArrays = new ArrayList<>(); + while (headerNames.hasMoreElements()) { + String headerName = headerNames.nextElement(); + Enumeration values = req.getHeaders(headerName); + if (values == null) { + continue; + } + while (values.hasMoreElements()) { + String value = values.nextElement(); + if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { + byteArrays.add(headerName.getBytes(StandardCharsets.US_ASCII)); + byteArrays.add(BaseEncoding.base64().decode(value)); + } else { + byteArrays.add(headerName.getBytes(StandardCharsets.US_ASCII)); + byteArrays.add(value.getBytes(StandardCharsets.US_ASCII)); + } + } + } + return InternalMetadata.newMetadata(byteArrays.toArray(new byte[][] {})); + } + + // This method must use HttpRequest#getRequestURL or HttpUtils#getRequestURL, both of which + // can only return StringBuffer instances + @SuppressWarnings("JdkObsolete") + private static String getAuthority(HttpServletRequest req) { + try { + return new URI(req.getRequestURL().toString()).getAuthority(); + } catch (URISyntaxException e) { + logger.log(FINE, "Error getting authority from the request URL {0}" + req.getRequestURL()); + return req.getServerName() + ":" + req.getServerPort(); + } + } + + /** + * Call this method when the adapter is no longer needed. The gRPC server will be terminated. + */ + public void destroy() { + transportListener.transportTerminated(); + } + + private static final class GrpcAsyncListener implements AsyncListener { + final InternalLogId logId; + final ServletServerStream stream; + + GrpcAsyncListener(ServletServerStream stream, InternalLogId logId) { + this.stream = stream; + this.logId = logId; + } + + @Override + public void onComplete(AsyncEvent event) {} + + @Override + public void onTimeout(AsyncEvent event) { + if (logger.isLoggable(FINE)) { + logger.log(FINE, String.format("[{%s}] Timeout: ", logId), event.getThrowable()); + } + // If the resp is not committed, cancel() to avoid being redirected to an error page. + // Else, the container will send RST_STREAM in the end. + if (!event.getAsyncContext().getResponse().isCommitted()) { + stream.cancel(Status.DEADLINE_EXCEEDED); + } else { + stream.transportState().runOnTransportThread( + () -> stream.transportState().transportReportStatus(Status.DEADLINE_EXCEEDED)); + } + } + + @Override + public void onError(AsyncEvent event) { + if (logger.isLoggable(FINE)) { + logger.log(FINE, String.format("[{%s}] Error: ", logId), event.getThrowable()); + } + + // If the resp is not committed, cancel() to avoid being redirected to an error page. + // Else, the container will send RST_STREAM at the end. + if (!event.getAsyncContext().getResponse().isCommitted()) { + stream.cancel(Status.fromThrowable(event.getThrowable())); + } else { + stream.transportState().runOnTransportThread( + () -> stream.transportState().transportReportStatus( + Status.fromThrowable(event.getThrowable()))); + } + } + + @Override + public void onStartAsync(AsyncEvent event) {} + } + + private static final class GrpcReadListener implements ReadListener { + final ServletServerStream stream; + final AsyncContext asyncCtx; + final ServletInputStream input; + final InternalLogId logId; + + GrpcReadListener( + ServletServerStream stream, + AsyncContext asyncCtx, + InternalLogId logId) throws IOException { + this.stream = stream; + this.asyncCtx = asyncCtx; + input = asyncCtx.getRequest().getInputStream(); + this.logId = logId; + } + + final byte[] buffer = new byte[4 * 1024]; + + @Override + public void onDataAvailable() throws IOException { + logger.log(FINEST, "[{0}] onDataAvailable: ENTRY", logId); + + while (input.isReady()) { + int length = input.read(buffer); + if (length == -1) { + logger.log(FINEST, "[{0}] inbound data: read end of stream", logId); + return; + } else { + if (logger.isLoggable(FINEST)) { + logger.log( + FINEST, + "[{0}] inbound data: length = {1}, bytes = {2}", + new Object[] {logId, length, ServletServerStream.toHexString(buffer, length)}); + } + + byte[] copy = Arrays.copyOf(buffer, length); + stream.transportState().runOnTransportThread( + () -> stream.transportState().inboundDataReceived(ReadableBuffers.wrap(copy), false)); + } + } + + logger.log(FINEST, "[{0}] onDataAvailable: EXIT", logId); + } + + @Override + public void onAllDataRead() { + logger.log(FINE, "[{0}] onAllDataRead", logId); + stream.transportState().runOnTransportThread( + () -> stream.transportState().inboundDataReceived(ReadableBuffers.wrap(new byte[] {}), true)); + } + + @Override + public void onError(Throwable t) { + if (logger.isLoggable(FINE)) { + logger.log(FINE, String.format("[{%s}] Error: ", logId), t); + } + // If the resp is not committed, cancel() to avoid being redirected to an error page. + // Else, the container will send RST_STREAM at the end. + if (!asyncCtx.getResponse().isCommitted()) { + stream.cancel(Status.fromThrowable(t)); + } else { + stream.transportState().runOnTransportThread( + () -> stream.transportState() + .transportReportStatus(Status.fromThrowable(t))); + } + } + } + + /** + * Checks whether an incoming {@code HttpServletRequest} may come from a gRPC client. + * + * @return true if the request comes from a gRPC client + */ + public static boolean isGrpc(HttpServletRequest request) { + return request.getContentType() != null + && request.getContentType().contains(GrpcUtil.CONTENT_TYPE_GRPC); + } +} diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/ServletServerBuilder.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/ServletServerBuilder.java new file mode 100644 index 00000000000..bc7aa3364f5 --- /dev/null +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/ServletServerBuilder.java @@ -0,0 +1,251 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.grpc.servlet; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.ExperimentalApi; +import io.grpc.ForwardingServerBuilder; +import io.grpc.Internal; +import io.grpc.InternalChannelz.SocketStats; +import io.grpc.InternalInstrumented; +import io.grpc.InternalLogId; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.ServerStreamTracer; +import io.grpc.ServerStreamTracer.Factory; +import io.grpc.Status; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.InternalServer; +import io.grpc.internal.ServerImplBuilder; +import io.grpc.internal.ServerListener; +import io.grpc.internal.ServerTransport; +import io.grpc.internal.ServerTransportListener; +import io.grpc.internal.SharedResourceHolder; + +import java.io.File; +import java.io.IOException; +import java.net.SocketAddress; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; + +/** + * Builder to build a gRPC server that can run as a servlet. This is for advanced custom settings. Normally, users + * should consider extending the out-of-box {@link GrpcServlet} directly instead. + * + *

+ * The API is experimental. The authors would like to know more about the real usecases. Users are welcome to provide + * feedback by commenting on the tracking issue. + */ +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5066") +@NotThreadSafe +public final class ServletServerBuilder extends ForwardingServerBuilder { + List streamTracerFactories; + int maxInboundMessageSize = DEFAULT_MAX_MESSAGE_SIZE; + + private final ServerImplBuilder serverImplBuilder; + + private ScheduledExecutorService scheduler; + private boolean internalCaller; + private boolean usingCustomScheduler; + private InternalServerImpl internalServer; + + public ServletServerBuilder() { + serverImplBuilder = new ServerImplBuilder(this::buildTransportServers); + } + + /** + * Builds a gRPC server that can run as a servlet. + * + *

+ * The returned server will not be started or bound to a port. + * + *

+ * Users should not call this method directly. Instead users should call {@link #buildServletAdapter()} which + * internally will call {@code build()} and {@code start()} appropriately. + * + * @throws IllegalStateException if this method is called by users directly + */ + @Override + public Server build() { + checkState(internalCaller, "build() method should not be called directly by an application"); + return super.build(); + } + + /** + * Creates a {@link ServletAdapter}. + */ + public ServletAdapter buildServletAdapter() { + return new ServletAdapter(buildAndStart(), streamTracerFactories, maxInboundMessageSize); + } + + private ServerTransportListener buildAndStart() { + try { + internalCaller = true; + build().start(); + } catch (IOException e) { + // actually this should never happen + throw new RuntimeException(e); + } finally { + internalCaller = false; + } + + if (!usingCustomScheduler) { + scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE); + } + + // Create only one "transport" for all requests because it has no knowledge of which request is + // associated with which client socket. This "transport" does not do socket connection, the + // container does. + ServerTransportImpl serverTransport = + new ServerTransportImpl(scheduler, usingCustomScheduler); + return internalServer.serverListener.transportCreated(serverTransport); + } + + protected InternalServer buildTransportServers( + List streamTracerFactories) { + checkNotNull(streamTracerFactories, "streamTracerFactories"); + this.streamTracerFactories = streamTracerFactories; + internalServer = new InternalServerImpl(); + return internalServer; + } + + @Internal + @Override + protected ServerBuilder delegate() { + return serverImplBuilder; + } + + /** + * Throws {@code UnsupportedOperationException}. TLS should be configured by the servlet container. + */ + @Override + public ServletServerBuilder useTransportSecurity(File certChain, File privateKey) { + throw new UnsupportedOperationException("TLS should be configured by the servlet container"); + } + + @Override + public ServletServerBuilder maxInboundMessageSize(int bytes) { + checkArgument(bytes >= 0, "bytes must be >= 0"); + maxInboundMessageSize = bytes; + return this; + } + + /** + * Provides a custom scheduled executor service to the server builder. + * + * @return this + */ + public ServletServerBuilder scheduledExecutorService(ScheduledExecutorService scheduler) { + this.scheduler = checkNotNull(scheduler, "scheduler"); + usingCustomScheduler = true; + return this; + } + + private static final class InternalServerImpl implements InternalServer { + + ServerListener serverListener; + + InternalServerImpl() {} + + @Override + public void start(ServerListener listener) { + serverListener = listener; + } + + @Override + public void shutdown() { + if (serverListener != null) { + serverListener.serverShutdown(); + } + } + + @Override + public SocketAddress getListenSocketAddress() { + return new SocketAddress() { + @Override + public String toString() { + return "ServletServer"; + } + }; + } + + @Override + public InternalInstrumented getListenSocketStats() { + // sockets are managed by the servlet container, grpc is ignorant of that + return null; + } + + @Override + public List getListenSocketAddresses() { + return Collections.emptyList(); + } + + @Nullable + @Override + public List> getListenSocketStatsList() { + return null; + } + } + + @VisibleForTesting + static final class ServerTransportImpl implements ServerTransport { + + private final InternalLogId logId = InternalLogId.allocate(ServerTransportImpl.class, null); + private final ScheduledExecutorService scheduler; + private final boolean usingCustomScheduler; + + ServerTransportImpl( + ScheduledExecutorService scheduler, boolean usingCustomScheduler) { + this.scheduler = checkNotNull(scheduler, "scheduler"); + this.usingCustomScheduler = usingCustomScheduler; + } + + @Override + public void shutdown() { + if (!usingCustomScheduler) { + SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler); + } + } + + @Override + public void shutdownNow(Status reason) { + shutdown(); + } + + @Override + public ScheduledExecutorService getScheduledExecutorService() { + return scheduler; + } + + @Override + public ListenableFuture getStats() { + // does not support instrumentation + return null; + } + + @Override + public InternalLogId getLogId() { + return logId; + } + } +} diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/ServletServerStream.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/ServletServerStream.java new file mode 100644 index 00000000000..e8da00e106d --- /dev/null +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/ServletServerStream.java @@ -0,0 +1,333 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.grpc.servlet; + +import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_GRPC; +import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY; +import static java.lang.Math.max; +import static java.lang.Math.min; +import static java.util.logging.Level.FINE; +import static java.util.logging.Level.FINEST; +import static java.util.logging.Level.WARNING; + +import com.google.common.io.BaseEncoding; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Attributes; +import io.grpc.InternalLogId; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.internal.AbstractServerStream; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.SerializingExecutor; +import io.grpc.internal.StatsTraceContext; +import io.grpc.internal.TransportFrameUtil; +import io.grpc.internal.TransportTracer; +import io.grpc.internal.WritableBuffer; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.logging.Logger; +import javax.annotation.Nullable; +import jakarta.servlet.AsyncContext; +import jakarta.servlet.WriteListener; +import jakarta.servlet.http.HttpServletResponse; + +final class ServletServerStream extends AbstractServerStream { + + private static final Logger logger = Logger.getLogger(ServletServerStream.class.getName()); + + private final ServletTransportState transportState; + private final Sink sink = new Sink(); + private final AsyncContext asyncCtx; + private final HttpServletResponse resp; + private final Attributes attributes; + private final String authority; + private final InternalLogId logId; + private final AsyncServletOutputStreamWriter writer; + + ServletServerStream( + AsyncContext asyncCtx, + StatsTraceContext statsTraceCtx, + int maxInboundMessageSize, + Attributes attributes, + String authority, + InternalLogId logId) throws IOException { + super(ByteArrayWritableBuffer::new, statsTraceCtx); + transportState = + new ServletTransportState(maxInboundMessageSize, statsTraceCtx, new TransportTracer()); + this.attributes = attributes; + this.authority = authority; + this.logId = logId; + this.asyncCtx = asyncCtx; + this.resp = (HttpServletResponse) asyncCtx.getResponse(); + resp.getOutputStream().setWriteListener(new GrpcWriteListener()); + this.writer = new AsyncServletOutputStreamWriter( + asyncCtx, resp.getOutputStream(), transportState, logId); + } + + @Override + protected ServletTransportState transportState() { + return transportState; + } + + @Override + public Attributes getAttributes() { + return attributes; + } + + @Override + public String getAuthority() { + return authority; + } + + @Override + public int streamId() { + return -1; + } + + @Override + protected Sink abstractServerStreamSink() { + return sink; + } + + private void writeHeadersToServletResponse(Metadata metadata) { + // Discard any application supplied duplicates of the reserved headers + metadata.discardAll(CONTENT_TYPE_KEY); + metadata.discardAll(GrpcUtil.TE_HEADER); + metadata.discardAll(GrpcUtil.USER_AGENT_KEY); + + if (logger.isLoggable(FINE)) { + logger.log(FINE, "[{0}] writeHeaders {1}", new Object[] {logId, metadata}); + } + + resp.setStatus(HttpServletResponse.SC_OK); + resp.setContentType(CONTENT_TYPE_GRPC); + + byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(metadata); + for (int i = 0; i < serializedHeaders.length; i += 2) { + resp.addHeader( + new String(serializedHeaders[i], StandardCharsets.US_ASCII), + new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII)); + } + } + + final class ServletTransportState extends TransportState { + + private final SerializingExecutor transportThreadExecutor = + new SerializingExecutor(MoreExecutors.directExecutor()); + + private ServletTransportState( + int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer) { + super(maxMessageSize, statsTraceCtx, transportTracer); + } + + @Override + public void runOnTransportThread(Runnable r) { + transportThreadExecutor.execute(r); + } + + @Override + public void bytesRead(int numBytes) { + // no-op + // no flow control yet + } + + @Override + public void deframeFailed(Throwable cause) { + if (logger.isLoggable(FINE)) { + logger.log(FINE, String.format("[{%s}] Exception processing message", logId), cause); + } + cancel(Status.fromThrowable(cause)); + } + } + + private static final class ByteArrayWritableBuffer implements WritableBuffer { + + private final int capacity; + final byte[] bytes; + private int index; + + ByteArrayWritableBuffer(int capacityHint) { + this.bytes = new byte[min(1024 * 1024, max(4096, capacityHint))]; + this.capacity = bytes.length; + } + + @Override + public void write(byte[] src, int srcIndex, int length) { + System.arraycopy(src, srcIndex, bytes, index, length); + index += length; + } + + @Override + public void write(byte b) { + bytes[index++] = b; + } + + @Override + public int writableBytes() { + return capacity - index; + } + + @Override + public int readableBytes() { + return index; + } + + @Override + public void release() {} + } + + private final class GrpcWriteListener implements WriteListener { + + @Override + public void onError(Throwable t) { + if (logger.isLoggable(FINE)) { + logger.log(FINE, String.format("[{%s}] Error: ", logId), t); + } + + // If the resp is not committed, cancel() to avoid being redirected to an error page. + // Else, the container will send RST_STREAM at the end. + if (!resp.isCommitted()) { + cancel(Status.fromThrowable(t)); + } else { + transportState.runOnTransportThread( + () -> transportState.transportReportStatus(Status.fromThrowable(t))); + } + } + + @Override + public void onWritePossible() throws IOException { + writer.onWritePossible(); + } + } + + private final class Sink implements AbstractServerStream.Sink { + final TrailerSupplier trailerSupplier = new TrailerSupplier(); + + @Override + public void writeHeaders(Metadata headers) { + writeHeadersToServletResponse(headers); + resp.setTrailerFields(trailerSupplier); + try { + writer.flush(); + } catch (IOException e) { + logger.log(WARNING, String.format("[{%s}] Exception when flushBuffer", logId), e); + cancel(Status.fromThrowable(e)); + } + } + + @Override + public void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMessages) { + if (frame == null && !flush) { + return; + } + + if (logger.isLoggable(FINEST)) { + logger.log( + FINEST, + "[{0}] writeFrame: numBytes = {1}, flush = {2}, numMessages = {3}", + new Object[] {logId, frame == null ? 0 : frame.readableBytes(), flush, numMessages}); + } + + try { + if (frame != null) { + int numBytes = frame.readableBytes(); + if (numBytes > 0) { + onSendingBytes(numBytes); + } + writer.writeBytes(((ByteArrayWritableBuffer) frame).bytes, frame.readableBytes()); + } + + if (flush) { + writer.flush(); + } + } catch (IOException e) { + logger.log(WARNING, String.format("[{%s}] Exception writing message", logId), e); + cancel(Status.fromThrowable(e)); + } + } + + @Override + public void writeTrailers(Metadata trailers, boolean headersSent, Status status) { + if (logger.isLoggable(FINE)) { + logger.log( + FINE, + "[{0}] writeTrailers: {1}, headersSent = {2}, status = {3}", + new Object[] {logId, trailers, headersSent, status}); + } + if (!headersSent) { + writeHeadersToServletResponse(trailers); + } else { + byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(trailers); + for (int i = 0; i < serializedHeaders.length; i += 2) { + String key = new String(serializedHeaders[i], StandardCharsets.US_ASCII); + String newValue = new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII); + trailerSupplier.get().computeIfPresent(key, (k, v) -> v + "," + newValue); + trailerSupplier.get().putIfAbsent(key, newValue); + } + } + + writer.complete(); + } + + @Override + public void cancel(Status status) { + if (resp.isCommitted() && Code.DEADLINE_EXCEEDED == status.getCode()) { + return; // let the servlet timeout, the container will sent RST_STREAM automatically + } + transportState.runOnTransportThread(() -> transportState.transportReportStatus(status)); + // There is no way to RST_STREAM with CANCEL code, so write trailers instead + close(Status.CANCELLED.withCause(status.asRuntimeException()), new Metadata()); + CountDownLatch countDownLatch = new CountDownLatch(1); + transportState.runOnTransportThread(() -> { + asyncCtx.complete(); + countDownLatch.countDown(); + }); + try { + countDownLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + private static final class TrailerSupplier implements Supplier> { + final Map trailers = Collections.synchronizedMap(new HashMap<>()); + + TrailerSupplier() {} + + @Override + public Map get() { + return trailers; + } + } + + static String toHexString(byte[] bytes, int length) { + String hex = BaseEncoding.base16().encode(bytes, 0, min(length, 64)); + if (length > 80) { + hex += "..."; + } + if (length > 64) { + int offset = max(64, length - 16); + hex += BaseEncoding.base16().encode(bytes, offset, length - offset); + } + return hex; + } +} diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/package-info.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/package-info.java new file mode 100644 index 00000000000..859397c2c8d --- /dev/null +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +/** + * API that implements gRPC server as a servlet. The API requires that the application container supports Servlet 4.0 + * and enables HTTP/2. + * + *

+ * The API is experimental. The authors would like to know more about the real usecases. Users are welcome to provide + * feedback by commenting on the tracking issue. + */ +@io.grpc.ExperimentalApi("https://github.com/grpc/grpc-java/issues/5066") +package io.grpc.servlet; diff --git a/grpc-java/grpc-servlet-websocket-jakarta/build.gradle b/grpc-java/grpc-servlet-websocket-jakarta/build.gradle new file mode 100644 index 00000000000..f13dbec2507 --- /dev/null +++ b/grpc-java/grpc-servlet-websocket-jakarta/build.gradle @@ -0,0 +1,16 @@ +plugins { + id 'java-library' +} + +dependencies { + Classpaths.inheritGrpcPlatform(project) + implementation 'io.grpc:grpc-core' +// compileOnly 'javax.servlet:javax.servlet-api:4.0.1' + implementation 'jakarta.servlet:jakarta.servlet-api:5.0.0' + +// compileOnly 'javax.websocket:javax.websocket-api:1.1' + implementation 'jakarta.websocket:jakarta.websocket-api:2.0.0' + + implementation 'com.google.guava:guava:19.0' + +} \ No newline at end of file diff --git a/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/WebSocketServerStream.java b/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/WebSocketServerStream.java new file mode 100644 index 00000000000..b4dfa03ed64 --- /dev/null +++ b/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/WebSocketServerStream.java @@ -0,0 +1,220 @@ +package io.grpc.servlet.web.websocket; + +import io.grpc.Attributes; +import io.grpc.InternalLogId; +import io.grpc.InternalMetadata; +import io.grpc.Metadata; +import io.grpc.ServerStreamTracer; +import io.grpc.Status; +import io.grpc.internal.ReadableBuffers; +import io.grpc.internal.ServerTransportListener; +import io.grpc.internal.StatsTraceContext; +import jakarta.websocket.CloseReason; +import jakarta.websocket.EndpointConfig; +import jakarta.websocket.OnClose; +import jakarta.websocket.OnError; +import jakarta.websocket.OnMessage; +import jakarta.websocket.OnOpen; +import jakarta.websocket.Session; +import jakarta.websocket.server.ServerEndpoint; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; + +/** + * Each instance of this type represents a single active websocket, which maps to a single gRPC stream. + * + * JSR356 websockets always handle their incoming messages in a serial manner, so we don't need to worry here about + * runOnTransportThread while in onMessage, as we're already in the transport thread. + */ +@ServerEndpoint(value = "/{service}/{method}", subprotocols = "grpc-websockets") +public class WebSocketServerStream { + private final ServerTransportListener transportListener; + private final List streamTracerFactories; + private final int maxInboundMessageSize; + private final Attributes attributes; + + private final InternalLogId logId = InternalLogId.allocate(WebSocketServerStream.class, null); + + // assigned on open, always available + private Session websocketSession; + + // fields set after headers are decoded + private WebsocketStreamImpl stream; + private boolean headersProcessed = false; + private final boolean isTextRequest = false;// not supported yet + + public WebSocketServerStream(ServerTransportListener transportListener, + List streamTracerFactories, int maxInboundMessageSize, + Attributes attributes) { + this.transportListener = transportListener; + this.streamTracerFactories = streamTracerFactories; + this.maxInboundMessageSize = maxInboundMessageSize; + this.attributes = attributes; + } + + @OnOpen + public void onOpen(Session websocketSession, EndpointConfig config) { + this.websocketSession = websocketSession; + + // Configure defaults present in some servlet containers to avoid some confusing limits. Subclasses + // can override this method to control those defaults on their own. + websocketSession.setMaxIdleTimeout(0); + websocketSession.setMaxBinaryMessageBufferSize(Integer.MAX_VALUE); + } + + @OnMessage + public void onMessage(String message) { + if (stream != null) { + // This means the stream opened correctly, then sent a text payload, which doesn't make sense. + // End the stream first. + stream.transportReportStatus(Status.fromCode(Status.Code.UNKNOWN)); + } + try { + websocketSession + .close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "Can't read string payloads")); + } catch (IOException ignored) { + // ignoring failure + } + } + + @OnMessage + public void onMessage(ByteBuffer message) throws IOException { + if (message.remaining() == 0) { + // message is empty (no control flow, no data), error + if (stream != null) { + stream.transportReportStatus(Status.fromCode(Status.Code.UNKNOWN)); + } + websocketSession.close(new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "Unexpected empty message")); + return; + } + + // if this is the first message on this websocket, it is the request headers + if (!headersProcessed) { + processHeaders(message); + headersProcessed = true; + return; + } + + // For every message after headers, the first byte is control flow + byte controlFlow = message.get(); + if (controlFlow == 1) { + // if first byte is 1, the client is finished sending + if (message.remaining() != 0) { + stream.transportReportStatus(Status.fromCode(Status.Code.UNKNOWN)); + websocketSession.close( + new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "Unexpected bytes in close message")); + return; + } + stream.inboundDataReceived(ReadableBuffers.empty(), true); + return; + } + + if (isTextRequest) { + throw new UnsupportedOperationException("text requests not yet supported"); + } + + // Having already stripped the control flow byte, the rest of the payload is our request message + stream.inboundDataReceived(ReadableBuffers.wrap(message), false); + } + + @OnError + public void onError(Throwable error) { + stream.transportReportStatus(Status.UNKNOWN);// transport failure of some kind + // onClose will be called automatically + if (error instanceof ClosedChannelException) { + // ignore this for now + // TODO need to understand why this is happening + } else { + error.printStackTrace(); + } + } + + @OnClose + public void onClose(CloseReason closeReason) { + stream.transportReportStatus(Status.CANCELLED);// remote end hung up + } + + private String methodName() { + return websocketSession.getRequestURI().getPath().substring(1); + } + + private void processHeaders(ByteBuffer headerPayload) { + Metadata headers = readHeaders(headerPayload); + + Long timeoutNanos = headers.get(TIMEOUT_KEY); + if (timeoutNanos == null) { + timeoutNanos = 0L; + } + // TODO handle timeout + + StatsTraceContext statsTraceCtx = + StatsTraceContext.newServerContext(streamTracerFactories, methodName(), headers); + + stream = new WebsocketStreamImpl(statsTraceCtx, maxInboundMessageSize, websocketSession, logId, + attributes); + stream.createStream(transportListener, methodName(), headers); + } + + private static Metadata readHeaders(ByteBuffer headerPayload) { + // Headers are passed as ascii (browsers don't support binary), ":"-separated key/value pairs, separated on + // "\r\n". The client implementation shows that values might be comma-separated, but we'll pass that through + // directly as a plain string. + // + List byteArrays = new ArrayList<>(); + while (headerPayload.hasRemaining()) { + int nameStart = headerPayload.position(); + while (headerPayload.hasRemaining() && headerPayload.get() != ':'); + int nameEnd = headerPayload.position() - 1; + int valueStart = headerPayload.position() + 1;// assumes that the colon is followed by a space + + while (headerPayload.hasRemaining() && headerPayload.get() != '\n'); + int valueEnd = headerPayload.position() - 2;// assumes that \n is preceded by a \r, this isnt generally + // safe? + if (valueEnd < valueStart) { + valueEnd = valueStart; + } + int endOfLinePosition = headerPayload.position(); + + byte[] headerBytes = new byte[nameEnd - nameStart]; + headerPayload.position(nameStart); + headerPayload.get(headerBytes); + + byteArrays.add(headerBytes); + if (Arrays.equals(headerBytes, "content-type".getBytes(StandardCharsets.US_ASCII))) { + // rewrite grpc-web content type to matching grpc content type + byteArrays.add("grpc+proto".getBytes(StandardCharsets.US_ASCII)); + // TODO support other formats like text, non-proto + headerPayload.position(valueEnd); + continue; + } + + // TODO check for binary header suffix + // if (headerBytes.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { + // + // } else { + byte[] valueBytes = new byte[valueEnd - valueStart]; + headerPayload.position(valueStart); + headerPayload.get(valueBytes); + byteArrays.add(valueBytes); + // } + + headerPayload.position(endOfLinePosition); + } + + // add a te:trailers, as gRPC will expect it + byteArrays.add("te".getBytes(StandardCharsets.US_ASCII)); + byteArrays.add("trailers".getBytes(StandardCharsets.US_ASCII)); + + // TODO to support text encoding + + return InternalMetadata.newMetadata(byteArrays.toArray(new byte[][] {})); + } +} diff --git a/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/WebsocketStreamImpl.java b/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/WebsocketStreamImpl.java new file mode 100644 index 00000000000..94897b815ea --- /dev/null +++ b/grpc-java/grpc-servlet-websocket-jakarta/src/main/java/io/grpc/servlet/web/websocket/WebsocketStreamImpl.java @@ -0,0 +1,283 @@ +package io.grpc.servlet.web.websocket; + +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Attributes; +import io.grpc.InternalLogId; +import io.grpc.InternalMetadata; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.internal.AbstractServerStream; +import io.grpc.internal.ReadableBuffer; +import io.grpc.internal.SerializingExecutor; +import io.grpc.internal.ServerTransportListener; +import io.grpc.internal.StatsTraceContext; +import io.grpc.internal.TransportTracer; +import io.grpc.internal.WritableBuffer; +import jakarta.websocket.Session; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static java.lang.Math.max; +import static java.lang.Math.min; + +public class WebsocketStreamImpl extends AbstractServerStream { + private static final Logger logger = Logger.getLogger(WebsocketStreamImpl.class.getName()); + + public final class WebsocketTransportState extends TransportState { + + private final SerializingExecutor transportThreadExecutor = + new SerializingExecutor(MoreExecutors.directExecutor()); + + protected WebsocketTransportState(int maxMessageSize, StatsTraceContext statsTraceCtx, + TransportTracer transportTracer) { + super(maxMessageSize, statsTraceCtx, transportTracer); + } + + @Override + public void runOnTransportThread(Runnable r) { + transportThreadExecutor.execute(r); + } + + @Override + public void bytesRead(int numBytes) { + // no-op, no flow-control yet + } + + @Override + public void deframeFailed(Throwable cause) { + if (logger.isLoggable(Level.FINE)) { + logger.log(Level.FINE, String.format("[{%s}] Exception processing message", logId), cause); + } + cancel(Status.fromThrowable(cause)); + } + } + private static final class ByteArrayWritableBuffer implements WritableBuffer { + + private final int capacity; + final byte[] bytes; + private int index; + + ByteArrayWritableBuffer(int capacityHint) { + this.bytes = new byte[min(1024 * 1024, max(4096, capacityHint))]; + this.capacity = bytes.length; + } + + @Override + public void write(byte[] src, int srcIndex, int length) { + System.arraycopy(src, srcIndex, bytes, index, length); + index += length; + } + + @Override + public void write(byte b) { + bytes[index++] = b; + } + + @Override + public int writableBytes() { + return capacity - index; + } + + @Override + public int readableBytes() { + return index; + } + + @Override + public void release() {} + } + + private final WebsocketTransportState transportState; + private final Sink sink = new Sink(); + private final Session websocketSession; + private final InternalLogId logId; + private final Attributes attributes; + + public WebsocketStreamImpl(StatsTraceContext statsTraceCtx, int maxInboundMessageSize, Session websocketSession, + InternalLogId logId, Attributes attributes) { + super(ByteArrayWritableBuffer::new, statsTraceCtx); + this.websocketSession = websocketSession; + this.logId = logId; + this.attributes = attributes; + transportState = new WebsocketTransportState(maxInboundMessageSize, statsTraceCtx, new TransportTracer()); + } + + @Override + public Attributes getAttributes() { + return attributes; + } + + public void createStream(ServerTransportListener transportListener, String methodName, Metadata headers) { + transportListener.streamCreated(this, methodName, headers); + transportState().onStreamAllocated(); + } + + public void inboundDataReceived(ReadableBuffer message, boolean endOfStream) { + transportState().inboundDataReceived(message, endOfStream); + } + + public void transportReportStatus(Status status) { + transportState().transportReportStatus(status); + } + + public void complete() { + transportState().complete(); + } + + @Override + public TransportState transportState() { + return transportState; + } + + @Override + protected Sink abstractServerStreamSink() { + return sink; + } + + @Override + public int streamId() { + return -1; + } + + private final class Sink implements AbstractServerStream.Sink { + + @Override + public void writeHeaders(Metadata headers) { + // headers/trailers are always sent as asci, colon-delimited pairs, with \r\n separating them. The + // trailer response must be prefixed with 0x80 (0r 0x81 if compressed), followed by the length of the + // message + + byte[][] serializedHeaders = InternalMetadata.serialize(headers); + // Total up the size of the payload: 5 bytes for the prefix, and each header needs a colon delimiter, and to + // end with \r\n + int headerLength = Arrays.stream(serializedHeaders).mapToInt(arr -> arr.length + 2).sum(); + ByteBuffer prefix = ByteBuffer.allocate(5); + prefix.put((byte) 0x80); + prefix.putInt(headerLength); + prefix.flip(); + ByteBuffer message = ByteBuffer.allocate(headerLength); + for (int i = 0; i < serializedHeaders.length; i += 2) { + message.put(serializedHeaders[i]); + message.put((byte) ':'); + message.put((byte) ' '); + message.put(serializedHeaders[i + 1]); + message.put((byte) '\r'); + message.put((byte) '\n'); + } + message.flip(); + try { + // send in two separate payloads + websocketSession.getBasicRemote().sendBinary(prefix); + websocketSession.getBasicRemote().sendBinary(message); + } catch (IOException e) { + throw Status.fromThrowable(e).asRuntimeException(); + } + } + + @Override + public void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMessages) { + if (frame == null && !flush) { + return; + } + + if (logger.isLoggable(Level.FINEST)) { + logger.log( + Level.FINEST, + "[{0}] writeFrame: numBytes = {1}, flush = {2}, numMessages = {3}", + new Object[] {logId, frame == null ? 0 : frame.readableBytes(), flush, numMessages}); + } + + try { + if (frame != null) { + int numBytes = frame.readableBytes(); + if (numBytes > 0) { + onSendingBytes(numBytes); + } + + ByteBuffer payload = + ByteBuffer.wrap(((ByteArrayWritableBuffer) frame).bytes, 0, frame.readableBytes()); + + websocketSession.getBasicRemote().sendBinary(payload); + } + + } catch (IOException e) { + // TODO log this off-thread, doing logging about an error from sending a log to a client is a mess + // logger.log(WARNING, String.format("[{%s}] Exception writing message", logId), e); + cancel(Status.fromThrowable(e)); + } + } + + @Override + public void writeTrailers(Metadata trailers, boolean headersSent, Status status) { + if (logger.isLoggable(Level.FINE)) { + logger.log( + Level.FINE, + "[{0}] writeTrailers: {1}, headersSent = {2}, status = {3}", + new Object[] {logId, trailers, headersSent, status}); + } + + // Trailers are always sent as asci, colon-delimited pairs, with \r\n separating them. The + // trailer response must be prefixed with 0x80 (0r 0x81 if compressed), followed by the size in a 4 byte int + + byte[][] serializedTrailers = InternalMetadata.serialize(trailers); + // Total up the size of the payload: 5 bytes for the prefix, and each trailer needs a colon+space delimiter, + // and to end with \r\n + int trailerLength = Arrays.stream(serializedTrailers).mapToInt(arr -> arr.length + 2).sum(); + ByteBuffer prefix = ByteBuffer.allocate(5); + prefix.put((byte) 0x80); + prefix.putInt(trailerLength); + prefix.flip(); + ByteBuffer message = ByteBuffer.allocate(trailerLength); + for (int i = 0; i < serializedTrailers.length; i += 2) { + message.put(serializedTrailers[i]); + message.put((byte) ':'); + message.put((byte) ' '); + message.put(serializedTrailers[i + 1]); + message.put((byte) '\r'); + message.put((byte) '\n'); + } + message.flip(); + try { + // send in two separate messages + websocketSession.getBasicRemote().sendBinary(prefix); + websocketSession.getBasicRemote().sendBinary(message); + + websocketSession.close(); + } catch (IOException e) { + throw Status.fromThrowable(e).asRuntimeException(); + } + } + + @Override + public void cancel(Status status) { + if (!websocketSession.isOpen() && Status.Code.DEADLINE_EXCEEDED == status.getCode()) { + return; // let the servlet timeout, the container will sent RST_STREAM automatically + } + transportState.runOnTransportThread(() -> transportState.transportReportStatus(status)); + // There is no way to RST_STREAM with CANCEL code, so write trailers instead + close(Status.CANCELLED.withCause(status.asRuntimeException()), new Metadata()); + CountDownLatch countDownLatch = new CountDownLatch(1); + transportState.runOnTransportThread(() -> { + try { + websocketSession.close(); + } catch (IOException ioException) { + // already closing, ignore + } + countDownLatch.countDown(); + }); + try { + countDownLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + +} diff --git a/settings.gradle b/settings.gradle index 40208d3bf64..7bc0cdae7b1 100644 --- a/settings.gradle +++ b/settings.gradle @@ -53,6 +53,9 @@ pyMods.each { project(":$name").projectDir = file(dir) } +include 'grpc-java:grpc-servlet-jakarta' +include 'grpc-java:grpc-servlet-websocket-jakarta' + include 'fishconfig-local' include 'DHProcess' include 'proto:proto-backplane-grpc'