diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java index dad5be63bae2..1e85cadfc01c 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java @@ -38,6 +38,7 @@ import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.URIUtil; import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.ThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -343,9 +344,7 @@ public void failed(Throwable x) public boolean onIdleTimeout(TimeoutException timeout) { - Runnable task = _httpChannel.onIdleTimeout(timeout); - if (task != null) - execute(task); + ThreadPool.executeImmediately(_connection.getConnector().getExecutor(), _httpChannel.onIdleTimeout(timeout)); return false; } @@ -365,9 +364,7 @@ public void succeeded() @Override public void failed(Throwable x) { - Runnable task = _httpChannel.onFailure(x); - if (task != null) - _connection.getConnector().getExecutor().execute(task); + ThreadPool.executeImmediately(_connection.getConnector().getExecutor(), _httpChannel.onFailure(x)); } @Override diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java index fdfbcf0d6781..3c606fb817af 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java @@ -35,6 +35,7 @@ import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.util.Attributes; import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.thread.ThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -316,9 +317,7 @@ public boolean onIdleExpired(TimeoutException timeoutException) HttpStreamOverFCGI stream = this.stream; if (stream == null) return true; - Runnable task = stream.getHttpChannel().onIdleTimeout(timeoutException); - if (task != null) - getExecutor().execute(task); + ThreadPool.executeImmediately(getExecutor(), stream.getHttpChannel().onIdleTimeout(timeoutException)); return false; } @@ -400,9 +399,7 @@ public void onFailure(int request, Throwable failure) LOG.debug("Request {} failure on {}", request, stream, failure); if (stream != null) { - Runnable runnable = stream.getHttpChannel().onFailure(new BadMessageException(null, failure)); - if (runnable != null) - getExecutor().execute(runnable); + ThreadPool.executeImmediately(getExecutor(), stream.getHttpChannel().onFailure(new BadMessageException(null, failure))); } stream = null; } diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java index 92f9bcf43dfc..6450ec5815df 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java @@ -1008,9 +1008,7 @@ else if (interimCallback == null) @Override public void fail(Throwable failure) { - Runnable runnable = _httpChannelState.onFailure(failure); - if (runnable != null) - getContext().execute(runnable); + ThreadPool.executeImmediately(getContext(), _httpChannelState.onFailure(failure)); } @Override @@ -1168,10 +1166,7 @@ private Runnable lockedFailWrite(Throwable x) _writeCallback = null; if (writeCallback == null) return null; - if (_writeFailure == null) - _writeFailure = x; - else - ExceptionUtil.addSuppressedIfNotAssociated(_writeFailure, x); + _writeFailure = ExceptionUtil.combine(_writeFailure, x); return () -> HttpChannelState.failed(writeCallback, x); } diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index 37fe59479e59..d41ca03a56f4 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -72,6 +72,7 @@ import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.URIUtil; import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.ThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -607,10 +608,8 @@ public boolean onIdleExpired(TimeoutException timeout) { if (_httpChannel.getRequest() == null) return true; - Runnable task = _httpChannel.onIdleTimeout(timeout); - if (task != null) - getExecutor().execute(task); - return false; // We've handle the exception + ThreadPool.executeImmediately(getExecutor(), _httpChannel.onIdleTimeout(timeout)); + return false; } @Override @@ -684,9 +683,7 @@ public void failed(Throwable x) Runnable task = _httpChannel.onFailure(x); if (LOG.isDebugEnabled()) LOG.debug("demand failed {}", task, x); - if (task != null) - // Execute error path as invocation type is probably wrong. - getConnector().getExecutor().execute(task); + ThreadPool.executeImmediately(getConnector().getExecutor(), task); } @Override @@ -1040,9 +1037,7 @@ public void badMessage(HttpException failure) _httpChannel.onRequest(new MetaData.Request(_parser.getBeginNanoTime(), stream._method, uri, stream._version, HttpFields.EMPTY)); } - Runnable task = _httpChannel.onFailure(_failure); - if (task != null) - getServer().getThreadPool().execute(task); + ThreadPool.executeImmediately(getServer().getThreadPool(), _httpChannel.onFailure(_failure)); } @Override diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpChannelTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpChannelTest.java index 6cb2d79f2c55..afa0489bed5e 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpChannelTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpChannelTest.java @@ -1229,6 +1229,7 @@ public boolean handle(Request request, Response response, Callback callback) assertThat(demand.getCount(), is(1L)); Callback.Completable callback = new Callback.Completable(); + // Writes are possible, unless a pending write is failed. handling.get().write(false, null, callback); assertTrue(callback.isDone()); diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java index 6de5827e71c5..4ebb076ab555 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java @@ -27,6 +27,7 @@ */ public class ExceptionUtil { + private static final int MAX_SUPPRESSED = 10; /** *
Convert a {@link Throwable} to a specific type by casting or construction on a new instance.
@@ -199,7 +200,13 @@ public static boolean areNotAssociated(Throwable t1, Throwable t2) public static void addSuppressedIfNotAssociated(Throwable throwable, Throwable suppressed) { if (areNotAssociated(throwable, suppressed)) - throwable.addSuppressed(suppressed); + { + int s = throwable.getSuppressed().length; + if (s < MAX_SUPPRESSED) + throwable.addSuppressed(suppressed); + else if (s == MAX_SUPPRESSED) + throwable.addSuppressed(new IllegalStateException("Too many suppressed", suppressed)); + } } /** @@ -298,8 +305,7 @@ public static Throwable combine(Throwable t1, Throwable t2) { if (t1 == null) return t2; - if (areNotAssociated(t1, t2)) - t1.addSuppressed(t2); + addSuppressedIfNotAssociated(t1, t2); return t1; } diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java index ffe78b5858e5..dcae232c908a 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java @@ -15,6 +15,7 @@ import java.util.concurrent.Executor; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.LifeCycle; @@ -32,31 +33,31 @@ public interface ThreadPool extends Executor * * @throws InterruptedException if thread was interrupted */ - public void join() throws InterruptedException; + void join() throws InterruptedException; /** * @return The total number of threads currently in the pool */ @ManagedAttribute("number of threads in pool") - public int getThreads(); + int getThreads(); /** * @return The number of idle threads in the pool */ @ManagedAttribute("number of idle threads in pool") - public int getIdleThreads(); + int getIdleThreads(); /** * @return True if the pool is low on threads */ @ManagedAttribute("indicates the pool is low on available threads") - public boolean isLowOnThreads(); + boolean isLowOnThreads(); /** *Specialized sub-interface of ThreadPool that allows to get/set * the minimum and maximum number of threads of the pool.
*/ - public interface SizedThreadPool extends ThreadPool + interface SizedThreadPool extends ThreadPool { /** * @return the minimum number of threads @@ -87,4 +88,45 @@ default ThreadPoolBudget getThreadPoolBudget() return null; } } + + /** + *Execute a task immediately without queueing. This may use a + * {@code ReservedThread}, a {@code Virtual Thread}, a call to {@link Invocable#invokeNonBlocking(Runnable)}, + * a newly spawned thread, or direct execution. + * + * @param executor An executor that may be used + * @param task The task that must be executed. + */ + static void executeImmediately(Executor executor, Runnable task) + { + if (task == null) + return; + + if (executor instanceof TryExecutor tryExecutor && tryExecutor.tryExecute(task)) + return; + + Executor virtual = VirtualThreads.getVirtualThreadsExecutor(executor); + if (virtual != null) + { + virtual.execute(task); + return; + } + + switch (Invocable.getInvocationType(task)) + { + case NON_BLOCKING -> task.run(); + case EITHER -> Invocable.invokeNonBlocking(task); + default -> + { + try + { + new Thread(task).start(); + } + catch (Throwable ignored) + { + task.run(); + } + } + } + } }