Skip to content

Commit

Permalink
Improve handling of idle timeouts when the handling itself is idle (#…
Browse files Browse the repository at this point in the history
…12301)

Always execute (without queuing) the task to call error listeners
  • Loading branch information
gregw authored and olamy committed Oct 30, 2024
1 parent d6eb403 commit 3ff79f5
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -343,9 +344,7 @@ public void failed(Throwable x)

public boolean onIdleTimeout(TimeoutException timeout)
{
Runnable task = _httpChannel.onIdleTimeout(timeout);
if (task != null)
execute(task);
ThreadPool.executeImmediately(_connection.getConnector().getExecutor(), _httpChannel.onIdleTimeout(timeout));
return false;
}

Expand All @@ -365,9 +364,7 @@ public void succeeded()
@Override
public void failed(Throwable x)
{
Runnable task = _httpChannel.onFailure(x);
if (task != null)
_connection.getConnector().getExecutor().execute(task);
ThreadPool.executeImmediately(_connection.getConnector().getExecutor(), _httpChannel.onFailure(x));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -315,9 +316,7 @@ public boolean onIdleExpired(TimeoutException timeoutException)
HttpStreamOverFCGI stream = this.stream;
if (stream == null)
return true;
Runnable task = stream.getHttpChannel().onIdleTimeout(timeoutException);
if (task != null)
getExecutor().execute(task);
ThreadPool.executeImmediately(getExecutor(), stream.getHttpChannel().onIdleTimeout(timeoutException));
return false;
}

Expand Down Expand Up @@ -399,9 +398,7 @@ public void onFailure(int request, Throwable failure)
LOG.debug("Request {} failure on {}", request, stream, failure);
if (stream != null)
{
Runnable runnable = stream.getHttpChannel().onFailure(new BadMessageException(null, failure));
if (runnable != null)
getExecutor().execute(runnable);
ThreadPool.executeImmediately(getExecutor(), stream.getHttpChannel().onFailure(new BadMessageException(null, failure)));
}
stream = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1008,9 +1008,7 @@ else if (interimCallback == null)
@Override
public void fail(Throwable failure)
{
Runnable runnable = _httpChannelState.onFailure(failure);
if (runnable != null)
getContext().execute(runnable);
ThreadPool.executeImmediately(getContext(), _httpChannelState.onFailure(failure));
}

@Override
Expand Down Expand Up @@ -1168,10 +1166,7 @@ private Runnable lockedFailWrite(Throwable x)
_writeCallback = null;
if (writeCallback == null)
return null;
if (_writeFailure == null)
_writeFailure = x;
else
ExceptionUtil.addSuppressedIfNotAssociated(_writeFailure, x);
_writeFailure = ExceptionUtil.combine(_writeFailure, x);
return () -> HttpChannelState.failed(writeCallback, x);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -617,10 +618,8 @@ public boolean onIdleExpired(TimeoutException timeout)
{
if (_httpChannel.getRequest() == null)
return true;
Runnable task = _httpChannel.onIdleTimeout(timeout);
if (task != null)
getExecutor().execute(task);
return false; // We've handle the exception
ThreadPool.executeImmediately(getExecutor(), _httpChannel.onIdleTimeout(timeout));
return false;
}

@Override
Expand Down Expand Up @@ -694,9 +693,7 @@ public void failed(Throwable x)
Runnable task = _httpChannel.onFailure(x);
if (LOG.isDebugEnabled())
LOG.debug("demand failed {}", task, x);
if (task != null)
// Execute error path as invocation type is probably wrong.
getConnector().getExecutor().execute(task);
ThreadPool.executeImmediately(getConnector().getExecutor(), task);
}

@Override
Expand Down Expand Up @@ -1054,9 +1051,7 @@ public void badMessage(HttpException failure)
if (_httpChannel.getRequest() == null)
_httpChannel.onRequest(new MetaData.Request(_parser.getBeginNanoTime(), stream._method, stream._uri, stream._version, HttpFields.EMPTY));

