Skip to content

Commit

Permalink
Issue #12272 - Potential deadlock with Vaadin.
Browse files Browse the repository at this point in the history
Fixed the case where a GOAWAY followed by a TCP FIN was causing a race between closing the `EndPoint` and running the failure `Runnable` task.

The TCP FIN after the GOAWAY causes the streams to be failed on the server;
in turn, failing the streams generates failure `Runnable` tasks that are submitted to the HTTP/2 execution strategy;
however, the streams were destroyed before the failure `Runnable` tasks actually ran, so the `EndPoint` was closed;
closing the `EndPoint` would close the `HTTP2Connection`, which in turn would stop the execution strategy;
this lead to the fact that the failure `Runnable` tasks were never run.

Now, the failure `Runnable` tasks are invoked via `ThreadPool.executeImmediately()` rather than being submitted to the execution strategy.
This ensures that they would be run and not queued, even in case of lack of threads, so that they could unblock blocked reads or writes, freeing up blocked threads.

Additionally, improved `HTTP2Stream.onFailure()` to destroy the stream only after the failure tasks have completed.

Smaller other fixes to improve the code.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Nov 12, 2024
1 parent 6aaaa15 commit dde369a
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ public void onClose(Throwable cause)
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Close {} ", this);
super.onClose(cause);

LifeCycle.stop(strategy);
}

Expand Down Expand Up @@ -375,7 +374,8 @@ else if (filled == 0)
{
shutdown = true;
session.onShutdown();
return null;
// The onShutDown() call above may have produced a task.
return pollTask();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,13 +615,17 @@ private void onFailure(FailureFrame frame, Callback callback)
failure = frame.getFailure();
flowControlLength = drain();
}
close();
boolean removed = session.removeStream(this);
session.dataConsumed(this, flowControlLength);
if (removed)
notifyFailure(this, frame, callback);
else
callback.succeeded();
close();

notifyFailure(this, frame, new Nested(callback)
{
@Override
public void completed()
{
session.removeStream(HTTP2Stream.this);
}
});
}

private int drain()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -202,12 +203,9 @@ public void onStreamFailure(Stream stream, Throwable failure, Callback callback)
if (channel != null)
{
Runnable task = channel.onFailure(failure, callback);
if (task != null)
{
// We must dispatch to another thread because the task
// may call application code that performs blocking I/O.
offerTask(task, true);
}
// The task may unblock a blocked read or write, so it cannot be
// queued, because there may be no threads available to run it.
ThreadPool.executeImmediately(getExecutor(), task);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,12 +591,32 @@ public Runnable onFailure(Throwable failure, Callback callback)
{
boolean remote = failure instanceof EOFException;
Runnable runnable = remote ? _httpChannel.onRemoteFailure(new EofException(failure)) : _httpChannel.onFailure(failure);
return () ->

class FailureTask implements Runnable
{
if (runnable != null)
runnable.run();
callback.succeeded();
};
@Override
public void run()
{
try
{
if (runnable != null)
runnable.run();
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}

@Override
public String toString()
{
return "%s[%s]".formatted(getClass().getSimpleName(), runnable);
}
}

return new FailureTask();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ else if (filled == -1)
}
catch (IOException e)
{
LOG.debug("Unable to shutdown output", e);
if (LOG.isDebugEnabled())
LOG.debug("Unable to shutdown input", e);
shutdownInput();
filled = -1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -33,23 +35,29 @@
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.CloseState;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue;
Expand All @@ -64,15 +72,20 @@ public class Http2AsyncIOServletTest

private void start(HttpServlet httpServlet) throws Exception
{
server = new Server();
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
connector = new ServerConnector(server, 1, 1, new HTTP2CServerConnectionFactory(httpConfig));
server.addConnector(connector);
ServletContextHandler servletContextHandler = new ServletContextHandler("/");
servletContextHandler.addServlet(new ServletHolder(httpServlet), "/*");
server.setHandler(servletContextHandler);
server.start();

QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
client = new HTTP2Client();
client.setExecutor(clientThreads);
client.start();
}

Expand Down Expand Up @@ -218,4 +231,206 @@ public void onStartAsync(AsyncEvent event)

assertTrue(errorLatch.await(5, TimeUnit.SECONDS));
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testSessionCloseWithPendingRequestThenReset(boolean useReaderWriter) throws Exception
{
// Disable output aggregation for Servlets, so each byte is echoed back.
httpConfig.setOutputAggregationSize(0);
CountDownLatch serverFailureLatch = new CountDownLatch(1);
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
try
{
if (useReaderWriter)
request.getReader().transferTo(response.getWriter());
else
request.getInputStream().transferTo(response.getOutputStream());
}
catch (Throwable x)
{
serverFailureLatch.countDown();
throw x;
}
}
});

HTTP2Session session = (HTTP2Session)client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {})
.get(5, TimeUnit.SECONDS);
Queue<Stream.Data> dataList = new ConcurrentLinkedQueue<>();
MetaData.Request request = new MetaData.Request("POST", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY);
Stream stream = session.newStream(new HeadersFrame(request, null, false), new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
while (true)
{
Stream.Data data = stream.readData();
if (data == null)
{
stream.demand();
return;
}
dataList.offer(data);
if (data.frame().isEndStream())
return;
}
}
}).get(5, TimeUnit.SECONDS);

