From 713a3085e863148bb108423ef92937a70b9ee28a Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Fri, 8 Nov 2024 19:10:00 +0100 Subject: [PATCH] Issue #12272 - Potential deadlock with Vaadin. Fixed the case where a GOAWAY followed by a TCP FIN was causing a race between closing the `EndPoint` and running the failure `Runnable` task. The TCP FIN after the GOAWAY causes the streams to be failed on the server; in turn, failing the streams generates failure `Runnable` tasks that are submitted to the HTTP/2 execution strategy; however, the streams were destroyed before the failure `Runnable` tasks actually ran, so the `EndPoint` was closed; closing the `EndPoint` would close the `HTTP2Connection`, which in turn would stop the execution strategy; this lead to the fact that the failure `Runnable` tasks were never run. Now, the failure `Runnable` tasks are invoked via `ThreadPool.executeImmediately()` rather than being submitted to the execution strategy. This ensures that they would be run and not queued, even in case of lack of threads, so that they could unblock blocked reads or writes, freeing up blocked threads. Additionally, improved `HTTP2Stream.onFailure()` to destroy the stream only after the failure tasks have completed. Smaller other fixes to improve the code. Signed-off-by: Simone Bordet --- .../eclipse/jetty/http2/HTTP2Connection.java | 4 +- .../org/eclipse/jetty/http2/HTTP2Stream.java | 16 +- .../internal/HTTP2ServerConnection.java | 10 +- .../server/internal/HttpStreamOverHTTP2.java | 30 ++- .../jetty/io/SocketChannelEndPoint.java | 3 +- .../transport/Http2AsyncIOServletTest.java | 217 +++++++++++++++++- 6 files changed, 259 insertions(+), 21 deletions(-) diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index 9098dc3522b8..b13d140c34d0 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -145,7 +145,6 @@ public void onClose(Throwable cause) if (LOG.isDebugEnabled()) LOG.debug("HTTP2 Close {} ", this); super.onClose(cause); - LifeCycle.stop(strategy); } @@ -375,7 +374,8 @@ else if (filled == 0) { shutdown = true; session.onShutdown(); - return null; + // The onShutDown() call above may have produced a task. + return pollTask(); } } } diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 9024805eba1c..b261a81cec16 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -615,13 +615,17 @@ private void onFailure(FailureFrame frame, Callback callback) failure = frame.getFailure(); flowControlLength = drain(); } - close(); - boolean removed = session.removeStream(this); session.dataConsumed(this, flowControlLength); - if (removed) - notifyFailure(this, frame, callback); - else - callback.succeeded(); + close(); + + notifyFailure(this, frame, new Nested(callback) + { + @Override + public void completed() + { + session.removeStream(HTTP2Stream.this); + } + }); } private int drain() diff --git a/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HTTP2ServerConnection.java b/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HTTP2ServerConnection.java index 7a2a9299636b..51df1eca8545 100644 --- a/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HTTP2ServerConnection.java +++ b/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HTTP2ServerConnection.java @@ -54,6 +54,7 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.thread.ThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -202,12 +203,9 @@ public void onStreamFailure(Stream stream, Throwable failure, Callback callback) if (channel != null) { Runnable task = channel.onFailure(failure, callback); - if (task != null) - { - // We must dispatch to another thread because the task - // may call application code that performs blocking I/O. - offerTask(task, true); - } + // The task may unblock a blocked read or write, so it cannot be + // queued, because there may be no threads available to run it. + ThreadPool.executeImmediately(getExecutor(), task); } else { diff --git a/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HttpStreamOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HttpStreamOverHTTP2.java index f03e9479843f..fd28ee93656d 100644 --- a/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HttpStreamOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HttpStreamOverHTTP2.java @@ -591,12 +591,32 @@ public Runnable onFailure(Throwable failure, Callback callback) { boolean remote = failure instanceof EOFException; Runnable runnable = remote ? _httpChannel.onRemoteFailure(new EofException(failure)) : _httpChannel.onFailure(failure); - return () -> + + class FailureTask implements Runnable { - if (runnable != null) - runnable.run(); - callback.succeeded(); - }; + @Override + public void run() + { + try + { + if (runnable != null) + runnable.run(); + callback.succeeded(); + } + catch (Throwable x) + { + callback.failed(x); + } + } + + @Override + public String toString() + { + return "%s[%s]".formatted(getClass().getSimpleName(), runnable); + } + } + + return new FailureTask(); } @Override diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java index 64cec6d478ea..0218bcb25fcf 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java @@ -89,7 +89,8 @@ else if (filled == -1) } catch (IOException e) { - LOG.debug("Unable to shutdown output", e); + if (LOG.isDebugEnabled()) + LOG.debug("Unable to shutdown input", e); shutdownInput(); filled = -1; } diff --git a/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/Http2AsyncIOServletTest.java b/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/Http2AsyncIOServletTest.java index 1919fad72b37..f5a218f389e8 100644 --- a/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/Http2AsyncIOServletTest.java +++ b/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/Http2AsyncIOServletTest.java @@ -15,6 +15,8 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -33,10 +35,13 @@ import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.CloseState; import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.HTTP2Session; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.client.HTTP2Client; +import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory; @@ -44,12 +49,15 @@ import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.nullValue; @@ -64,7 +72,9 @@ public class Http2AsyncIOServletTest private void start(HttpServlet httpServlet) throws Exception { - server = new Server(); + QueuedThreadPool serverThreads = new QueuedThreadPool(); + serverThreads.setName("server"); + server = new Server(serverThreads); connector = new ServerConnector(server, 1, 1, new HTTP2CServerConnectionFactory(httpConfig)); server.addConnector(connector); ServletContextHandler servletContextHandler = new ServletContextHandler("/"); @@ -72,7 +82,10 @@ private void start(HttpServlet httpServlet) throws Exception server.setHandler(servletContextHandler); server.start(); + QueuedThreadPool clientThreads = new QueuedThreadPool(); + clientThreads.setName("client"); client = new HTTP2Client(); + client.setExecutor(clientThreads); client.start(); } @@ -218,4 +231,206 @@ public void onStartAsync(AsyncEvent event) assertTrue(errorLatch.await(5, TimeUnit.SECONDS)); } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testSessionCloseWithPendingRequestThenReset(boolean useReaderWriter) throws Exception + { + // Disable output aggregation for Servlets, so each byte is echoed back. + httpConfig.setOutputAggregationSize(0); + CountDownLatch serverFailureLatch = new CountDownLatch(1); + start(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException + { + try + { + if (useReaderWriter) + request.getReader().transferTo(response.getWriter()); + else + request.getInputStream().transferTo(response.getOutputStream()); + } + catch (Throwable x) + { + serverFailureLatch.countDown(); + throw x; + } + } + }); + + HTTP2Session session = (HTTP2Session)client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {}) + .get(5, TimeUnit.SECONDS); + Queue dataList = new ConcurrentLinkedQueue<>(); + MetaData.Request request = new MetaData.Request("POST", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY); + Stream stream = session.newStream(new HeadersFrame(request, null, false), new Stream.Listener() + { + @Override + public void onDataAvailable(Stream stream) + { + while (true) + { + Stream.Data data = stream.readData(); + if (data == null) + { + stream.demand(); + return; + } + dataList.offer(data); + if (data.frame().isEndStream()) + return; + } + } + }).get(5, TimeUnit.SECONDS); + + stream.data(new DataFrame(stream.getId(), UTF_8.encode("Hello Jetty"), false)) + .get(5, TimeUnit.SECONDS); + stream.demand(); + + await().atMost(5, TimeUnit.SECONDS).until(() -> !dataList.isEmpty()); + + // Initiates graceful close, waits for the streams to finish as per specification. + session.close(ErrorCode.NO_ERROR.code, "client_close", Callback.NOOP); + + // Finish the pending stream, either by resetting or sending the last frame. + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code)); + + // The server should see the effects of the reset. + assertTrue(serverFailureLatch.await(5, TimeUnit.SECONDS)); + // The session must eventually be closed. + await().atMost(5, TimeUnit.SECONDS).until(() -> session.getCloseState() == CloseState.CLOSED); + // The endPoint must eventually be closed. + await().atMost(5, TimeUnit.SECONDS).until(() -> !session.getEndPoint().isOpen()); + + // Cleanup. + dataList.forEach(Stream.Data::release); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testSessionCloseWithPendingRequestServerIdleTimeout(boolean useReaderWriter) throws Exception + { + // Disable output aggregation for Servlets, so each byte is echoed back. + httpConfig.setOutputAggregationSize(0); + CountDownLatch serverFailureLatch = new CountDownLatch(1); + start(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException + { + try + { + if (useReaderWriter) + request.getReader().transferTo(response.getWriter()); + else + request.getInputStream().transferTo(response.getOutputStream()); + } + catch (Throwable x) + { + serverFailureLatch.countDown(); + throw x; + } + } + }); + long idleTimeout = 1000; + connector.setIdleTimeout(idleTimeout); + + HTTP2Session session = (HTTP2Session)client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {}) + .get(5, TimeUnit.SECONDS); + Queue dataList = new ConcurrentLinkedQueue<>(); + MetaData.Request request = new MetaData.Request("POST", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY); + Stream stream = session.newStream(new HeadersFrame(request, null, false), new Stream.Listener() + { + @Override + public void onDataAvailable(Stream stream) + { + while (true) + { + Stream.Data data = stream.readData(); + if (data == null) + { + stream.demand(); + return; + } + dataList.offer(data); + if (data.frame().isEndStream()) + return; + } + } + }).get(5, TimeUnit.SECONDS); + + stream.data(new DataFrame(stream.getId(), UTF_8.encode("Hello Jetty"), false)) + .get(5, TimeUnit.SECONDS); + stream.demand(); + + await().atMost(5, TimeUnit.SECONDS).until(() -> !dataList.isEmpty()); + + // Initiates graceful close, waits for the streams to finish as per specification. + session.close(ErrorCode.NO_ERROR.code, "client_close", Callback.NOOP); + + // Do not finish the streams, the server must idle timeout. + assertTrue(serverFailureLatch.await(2 * idleTimeout, TimeUnit.SECONDS)); + // The session must eventually be closed. + await().atMost(5, TimeUnit.SECONDS).until(() -> session.getCloseState() == CloseState.CLOSED); + // The endPoint must eventually be closed. + await().atMost(5, TimeUnit.SECONDS).until(() -> !session.getEndPoint().isOpen()); + + // Cleanup. + dataList.forEach(Stream.Data::release); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testSessionCloseWithPendingRequestThenClientDisconnectThenServerIdleTimeout(boolean useReaderWriter) throws Exception + { + AtomicReference serverThreadRef = new AtomicReference<>(); + CountDownLatch serverFailureLatch = new CountDownLatch(1); + start(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException + { + try + { + serverThreadRef.set(Thread.currentThread()); + if (useReaderWriter) + request.getReader().transferTo(response.getWriter()); + else + request.getInputStream().transferTo(response.getOutputStream()); + } + catch (Throwable x) + { + serverFailureLatch.countDown(); + throw x; + } + } + }); + long idleTimeout = 1000; + connector.setIdleTimeout(idleTimeout); + + HTTP2Session session = (HTTP2Session)client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {}) + .get(5, TimeUnit.SECONDS); + MetaData.Request request = new MetaData.Request("POST", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY); + Stream stream = session.newStream(new HeadersFrame(request, null, false), new Stream.Listener() {}).get(5, TimeUnit.SECONDS); + + stream.data(new DataFrame(stream.getId(), UTF_8.encode("Hello Jetty"), false)) + .get(5, TimeUnit.SECONDS); + stream.demand(); + + await().atMost(5, TimeUnit.SECONDS).until(() -> + { + Thread serverThread = serverThreadRef.get(); + return serverThread != null && serverThread.getState() == Thread.State.WAITING; + }); + + // Initiates graceful close, then immediately disconnect. + session.close(ErrorCode.NO_ERROR.code, "client_close", Callback.from(session::disconnect)); + + // Do not finish the streams, the server must idle timeout. + assertTrue(serverFailureLatch.await(2 * idleTimeout, TimeUnit.SECONDS)); + // The session must eventually be closed. + await().atMost(5, TimeUnit.SECONDS).until(() -> session.getCloseState() == CloseState.CLOSED); + // The endPoint must eventually be closed. + await().atMost(5, TimeUnit.SECONDS).until(() -> !session.getEndPoint().isOpen()); + } }