Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve handling of idle timeouts when the handling itself is idle #12301

Merged
merged 10 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -343,9 +344,7 @@ public void failed(Throwable x)

public boolean onIdleTimeout(TimeoutException timeout)
{
Runnable task = _httpChannel.onIdleTimeout(timeout);
if (task != null)
execute(task);
ThreadPool.executeImmediately(_connection.getConnector().getExecutor(), _httpChannel.onIdleTimeout(timeout));
return false;
}

Expand All @@ -365,9 +364,7 @@ public void succeeded()
@Override
public void failed(Throwable x)
{
Runnable task = _httpChannel.onFailure(x);
if (task != null)
_connection.getConnector().getExecutor().execute(task);
ThreadPool.executeImmediately(_connection.getConnector().getExecutor(), _httpChannel.onFailure(x));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -316,9 +317,7 @@ public boolean onIdleExpired(TimeoutException timeoutException)
HttpStreamOverFCGI stream = this.stream;
if (stream == null)
return true;
Runnable task = stream.getHttpChannel().onIdleTimeout(timeoutException);
if (task != null)
getExecutor().execute(task);
ThreadPool.executeImmediately(getExecutor(), stream.getHttpChannel().onIdleTimeout(timeoutException));
return false;
}

Expand Down Expand Up @@ -400,9 +399,7 @@ public void onFailure(int request, Throwable failure)
LOG.debug("Request {} failure on {}", request, stream, failure);
if (stream != null)
{
Runnable runnable = stream.getHttpChannel().onFailure(new BadMessageException(null, failure));
if (runnable != null)
getExecutor().execute(runnable);
ThreadPool.executeImmediately(getExecutor(), stream.getHttpChannel().onFailure(new BadMessageException(null, failure)));
}
stream = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ public Runnable onIdleTimeout(TimeoutException t)
}

// If a write call is pending, take the writeCallback to fail below.
Runnable invokeWriteFailure = _response.lockedFailWrite(t);
Runnable invokeWriteFailure = _response.lockedFailWrite(t, false);

// If there was a pending IO operation, deliver the idle timeout via them.
if (invokeOnContentAvailable != null || invokeWriteFailure != null)
Expand Down Expand Up @@ -456,7 +456,7 @@ private Runnable onFailure(Throwable x, boolean remote)
_onContentAvailable = null;

// If a write call is in progress, take the writeCallback to fail below.
Runnable invokeWriteFailure = _response.lockedFailWrite(x);
Runnable invokeWriteFailure = _response.lockedFailWrite(x, false);

// Notify the failure listeners only once.
Consumer<Throwable> onFailure = _onFailure;
Expand Down Expand Up @@ -1008,9 +1008,7 @@ else if (interimCallback == null)
@Override
public void fail(Throwable failure)
{
Runnable runnable = _httpChannelState.onFailure(failure);
if (runnable != null)
getContext().execute(runnable);
ThreadPool.executeImmediately(getContext(), _httpChannelState.onFailure(failure));
}

@Override
Expand Down Expand Up @@ -1161,17 +1159,17 @@ private boolean lockedIsWriting()
return _writeCallback != null;
}

private Runnable lockedFailWrite(Throwable x)
private Runnable lockedFailWrite(Throwable x, boolean alwaysFailFutureWrites)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new parameter is always false, so remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely convinced that we should never do this. But I will remove for now.

{
assert _request._lock.isHeldByCurrentThread();
if (alwaysFailFutureWrites)
_writeFailure = ExceptionUtil.combine(_writeFailure, x);
Callback writeCallback = _writeCallback;
gregw marked this conversation as resolved.
Show resolved Hide resolved
_writeCallback = null;
if (writeCallback == null)
return null;
if (_writeFailure == null)
_writeFailure = x;
else
ExceptionUtil.addSuppressedIfNotAssociated(_writeFailure, x);
if (!alwaysFailFutureWrites)
_writeFailure = ExceptionUtil.combine(_writeFailure, x);
return () -> HttpChannelState.failed(writeCallback, x);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -607,10 +608,8 @@ public boolean onIdleExpired(TimeoutException timeout)
{
if (_httpChannel.getRequest() == null)
return true;
Runnable task = _httpChannel.onIdleTimeout(timeout);
if (task != null)
getExecutor().execute(task);
return false; // We've handle the exception
ThreadPool.executeImmediately(getExecutor(), _httpChannel.onIdleTimeout(timeout));
return false;
}

@Override
Expand Down Expand Up @@ -684,9 +683,7 @@ public void failed(Throwable x)
Runnable task = _httpChannel.onFailure(x);
if (LOG.isDebugEnabled())
LOG.debug("demand failed {}", task, x);
if (task != null)
// Execute error path as invocation type is probably wrong.
getConnector().getExecutor().execute(task);
ThreadPool.executeImmediately(getConnector().getExecutor(), task);
}

