Skip to content

Commit

Permalink
Fixes #12171 - QoSHandler does not resume on a virtual thread. (#12174)
Browse files Browse the repository at this point in the history
Now QoSHandler resumes requests using Request.getComponents().getExecutor().
This Executor is configured to be the virtual thread executor, if present, otherwise the Server Executor.

Removed warn() from VirtualThreads.isVirtualThread(), as it was too verbose.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet authored Aug 27, 2024
1 parent 394bc13 commit 0420e92
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,55 @@

package org.eclipse.jetty.server;

import java.util.concurrent.Executor;

import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.ThreadPool;

/**
* Common components made available via a {@link Request}
* Common components made available via a {@link Request}.
*/
public interface Components
{
/**
* @return the {@link ByteBufferPool} associated with the {@link Request}
*/
ByteBufferPool getByteBufferPool();

/**
* @return the {@link Scheduler} associated with the {@link Request}
*/
Scheduler getScheduler();

/**
* @return the {@link ThreadPool} associated with the {@link Request}
* @deprecated use {@link #getExecutor()} instead
*/
@Deprecated(since = "12.0.13", forRemoval = true)
ThreadPool getThreadPool();

/**
* A Map which can be used as a cache for object (e.g. Cookie cache).
* The cache will have a life cycle limited by the connection, i.e. no cache map will live
* @return the {@link Executor} associated with the {@link Request}
*/
default Executor getExecutor()
{
return getThreadPool();
}

/**
* <p>A map-like object that can be used as a cache (for example, as a cookie cache).</p>
* <p>The cache will have a life cycle limited by the connection, i.e. no cache map will live
* longer that the connection associated with it. However, a cache may have a shorter life
* than a connection (e.g. it may be discarded for implementation reasons). A cache map is
* guaranteed to be given to only a single request concurrently (scoped by
* {@link org.eclipse.jetty.server.internal.HttpChannelState}), so objects saved there do not
* need to be made safe from access by simultaneous request.
* If the connection is known to be none-persistent then the cache may be a noop
* cache and discard all items set on it.
* If the connection is known to be non-persistent then the cache may be a noop
* cache and discard all items set on it.</p>
*
* @return A Map, which may be an empty map that discards all items.
* @return A map-like object, which may be an empty implementation that discards all items.
*/
Attributes getCache();
}
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,18 @@ private boolean resumeSuspended()
if (LOG.isDebugEnabled())
LOG.debug("{} resuming {}", this, entry.request);
// Always dispatch to avoid StackOverflowError.
getServer().getThreadPool().execute(entry);
execute(entry.request, entry);
return true;
}
}
return false;
}

private void execute(Request request, Runnable task)
{
request.getComponents().getExecutor().execute(task);
}

private class Entry implements CyclicTimeouts.Expirable, Runnable
{
private final Request request;
Expand Down Expand Up @@ -458,7 +463,7 @@ private void expire()
}

if (removed)
failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException());
execute(request, () -> failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
Expand Down Expand Up @@ -63,6 +64,7 @@
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Scheduler;
Expand Down Expand Up @@ -231,7 +233,18 @@ public Scheduler getScheduler()
@Override
public ThreadPool getThreadPool()
{
return getServer().getThreadPool();
Executor executor = getExecutor();
if (executor instanceof ThreadPool threadPool)
return threadPool;
return new ThreadPoolWrapper(executor);
}

@Override
public Executor getExecutor()
{
Executor executor = getServer().getThreadPool();
Executor virtualExecutor = VirtualThreads.getVirtualThreadsExecutor(executor);
return virtualExecutor != null ? virtualExecutor : executor;
}

@Override
Expand Down Expand Up @@ -1948,4 +1961,43 @@ private static void failed(Callback callback, Throwable failure)
throw t;
}
}

private static class ThreadPoolWrapper implements ThreadPool
{
private final Executor _executor;

private ThreadPoolWrapper(Executor executor)
{
_executor = executor;
}

@Override
public void execute(Runnable command)
{
_executor.execute(command);
}

@Override
public void join()
{
}

@Override
public int getThreads()
{
return 0;
}

@Override
public int getIdleThreads()
{
return 0;
}

@Override
public boolean isLowOnThreads()
{
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public Runnable onRequest(MetaData.Request request)
{
Runnable after = _afterHandle.getAndSet(null);
if (after != null)
getThreadPool().execute(after);
getExecutor().execute(after);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@

package org.eclipse.jetty.server.handler;

import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -28,10 +31,15 @@
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledForJreRange;
import org.junit.jupiter.api.condition.JRE;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

Expand All @@ -50,7 +58,8 @@ public class QoSHandlerTest

private void start(QoSHandler qosHandler) throws Exception
{
server = new Server();
if (server == null)
server = new Server();
connector = new LocalConnector(server);
server.addConnector(connector);
server.setHandler(qosHandler);
Expand Down Expand Up @@ -483,4 +492,70 @@ public boolean handle(Request request, Response response, Callback callback)
}
});
}

@Test
@DisabledForJreRange(max = JRE.JAVA_20)
public void testRequestInVirtualThreadIsResumedInVirtualThread() throws Exception
{
QoSHandler qosHandler = new QoSHandler();
qosHandler.setMaxRequestCount(1);
List<Callback> callbacks = new ArrayList<>();
qosHandler.setHandler(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
response.setStatus(VirtualThreads.isVirtualThread() ? HttpStatus.OK_200 : HttpStatus.NOT_ACCEPTABLE_406);
// Save the callback but do not succeed it yet.
callbacks.add(callback);
return true;
}
});
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("st");
serverThreads.setVirtualThreadsExecutor(VirtualThreads.getNamedVirtualThreadsExecutor("vst"));
server = new Server(serverThreads);
ServerConnector networkConnector = new ServerConnector(server, 1, 1);
server.addConnector(networkConnector);
start(qosHandler);

// Send the first request that will not be completed yet.
try (SocketChannel client1 = SocketChannel.open(new InetSocketAddress("localhost", networkConnector.getLocalPort())))
{
client1.write(StandardCharsets.UTF_8.encode("""
GET /first HTTP/1.1
Host: localhost
"""));
// Wait that the request arrives at the server.
await().atMost(5, TimeUnit.SECONDS).until(callbacks::size, is(1));

// Send the second request, it should be suspended by QoSHandler.
try (SocketChannel client2 = SocketChannel.open(new InetSocketAddress("localhost", networkConnector.getLocalPort())))
{
client2.write(StandardCharsets.UTF_8.encode("""
GET /second HTTP/1.1
Host: localhost
"""));
// Wait for the second request to be suspended.
await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getSuspendedRequestCount, is(1));

// Finish the first request, so that the second can be resumed.
callbacks.remove(0).succeeded();
client1.socket().setSoTimeout(5000);
HttpTester.Response response1 = HttpTester.parseResponse(client1);
assertEquals(HttpStatus.OK_200, response1.getStatus());

// Wait for the second request to arrive to the server.
await().atMost(5, TimeUnit.SECONDS).until(callbacks::size, is(1));

// Finish the second request.
callbacks.remove(0).succeeded();
client2.socket().setSoTimeout(5000);
HttpTester.Response response2 = HttpTester.parseResponse(client2);
assertEquals(HttpStatus.OK_200, response2.getStatus());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public static boolean isVirtualThread()
}
catch (Throwable x)
{
warn();
return false;
}
}
Expand Down

0 comments on commit 0420e92

Please sign in to comment.