diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Components.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Components.java index b6e32ea937fd..7151c38fdbce 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Components.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Components.java @@ -13,34 +13,55 @@ package org.eclipse.jetty.server; +import java.util.concurrent.Executor; + import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.Attributes; import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.ThreadPool; /** - * Common components made available via a {@link Request} + * Common components made available via a {@link Request}. */ public interface Components { + /** + * @return the {@link ByteBufferPool} associated with the {@link Request} + */ ByteBufferPool getByteBufferPool(); + /** + * @return the {@link Scheduler} associated with the {@link Request} + */ Scheduler getScheduler(); + /** + * @return the {@link ThreadPool} associated with the {@link Request} + * @deprecated use {@link #getExecutor()} instead + */ + @Deprecated(since = "12.0.13", forRemoval = true) ThreadPool getThreadPool(); /** - * A Map which can be used as a cache for object (e.g. Cookie cache). - * The cache will have a life cycle limited by the connection, i.e. no cache map will live + * @return the {@link Executor} associated with the {@link Request} + */ + default Executor getExecutor() + { + return getThreadPool(); + } + + /** + *
A map-like object that can be used as a cache (for example, as a cookie cache).
+ *The cache will have a life cycle limited by the connection, i.e. no cache map will live * longer that the connection associated with it. However, a cache may have a shorter life * than a connection (e.g. it may be discarded for implementation reasons). A cache map is * guaranteed to be given to only a single request concurrently (scoped by * {@link org.eclipse.jetty.server.internal.HttpChannelState}), so objects saved there do not * need to be made safe from access by simultaneous request. - * If the connection is known to be none-persistent then the cache may be a noop - * cache and discard all items set on it. + * If the connection is known to be non-persistent then the cache may be a noop + * cache and discard all items set on it.
* - * @return A Map, which may be an empty map that discards all items. + * @return A map-like object, which may be an empty implementation that discards all items. */ Attributes getCache(); } diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java index 69722c13d408..9fef7b060d77 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java @@ -397,13 +397,18 @@ private boolean resumeSuspended() if (LOG.isDebugEnabled()) LOG.debug("{} resuming {}", this, entry.request); // Always dispatch to avoid StackOverflowError. - getServer().getThreadPool().execute(entry); + execute(entry.request, entry); return true; } } return false; } + private void execute(Request request, Runnable task) + { + request.getComponents().getExecutor().execute(task); + } + private class Entry implements CyclicTimeouts.Expirable, Runnable { private final Request request; @@ -458,7 +463,7 @@ private void expire() } if (removed) - failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException()); + execute(request, () -> failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException())); } @Override diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java index af8ca07824ed..f3455d380957 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.LongAdder; import java.util.function.Consumer; @@ -63,6 +64,7 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.NanoTime; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Scheduler; @@ -231,7 +233,18 @@ public Scheduler getScheduler() @Override public ThreadPool getThreadPool() { - return getServer().getThreadPool(); + Executor executor = getExecutor(); + if (executor instanceof ThreadPool threadPool) + return threadPool; + return new ThreadPoolWrapper(executor); + } + + @Override + public Executor getExecutor() + { + Executor executor = getServer().getThreadPool(); + Executor virtualExecutor = VirtualThreads.getVirtualThreadsExecutor(executor); + return virtualExecutor != null ? virtualExecutor : executor; } @Override @@ -1948,4 +1961,43 @@ private static void failed(Callback callback, Throwable failure) throw t; } } + + private static class ThreadPoolWrapper implements ThreadPool + { + private final Executor _executor; + + private ThreadPoolWrapper(Executor executor) + { + _executor = executor; + } + + @Override + public void execute(Runnable command) + { + _executor.execute(command); + } + + @Override + public void join() + { + } + + @Override + public int getThreads() + { + return 0; + } + + @Override + public int getIdleThreads() + { + return 0; + } + + @Override + public boolean isLowOnThreads() + { + return false; + } + } } diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/ServerTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/ServerTest.java index dcca3eb0375a..f3b7724420d2 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/ServerTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/ServerTest.java @@ -91,7 +91,7 @@ public Runnable onRequest(MetaData.Request request) { Runnable after = _afterHandle.getAndSet(null); if (after != null) - getThreadPool().execute(after); + getExecutor().execute(after); } }; } diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/QoSHandlerTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/QoSHandlerTest.java index 88544658be6d..1083dbe6d91a 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/QoSHandlerTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/QoSHandlerTest.java @@ -13,6 +13,9 @@ package org.eclipse.jetty.server.handler; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -28,10 +31,15 @@ import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledForJreRange; +import org.junit.jupiter.api.condition.JRE; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -50,7 +58,8 @@ public class QoSHandlerTest private void start(QoSHandler qosHandler) throws Exception { - server = new Server(); + if (server == null) + server = new Server(); connector = new LocalConnector(server); server.addConnector(connector); server.setHandler(qosHandler); @@ -483,4 +492,70 @@ public boolean handle(Request request, Response response, Callback callback) } }); } + + @Test + @DisabledForJreRange(max = JRE.JAVA_20) + public void testRequestInVirtualThreadIsResumedInVirtualThread() throws Exception + { + QoSHandler qosHandler = new QoSHandler(); + qosHandler.setMaxRequestCount(1); + List