Runnable task = _httpChannel.onFailure(_failure);
if (task != null)
getServer().getThreadPool().execute(task);
ThreadPool.executeImmediately(getServer().getThreadPool(), _httpChannel.onFailure(_failure));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,7 @@ public boolean handle(Request request, Response response, Callback callback)
assertThat(demand.getCount(), is(1L));

Callback.Completable callback = new Callback.Completable();

// Writes are possible, unless a pending write is failed.
handling.get().write(false, null, callback);
assertTrue(callback.isDone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
*/
public class ExceptionUtil
{
private static final int MAX_SUPPRESSED = 10;

/**
* <p>Convert a {@link Throwable} to a specific type by casting or construction on a new instance.</p>
Expand Down Expand Up @@ -199,7 +200,13 @@ public static boolean areNotAssociated(Throwable t1, Throwable t2)
public static void addSuppressedIfNotAssociated(Throwable throwable, Throwable suppressed)
{
if (areNotAssociated(throwable, suppressed))
throwable.addSuppressed(suppressed);
{
int s = throwable.getSuppressed().length;
if (s < MAX_SUPPRESSED)
throwable.addSuppressed(suppressed);
else if (s == MAX_SUPPRESSED)
throwable.addSuppressed(new IllegalStateException("Too many suppressed", suppressed));
}
}

/**
Expand Down Expand Up @@ -298,8 +305,7 @@ public static Throwable combine(Throwable t1, Throwable t2)
{
if (t1 == null)
return t2;
if (areNotAssociated(t1, t2))
t1.addSuppressed(t2);
addSuppressedIfNotAssociated(t1, t2);
return t1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.concurrent.Executor;

import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.LifeCycle;
Expand All @@ -32,31 +33,31 @@ public interface ThreadPool extends Executor
*
* @throws InterruptedException if thread was interrupted
*/
public void join() throws InterruptedException;
void join() throws InterruptedException;

/**
* @return The total number of threads currently in the pool
*/
@ManagedAttribute("number of threads in pool")
public int getThreads();
int getThreads();

/**
* @return The number of idle threads in the pool
*/
@ManagedAttribute("number of idle threads in pool")
public int getIdleThreads();
int getIdleThreads();

/**
* @return True if the pool is low on threads
*/
@ManagedAttribute("indicates the pool is low on available threads")
public boolean isLowOnThreads();
boolean isLowOnThreads();

/**
* <p>Specialized sub-interface of ThreadPool that allows to get/set
* the minimum and maximum number of threads of the pool.</p>
*/
public interface SizedThreadPool extends ThreadPool
interface SizedThreadPool extends ThreadPool
{
/**
* @return the minimum number of threads
Expand Down Expand Up @@ -87,4 +88,45 @@ default ThreadPoolBudget getThreadPoolBudget()
return null;
}
}

/**
* <p>Execute a task immediately without queueing. This may use a
* {@code ReservedThread}, a {@code Virtual Thread}, a call to {@link Invocable#invokeNonBlocking(Runnable)},
* a newly spawned thread, or direct execution.
*
* @param executor An executor that may be used
* @param task The task that must be executed.
*/
static void executeImmediately(Executor executor, Runnable task)
{
if (task == null)
return;

if (executor instanceof TryExecutor tryExecutor && tryExecutor.tryExecute(task))
return;

Executor virtual = VirtualThreads.getVirtualThreadsExecutor(executor);
if (virtual != null)
{
virtual.execute(task);
return;
}

switch (Invocable.getInvocationType(task))
{
case NON_BLOCKING -> task.run();
case EITHER -> Invocable.invokeNonBlocking(task);
default ->
{
try
{
new Thread(task).start();
}
catch (Throwable ignored)
{
task.run();
}
}
}
}
}

0 comments on commit 3ff79f5

Please sign in to comment.