stream.data(new DataFrame(stream.getId(), UTF_8.encode("Hello Jetty"), false))
.get(5, TimeUnit.SECONDS);
stream.demand();

await().atMost(5, TimeUnit.SECONDS).until(() -> !dataList.isEmpty());

// Initiates graceful close, waits for the streams to finish as per specification.
session.close(ErrorCode.NO_ERROR.code, "client_close", Callback.NOOP);

// Finish the pending stream, either by resetting or sending the last frame.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code));

// The server should see the effects of the reset.
assertTrue(serverFailureLatch.await(5, TimeUnit.SECONDS));
// The session must eventually be closed.
await().atMost(5, TimeUnit.SECONDS).until(() -> session.getCloseState() == CloseState.CLOSED);
// The endPoint must eventually be closed.
await().atMost(5, TimeUnit.SECONDS).until(() -> !session.getEndPoint().isOpen());

// Cleanup.
dataList.forEach(Stream.Data::release);
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testSessionCloseWithPendingRequestServerIdleTimeout(boolean useReaderWriter) throws Exception
{
// Disable output aggregation for Servlets, so each byte is echoed back.
httpConfig.setOutputAggregationSize(0);
CountDownLatch serverFailureLatch = new CountDownLatch(1);
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
try
{
if (useReaderWriter)
request.getReader().transferTo(response.getWriter());
else
request.getInputStream().transferTo(response.getOutputStream());
}
catch (Throwable x)
{
serverFailureLatch.countDown();
throw x;
}
}
});
long idleTimeout = 1000;
connector.setIdleTimeout(idleTimeout);

HTTP2Session session = (HTTP2Session)client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {})
.get(5, TimeUnit.SECONDS);
Queue<Stream.Data> dataList = new ConcurrentLinkedQueue<>();
MetaData.Request request = new MetaData.Request("POST", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY);
Stream stream = session.newStream(new HeadersFrame(request, null, false), new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
while (true)
{
Stream.Data data = stream.readData();
if (data == null)
{
stream.demand();
return;
}
dataList.offer(data);
if (data.frame().isEndStream())
return;
}
}
}).get(5, TimeUnit.SECONDS);

stream.data(new DataFrame(stream.getId(), UTF_8.encode("Hello Jetty"), false))
.get(5, TimeUnit.SECONDS);
stream.demand();

await().atMost(5, TimeUnit.SECONDS).until(() -> !dataList.isEmpty());

// Initiates graceful close, waits for the streams to finish as per specification.
session.close(ErrorCode.NO_ERROR.code, "client_close", Callback.NOOP);

// Do not finish the streams, the server must idle timeout.
assertTrue(serverFailureLatch.await(2 * idleTimeout, TimeUnit.SECONDS));
// The session must eventually be closed.
await().atMost(5, TimeUnit.SECONDS).until(() -> session.getCloseState() == CloseState.CLOSED);
// The endPoint must eventually be closed.
await().atMost(5, TimeUnit.SECONDS).until(() -> !session.getEndPoint().isOpen());

// Cleanup.
dataList.forEach(Stream.Data::release);
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testSessionCloseWithPendingRequestThenClientDisconnectThenServerIdleTimeout(boolean useReaderWriter) throws Exception
{
AtomicReference<Thread> serverThreadRef = new AtomicReference<>();
CountDownLatch serverFailureLatch = new CountDownLatch(1);
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
try
{
serverThreadRef.set(Thread.currentThread());
if (useReaderWriter)
request.getReader().transferTo(response.getWriter());
else
request.getInputStream().transferTo(response.getOutputStream());
}
catch (Throwable x)
{
serverFailureLatch.countDown();
throw x;
}
}
});
long idleTimeout = 1000;
connector.setIdleTimeout(idleTimeout);

HTTP2Session session = (HTTP2Session)client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {})
.get(5, TimeUnit.SECONDS);
MetaData.Request request = new MetaData.Request("POST", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY);
Stream stream = session.newStream(new HeadersFrame(request, null, false), new Stream.Listener() {}).get(5, TimeUnit.SECONDS);

stream.data(new DataFrame(stream.getId(), UTF_8.encode("Hello Jetty"), false))
.get(5, TimeUnit.SECONDS);
stream.demand();

await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread serverThread = serverThreadRef.get();
return serverThread != null && serverThread.getState() == Thread.State.WAITING;
});

// Initiates graceful close, then immediately disconnect.
session.close(ErrorCode.NO_ERROR.code, "client_close", Callback.from(session::disconnect));

// Do not finish the streams, the server must idle timeout.
assertTrue(serverFailureLatch.await(2 * idleTimeout, TimeUnit.SECONDS));
// The session must eventually be closed.
await().atMost(5, TimeUnit.SECONDS).until(() -> session.getCloseState() == CloseState.CLOSED);
// The endPoint must eventually be closed.
await().atMost(5, TimeUnit.SECONDS).until(() -> !session.getEndPoint().isOpen());
}
}

0 comments on commit dde369a

Please sign in to comment.