@Override
Expand Down Expand Up @@ -1040,9 +1037,7 @@ public void badMessage(HttpException failure)
_httpChannel.onRequest(new MetaData.Request(_parser.getBeginNanoTime(), stream._method, uri, stream._version, HttpFields.EMPTY));
}

Runnable task = _httpChannel.onFailure(_failure);
if (task != null)
getServer().getThreadPool().execute(task);
ThreadPool.executeImmediately(getServer().getThreadPool(), _httpChannel.onFailure(_failure));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,7 @@ public boolean handle(Request request, Response response, Callback callback)
assertThat(demand.getCount(), is(1L));

Callback.Completable callback = new Callback.Completable();

// Writes are possible, unless a pending write is failed.
handling.get().write(false, null, callback);
assertTrue(callback.isDone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
*/
public class ExceptionUtil
{
private static final int MAX_SUPPRESSED = 10;

/**
* <p>Convert a {@link Throwable} to a specific type by casting or construction on a new instance.</p>
Expand Down Expand Up @@ -199,7 +200,13 @@ public static boolean areNotAssociated(Throwable t1, Throwable t2)
public static void addSuppressedIfNotAssociated(Throwable throwable, Throwable suppressed)
{
if (areNotAssociated(throwable, suppressed))
throwable.addSuppressed(suppressed);
{
int s = throwable.getSuppressed().length;
if (s < MAX_SUPPRESSED)
throwable.addSuppressed(suppressed);
else if (s == MAX_SUPPRESSED)
throwable.addSuppressed(new IllegalStateException("Too many suppressed", suppressed));
}
}

/**
Expand Down Expand Up @@ -298,8 +305,7 @@ public static Throwable combine(Throwable t1, Throwable t2)
{
if (t1 == null)
return t2;
if (areNotAssociated(t1, t2))
t1.addSuppressed(t2);
addSuppressedIfNotAssociated(t1, t2);
return t1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.concurrent.Executor;

import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.LifeCycle;
Expand All @@ -32,31 +33,31 @@ public interface ThreadPool extends Executor
*
* @throws InterruptedException if thread was interrupted
*/
public void join() throws InterruptedException;
void join() throws InterruptedException;

/**
* @return The total number of threads currently in the pool
*/
@ManagedAttribute("number of threads in pool")
public int getThreads();
int getThreads();

/**
* @return The number of idle threads in the pool
*/
@ManagedAttribute("number of idle threads in pool")
public int getIdleThreads();
int getIdleThreads();

/**
* @return True if the pool is low on threads
*/
@ManagedAttribute("indicates the pool is low on available threads")
public boolean isLowOnThreads();
boolean isLowOnThreads();

/**
* <p>Specialized sub-interface of ThreadPool that allows to get/set
* the minimum and maximum number of threads of the pool.</p>
*/
public interface SizedThreadPool extends ThreadPool
interface SizedThreadPool extends ThreadPool
{
/**
* @return the minimum number of threads
Expand Down Expand Up @@ -87,4 +88,42 @@ default ThreadPoolBudget getThreadPoolBudget()
return null;
}
}

/**
* <p>Execute a task immediately without queueing. This may use a
* {@code ReservedThread}, a {@code Virtual Thread}, a call to {@link Invocable#invokeNonBlocking(Runnable)},
* a newly spawned thread, or direct execution.
*
* @param executor An executor that may be used
* @param task The task that must be executed.
*/
static void executeImmediately(Executor executor, Runnable task)
{
if (task == null)
return;

if (executor instanceof TryExecutor tryExecutor && tryExecutor.tryExecute(task))
return;

Executor virtual = VirtualThreads.getVirtualThreadsExecutor(executor);
if (virtual != null)
virtual.execute(task);
gregw marked this conversation as resolved.
Show resolved Hide resolved

switch (Invocable.getInvocationType(task))
{
case NON_BLOCKING -> task.run();
case EITHER -> Invocable.invokeNonBlocking(task);
default ->
{
try
{
new Thread(task).start();
}
catch (OutOfMemoryError ignored)
gregw marked this conversation as resolved.
Show resolved Hide resolved
{
task.run();
}
}
}
}
}
Loading