From 68835c3db731ae98c42e46ab50011f15f353eff7 Mon Sep 17 00:00:00 2001 From: gregw Date: Tue, 24 Sep 2024 14:02:00 +1000 Subject: [PATCH 1/9] Improve handling of idle timeouts when the handling itself is idle Always remember the write failure --- .../eclipse/jetty/server/internal/HttpChannelState.java | 9 +++++---- .../eclipse/jetty/server/internal/HttpConnection.java | 4 ++++ .../main/java/org/eclipse/jetty/util/ExceptionUtil.java | 9 ++++++++- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java index 92f9bcf43dfc..012670c0dd85 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java @@ -1164,14 +1164,15 @@ private boolean lockedIsWriting() private Runnable lockedFailWrite(Throwable x) { assert _request._lock.isHeldByCurrentThread(); - Callback writeCallback = _writeCallback; - _writeCallback = null; - if (writeCallback == null) - return null; + // We always record the failure here, so even if there is no write, subsequent writes will fail. if (_writeFailure == null) _writeFailure = x; else ExceptionUtil.addSuppressedIfNotAssociated(_writeFailure, x); + Callback writeCallback = _writeCallback; + _writeCallback = null; + if (writeCallback == null) + return null; return () -> HttpChannelState.failed(writeCallback, x); } diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index 37fe59479e59..b9325803dbda 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -608,6 +608,10 @@ public boolean onIdleExpired(TimeoutException timeout) if (_httpChannel.getRequest() == null) return true; Runnable task = _httpChannel.onIdleTimeout(timeout); + + // TODO should we run the task directly here, even though that may block the scheduler? + // This may be preferable to not running an idle task that might free a thread in a fully consumed + // thread pool if (task != null) getExecutor().execute(task); return false; // We've handle the exception diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java index 6de5827e71c5..adac901bbc4e 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java @@ -27,6 +27,7 @@ */ public class ExceptionUtil { + private static final int MAX_SUPPRESSED = 10; /** *

Convert a {@link Throwable} to a specific type by casting or construction on a new instance.

@@ -199,7 +200,13 @@ public static boolean areNotAssociated(Throwable t1, Throwable t2) public static void addSuppressedIfNotAssociated(Throwable throwable, Throwable suppressed) { if (areNotAssociated(throwable, suppressed)) - throwable.addSuppressed(suppressed); + { + int s = throwable.getSuppressed().length; + if (s < MAX_SUPPRESSED) + throwable.addSuppressed(suppressed); + else if (s == MAX_SUPPRESSED) + throwable.addSuppressed(new IllegalStateException("Too many suppressed", suppressed)); + } } /** From b0b4884f0b1dfd0a949af4d6c200da0dc75ce24a Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 26 Sep 2024 10:25:53 +1000 Subject: [PATCH 2/9] WIP --- .../server/internal/HttpChannelState.java | 15 +++++------ .../jetty/server/internal/HttpConnection.java | 6 ++--- .../org/eclipse/jetty/util/ExceptionUtil.java | 26 +++++++++++++++++-- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java index 012670c0dd85..5acb83300094 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java @@ -376,7 +376,7 @@ public Runnable onIdleTimeout(TimeoutException t) } // If a write call is pending, take the writeCallback to fail below. - Runnable invokeWriteFailure = _response.lockedFailWrite(t); + Runnable invokeWriteFailure = _response.lockedFailWrite(t, false); // If there was a pending IO operation, deliver the idle timeout via them. if (invokeOnContentAvailable != null || invokeWriteFailure != null) @@ -456,7 +456,7 @@ private Runnable onFailure(Throwable x, boolean remote) _onContentAvailable = null; // If a write call is in progress, take the writeCallback to fail below. - Runnable invokeWriteFailure = _response.lockedFailWrite(x); + Runnable invokeWriteFailure = _response.lockedFailWrite(x, true); // Notify the failure listeners only once. Consumer onFailure = _onFailure; @@ -1161,18 +1161,17 @@ private boolean lockedIsWriting() return _writeCallback != null; } - private Runnable lockedFailWrite(Throwable x) + private Runnable lockedFailWrite(Throwable x, boolean alwaysFailFutureWrites) { assert _request._lock.isHeldByCurrentThread(); - // We always record the failure here, so even if there is no write, subsequent writes will fail. - if (_writeFailure == null) - _writeFailure = x; - else - ExceptionUtil.addSuppressedIfNotAssociated(_writeFailure, x); + if (alwaysFailFutureWrites) + _writeFailure = ExceptionUtil.combine(_writeFailure, x); Callback writeCallback = _writeCallback; _writeCallback = null; if (writeCallback == null) return null; + if (!alwaysFailFutureWrites) + _writeFailure = ExceptionUtil.combine(_writeFailure, x); return () -> HttpChannelState.failed(writeCallback, x); } diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index b9325803dbda..b29472116524 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -66,6 +66,7 @@ import org.eclipse.jetty.server.TunnelSupport; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.HostPort; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.StringUtil; @@ -609,11 +610,8 @@ public boolean onIdleExpired(TimeoutException timeout) return true; Runnable task = _httpChannel.onIdleTimeout(timeout); - // TODO should we run the task directly here, even though that may block the scheduler? - // This may be preferable to not running an idle task that might free a thread in a fully consumed - // thread pool if (task != null) - getExecutor().execute(task); + ExceptionUtil.mustExecute(getExecutor(), task); return false; // We've handle the exception } diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java index adac901bbc4e..a67608950794 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java @@ -18,9 +18,11 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.function.Consumer; import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.TryExecutor; /** *

Exception (or rather {@link Throwable} utility methods.

@@ -209,6 +211,27 @@ else if (s == MAX_SUPPRESSED) } } + // TODO javadoc + public static void mustExecute(Executor executor, Runnable task) + { + if (executor instanceof TryExecutor tryExecutor && tryExecutor.tryExecute(task)) + return; + + switch (Invocable.getInvocationType(task)) + { + case NON_BLOCKING -> task.run(); + case EITHER -> Invocable.invokeNonBlocking(task); + default -> + { + Executor virtual = VirtualThreads.getVirtualThreadsExecutor(executor); + if (virtual != null) + virtual.execute(task); + else + new Thread(task).start(); + } + } + } + /** * Decorate a Throwable with the suppressed errors and return it. * @param t the throwable @@ -305,8 +328,7 @@ public static Throwable combine(Throwable t1, Throwable t2) { if (t1 == null) return t2; - if (areNotAssociated(t1, t2)) - t1.addSuppressed(t2); + addSuppressedIfNotAssociated(t1, t2); return t1; } From 536d46bbd8e2d41dcdff07c34d9f256a58e80bc4 Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 26 Sep 2024 13:32:10 +1000 Subject: [PATCH 3/9] WIP --- .../org/eclipse/jetty/server/internal/HttpConnection.java | 5 +---- .../test/java/org/eclipse/jetty/server/HttpChannelTest.java | 5 +++-- .../src/main/java/org/eclipse/jetty/util/ExceptionUtil.java | 3 +++ 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index b29472116524..ec3ab29874a9 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -608,10 +608,7 @@ public boolean onIdleExpired(TimeoutException timeout) { if (_httpChannel.getRequest() == null) return true; - Runnable task = _httpChannel.onIdleTimeout(timeout); - - if (task != null) - ExceptionUtil.mustExecute(getExecutor(), task); + ExceptionUtil.mustExecute(getExecutor(), _httpChannel.onIdleTimeout(timeout)); return false; // We've handle the exception } diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpChannelTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpChannelTest.java index 6cb2d79f2c55..a449830c1a7e 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpChannelTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpChannelTest.java @@ -1229,10 +1229,11 @@ public boolean handle(Request request, Response response, Callback callback) assertThat(demand.getCount(), is(1L)); Callback.Completable callback = new Callback.Completable(); - // Writes are possible, unless a pending write is failed. + + // Writes are not possible handling.get().write(false, null, callback); assertTrue(callback.isDone()); - assertFalse(callback.isCompletedExceptionally()); + assertTrue(callback.isCompletedExceptionally()); // Run the onFailure task. try (StacklessLogging ignore = new StacklessLogging(Response.class)) diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java index a67608950794..7152ad05d56b 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java @@ -214,6 +214,9 @@ else if (s == MAX_SUPPRESSED) // TODO javadoc public static void mustExecute(Executor executor, Runnable task) { + if (task == null) + return; + if (executor instanceof TryExecutor tryExecutor && tryExecutor.tryExecute(task)) return; From abbee3fc61304cbaac72505840958a6e90312af2 Mon Sep 17 00:00:00 2001 From: gregw Date: Mon, 30 Sep 2024 17:17:02 +1000 Subject: [PATCH 4/9] WIP --- .../server/internal/HttpStreamOverFCGI.java | 9 ++-- .../server/internal/ServerFCGIConnection.java | 9 ++-- .../server/internal/HttpChannelState.java | 6 +-- .../jetty/server/internal/HttpConnection.java | 14 +++--- .../org/eclipse/jetty/util/ExceptionUtil.java | 26 ----------- .../eclipse/jetty/util/thread/ThreadPool.java | 45 ++++++++++++++++--- 6 files changed, 53 insertions(+), 56 deletions(-) diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java index dad5be63bae2..2952d765e503 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java @@ -38,6 +38,7 @@ import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.URIUtil; import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.ThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -343,9 +344,7 @@ public void failed(Throwable x) public boolean onIdleTimeout(TimeoutException timeout) { - Runnable task = _httpChannel.onIdleTimeout(timeout); - if (task != null) - execute(task); + ThreadPool.mustExecute(_connection.getConnector().getExecutor(), _httpChannel.onIdleTimeout(timeout)); return false; } @@ -365,9 +364,7 @@ public void succeeded() @Override public void failed(Throwable x) { - Runnable task = _httpChannel.onFailure(x); - if (task != null) - _connection.getConnector().getExecutor().execute(task); + ThreadPool.mustExecute(_connection.getConnector().getExecutor(), _httpChannel.onFailure(x)); } @Override diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java index fdfbcf0d6781..34e74d7d228c 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java @@ -35,6 +35,7 @@ import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.util.Attributes; import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.thread.ThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -316,9 +317,7 @@ public boolean onIdleExpired(TimeoutException timeoutException) HttpStreamOverFCGI stream = this.stream; if (stream == null) return true; - Runnable task = stream.getHttpChannel().onIdleTimeout(timeoutException); - if (task != null) - getExecutor().execute(task); + ThreadPool.mustExecute(getExecutor(), stream.getHttpChannel().onIdleTimeout(timeoutException)); return false; } @@ -400,9 +399,7 @@ public void onFailure(int request, Throwable failure) LOG.debug("Request {} failure on {}", request, stream, failure); if (stream != null) { - Runnable runnable = stream.getHttpChannel().onFailure(new BadMessageException(null, failure)); - if (runnable != null) - getExecutor().execute(runnable); + ThreadPool.mustExecute(getExecutor(), stream.getHttpChannel().onFailure(new BadMessageException(null, failure))); } stream = null; } diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java index 5acb83300094..ad43a4f00e41 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java @@ -456,7 +456,7 @@ private Runnable onFailure(Throwable x, boolean remote) _onContentAvailable = null; // If a write call is in progress, take the writeCallback to fail below. - Runnable invokeWriteFailure = _response.lockedFailWrite(x, true); + Runnable invokeWriteFailure = _response.lockedFailWrite(x, false); // Notify the failure listeners only once. Consumer onFailure = _onFailure; @@ -1008,9 +1008,7 @@ else if (interimCallback == null) @Override public void fail(Throwable failure) { - Runnable runnable = _httpChannelState.onFailure(failure); - if (runnable != null) - getContext().execute(runnable); + ThreadPool.mustExecute(getContext(), _httpChannelState.onFailure(failure)); } @Override diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index ec3ab29874a9..d5921b632dd5 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -66,13 +66,13 @@ import org.eclipse.jetty.server.TunnelSupport; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.HostPort; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.URIUtil; import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.ThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -608,8 +608,8 @@ public boolean onIdleExpired(TimeoutException timeout) { if (_httpChannel.getRequest() == null) return true; - ExceptionUtil.mustExecute(getExecutor(), _httpChannel.onIdleTimeout(timeout)); - return false; // We've handle the exception + ThreadPool.mustExecute(getExecutor(), _httpChannel.onIdleTimeout(timeout)); + return false; } @Override @@ -683,9 +683,7 @@ public void failed(Throwable x) Runnable task = _httpChannel.onFailure(x); if (LOG.isDebugEnabled()) LOG.debug("demand failed {}", task, x); - if (task != null) - // Execute error path as invocation type is probably wrong. - getConnector().getExecutor().execute(task); + ThreadPool.mustExecute(getConnector().getExecutor(), task); } @Override @@ -1039,9 +1037,7 @@ public void badMessage(HttpException failure) _httpChannel.onRequest(new MetaData.Request(_parser.getBeginNanoTime(), stream._method, uri, stream._version, HttpFields.EMPTY)); } - Runnable task = _httpChannel.onFailure(_failure); - if (task != null) - getServer().getThreadPool().execute(task); + ThreadPool.mustExecute(getServer().getThreadPool(), _httpChannel.onFailure(_failure)); } @Override diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java index 7152ad05d56b..4ebb076ab555 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ExceptionUtil.java @@ -18,11 +18,9 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.function.Consumer; import org.eclipse.jetty.util.thread.Invocable; -import org.eclipse.jetty.util.thread.TryExecutor; /** *

Exception (or rather {@link Throwable} utility methods.

@@ -211,30 +209,6 @@ else if (s == MAX_SUPPRESSED) } } - // TODO javadoc - public static void mustExecute(Executor executor, Runnable task) - { - if (task == null) - return; - - if (executor instanceof TryExecutor tryExecutor && tryExecutor.tryExecute(task)) - return; - - switch (Invocable.getInvocationType(task)) - { - case NON_BLOCKING -> task.run(); - case EITHER -> Invocable.invokeNonBlocking(task); - default -> - { - Executor virtual = VirtualThreads.getVirtualThreadsExecutor(executor); - if (virtual != null) - virtual.execute(task); - else - new Thread(task).start(); - } - } - } - /** * Decorate a Throwable with the suppressed errors and return it. * @param t the throwable diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java index ffe78b5858e5..727bc6b4ca0a 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java @@ -15,6 +15,7 @@ import java.util.concurrent.Executor; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.LifeCycle; @@ -32,31 +33,31 @@ public interface ThreadPool extends Executor * * @throws InterruptedException if thread was interrupted */ - public void join() throws InterruptedException; + void join() throws InterruptedException; /** * @return The total number of threads currently in the pool */ @ManagedAttribute("number of threads in pool") - public int getThreads(); + int getThreads(); /** * @return The number of idle threads in the pool */ @ManagedAttribute("number of idle threads in pool") - public int getIdleThreads(); + int getIdleThreads(); /** * @return True if the pool is low on threads */ @ManagedAttribute("indicates the pool is low on available threads") - public boolean isLowOnThreads(); + boolean isLowOnThreads(); /** *

Specialized sub-interface of ThreadPool that allows to get/set * the minimum and maximum number of threads of the pool.

*/ - public interface SizedThreadPool extends ThreadPool + interface SizedThreadPool extends ThreadPool { /** * @return the minimum number of threads @@ -87,4 +88,38 @@ default ThreadPoolBudget getThreadPoolBudget() return null; } } + + /** + *

Execute a task immediately without queueing. This may use a + * {@code ReservedThread}, a {@code Virtual Thread}, a call to {@link Invocable#invokeNonBlocking(Runnable)}, + * a newly spawned thread, or direct execution. + * + * @param executor An executor that may be used + * @param task The task that must be executed. + */ + static void mustExecute(Executor executor, Runnable task) + { + if (task == null) + return; + + if (executor instanceof TryExecutor tryExecutor && tryExecutor.tryExecute(task)) + return; + + Executor virtual = VirtualThreads.getVirtualThreadsExecutor(executor); + if (virtual != null) + virtual.execute(task); + + switch (Invocable.getInvocationType(task)) + { + case NON_BLOCKING -> task.run(); + case EITHER -> Invocable.invokeNonBlocking(task); + default -> + { + if (Runtime.getRuntime().freeMemory() > (10 * 1024)) + new Thread(task).start(); + else + task.run(); + } + } + } } From 9a6aff8e5afa4fff8ad8f9371b5fdcc2b65f00ec Mon Sep 17 00:00:00 2001 From: gregw Date: Tue, 1 Oct 2024 07:03:30 +1000 Subject: [PATCH 5/9] WIP --- .../test/java/org/eclipse/jetty/server/HttpChannelTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpChannelTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpChannelTest.java index a449830c1a7e..afa0489bed5e 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpChannelTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpChannelTest.java @@ -1230,10 +1230,10 @@ public boolean handle(Request request, Response response, Callback callback) Callback.Completable callback = new Callback.Completable(); - // Writes are not possible + // Writes are possible, unless a pending write is failed. handling.get().write(false, null, callback); assertTrue(callback.isDone()); - assertTrue(callback.isCompletedExceptionally()); + assertFalse(callback.isCompletedExceptionally()); // Run the onFailure task. try (StacklessLogging ignore = new StacklessLogging(Response.class)) From 171614d2ef372360dd943b344a78d94bb89cd8ed Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 3 Oct 2024 06:43:30 +1000 Subject: [PATCH 6/9] WIP --- .../main/java/org/eclipse/jetty/util/thread/ThreadPool.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java index 727bc6b4ca0a..1a53ddda7d43 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java @@ -115,7 +115,8 @@ static void mustExecute(Executor executor, Runnable task) case EITHER -> Invocable.invokeNonBlocking(task); default -> { - if (Runtime.getRuntime().freeMemory() > (10 * 1024)) + Runtime runtime = Runtime.getRuntime(); + if (runtime.maxMemory() - runtime.totalMemory() > (100 * 1024)) new Thread(task).start(); else task.run(); From aaa3dc855ffc19b133af4c8059a9465be8a9d12c Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 3 Oct 2024 06:49:27 +1000 Subject: [PATCH 7/9] WIP --- .../java/org/eclipse/jetty/util/thread/ThreadPool.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java index 1a53ddda7d43..6b264b9d9996 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java @@ -115,11 +115,14 @@ static void mustExecute(Executor executor, Runnable task) case EITHER -> Invocable.invokeNonBlocking(task); default -> { - Runtime runtime = Runtime.getRuntime(); - if (runtime.maxMemory() - runtime.totalMemory() > (100 * 1024)) + try + { new Thread(task).start(); - else + } + catch (OutOfMemoryError ignored) + { task.run(); + } } } } From e88bc1e8575df342282b66b8933cdcb0cb1b9048 Mon Sep 17 00:00:00 2001 From: gregw Date: Tue, 8 Oct 2024 17:59:21 +1100 Subject: [PATCH 8/9] WIP updates from review --- .../jetty/fcgi/server/internal/HttpStreamOverFCGI.java | 4 ++-- .../jetty/fcgi/server/internal/ServerFCGIConnection.java | 4 ++-- .../org/eclipse/jetty/server/internal/HttpChannelState.java | 2 +- .../org/eclipse/jetty/server/internal/HttpConnection.java | 6 +++--- .../main/java/org/eclipse/jetty/util/thread/ThreadPool.java | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java index 2952d765e503..1e85cadfc01c 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java @@ -344,7 +344,7 @@ public void failed(Throwable x) public boolean onIdleTimeout(TimeoutException timeout) { - ThreadPool.mustExecute(_connection.getConnector().getExecutor(), _httpChannel.onIdleTimeout(timeout)); + ThreadPool.executeImmediately(_connection.getConnector().getExecutor(), _httpChannel.onIdleTimeout(timeout)); return false; } @@ -364,7 +364,7 @@ public void succeeded() @Override public void failed(Throwable x) { - ThreadPool.mustExecute(_connection.getConnector().getExecutor(), _httpChannel.onFailure(x)); + ThreadPool.executeImmediately(_connection.getConnector().getExecutor(), _httpChannel.onFailure(x)); } @Override diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java index 34e74d7d228c..3c606fb817af 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java @@ -317,7 +317,7 @@ public boolean onIdleExpired(TimeoutException timeoutException) HttpStreamOverFCGI stream = this.stream; if (stream == null) return true; - ThreadPool.mustExecute(getExecutor(), stream.getHttpChannel().onIdleTimeout(timeoutException)); + ThreadPool.executeImmediately(getExecutor(), stream.getHttpChannel().onIdleTimeout(timeoutException)); return false; } @@ -399,7 +399,7 @@ public void onFailure(int request, Throwable failure) LOG.debug("Request {} failure on {}", request, stream, failure); if (stream != null) { - ThreadPool.mustExecute(getExecutor(), stream.getHttpChannel().onFailure(new BadMessageException(null, failure))); + ThreadPool.executeImmediately(getExecutor(), stream.getHttpChannel().onFailure(new BadMessageException(null, failure))); } stream = null; } diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java index ad43a4f00e41..a739a37025b8 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java @@ -1008,7 +1008,7 @@ else if (interimCallback == null) @Override public void fail(Throwable failure) { - ThreadPool.mustExecute(getContext(), _httpChannelState.onFailure(failure)); + ThreadPool.executeImmediately(getContext(), _httpChannelState.onFailure(failure)); } @Override diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index d5921b632dd5..d41ca03a56f4 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -608,7 +608,7 @@ public boolean onIdleExpired(TimeoutException timeout) { if (_httpChannel.getRequest() == null) return true; - ThreadPool.mustExecute(getExecutor(), _httpChannel.onIdleTimeout(timeout)); + ThreadPool.executeImmediately(getExecutor(), _httpChannel.onIdleTimeout(timeout)); return false; } @@ -683,7 +683,7 @@ public void failed(Throwable x) Runnable task = _httpChannel.onFailure(x); if (LOG.isDebugEnabled()) LOG.debug("demand failed {}", task, x); - ThreadPool.mustExecute(getConnector().getExecutor(), task); + ThreadPool.executeImmediately(getConnector().getExecutor(), task); } @Override @@ -1037,7 +1037,7 @@ public void badMessage(HttpException failure) _httpChannel.onRequest(new MetaData.Request(_parser.getBeginNanoTime(), stream._method, uri, stream._version, HttpFields.EMPTY)); } - ThreadPool.mustExecute(getServer().getThreadPool(), _httpChannel.onFailure(_failure)); + ThreadPool.executeImmediately(getServer().getThreadPool(), _httpChannel.onFailure(_failure)); } @Override diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java index 6b264b9d9996..e4338378f123 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java @@ -97,7 +97,7 @@ default ThreadPoolBudget getThreadPoolBudget() * @param executor An executor that may be used * @param task The task that must be executed. */ - static void mustExecute(Executor executor, Runnable task) + static void executeImmediately(Executor executor, Runnable task) { if (task == null) return; From 5126995afc65cc61061ec527871ebc46e17d39ea Mon Sep 17 00:00:00 2001 From: gregw Date: Tue, 8 Oct 2024 21:45:44 +1100 Subject: [PATCH 9/9] Updates from review --- .../jetty/server/internal/HttpChannelState.java | 11 ++++------- .../org/eclipse/jetty/util/thread/ThreadPool.java | 5 ++++- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java index a739a37025b8..6450ec5815df 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java @@ -376,7 +376,7 @@ public Runnable onIdleTimeout(TimeoutException t) } // If a write call is pending, take the writeCallback to fail below. - Runnable invokeWriteFailure = _response.lockedFailWrite(t, false); + Runnable invokeWriteFailure = _response.lockedFailWrite(t); // If there was a pending IO operation, deliver the idle timeout via them. if (invokeOnContentAvailable != null || invokeWriteFailure != null) @@ -456,7 +456,7 @@ private Runnable onFailure(Throwable x, boolean remote) _onContentAvailable = null; // If a write call is in progress, take the writeCallback to fail below. - Runnable invokeWriteFailure = _response.lockedFailWrite(x, false); + Runnable invokeWriteFailure = _response.lockedFailWrite(x); // Notify the failure listeners only once. Consumer onFailure = _onFailure; @@ -1159,17 +1159,14 @@ private boolean lockedIsWriting() return _writeCallback != null; } - private Runnable lockedFailWrite(Throwable x, boolean alwaysFailFutureWrites) + private Runnable lockedFailWrite(Throwable x) { assert _request._lock.isHeldByCurrentThread(); - if (alwaysFailFutureWrites) - _writeFailure = ExceptionUtil.combine(_writeFailure, x); Callback writeCallback = _writeCallback; _writeCallback = null; if (writeCallback == null) return null; - if (!alwaysFailFutureWrites) - _writeFailure = ExceptionUtil.combine(_writeFailure, x); + _writeFailure = ExceptionUtil.combine(_writeFailure, x); return () -> HttpChannelState.failed(writeCallback, x); } diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java index e4338378f123..dcae232c908a 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java @@ -107,7 +107,10 @@ static void executeImmediately(Executor executor, Runnable task) Executor virtual = VirtualThreads.getVirtualThreadsExecutor(executor); if (virtual != null) + { virtual.execute(task); + return; + } switch (Invocable.getInvocationType(task)) { @@ -119,7 +122,7 @@ static void executeImmediately(Executor executor, Runnable task) { new Thread(task).start(); } - catch (OutOfMemoryError ignored) + catch (Throwable ignored) { task.run(); }