Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve handling of idle timeouts when the handling itself is idle #12301

Merged
merged 10 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -343,9 +344,7 @@ public void failed(Throwable x)

public boolean onIdleTimeout(TimeoutException timeout)
{
Runnable task = _httpChannel.onIdleTimeout(timeout);
if (task != null)
execute(task);
ThreadPool.mustExecute(_connection.getConnector().getExecutor(), _httpChannel.onIdleTimeout(timeout));
return false;
}

Expand All @@ -365,9 +364,7 @@ public void succeeded()
@Override
public void failed(Throwable x)
{
Runnable task = _httpChannel.onFailure(x);
if (task != null)
_connection.getConnector().getExecutor().execute(task);
ThreadPool.mustExecute(_connection.getConnector().getExecutor(), _httpChannel.onFailure(x));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -316,9 +317,7 @@ public boolean onIdleExpired(TimeoutException timeoutException)
HttpStreamOverFCGI stream = this.stream;
if (stream == null)
return true;
Runnable task = stream.getHttpChannel().onIdleTimeout(timeoutException);
if (task != null)
getExecutor().execute(task);
ThreadPool.mustExecute(getExecutor(), stream.getHttpChannel().onIdleTimeout(timeoutException));
return false;
}

Expand Down Expand Up @@ -400,9 +399,7 @@ public void onFailure(int request, Throwable failure)
LOG.debug("Request {} failure on {}", request, stream, failure);
if (stream != null)
{
Runnable runnable = stream.getHttpChannel().onFailure(new BadMessageException(null, failure));
if (runnable != null)
getExecutor().execute(runnable);
ThreadPool.mustExecute(getExecutor(), stream.getHttpChannel().onFailure(new BadMessageException(null, failure)));
}
stream = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ public Runnable onIdleTimeout(TimeoutException t)
}

// If a write call is pending, take the writeCallback to fail below.
Runnable invokeWriteFailure = _response.lockedFailWrite(t);
Runnable invokeWriteFailure = _response.lockedFailWrite(t, false);

// If there was a pending IO operation, deliver the idle timeout via them.
if (invokeOnContentAvailable != null || invokeWriteFailure != null)
Expand Down Expand Up @@ -456,7 +456,7 @@ private Runnable onFailure(Throwable x, boolean remote)
_onContentAvailable = null;

// If a write call is in progress, take the writeCallback to fail below.
Runnable invokeWriteFailure = _response.lockedFailWrite(x);
Runnable invokeWriteFailure = _response.lockedFailWrite(x, false);

// Notify the failure listeners only once.
Consumer<Throwable> onFailure = _onFailure;
Expand Down Expand Up @@ -1008,9 +1008,7 @@ else if (interimCallback == null)
@Override
public void fail(Throwable failure)
{
Runnable runnable = _httpChannelState.onFailure(failure);
if (runnable != null)
getContext().execute(runnable);
ThreadPool.mustExecute(getContext(), _httpChannelState.onFailure(failure));
}

@Override
Expand Down Expand Up @@ -1161,18 +1159,17 @@ private boolean lockedIsWriting()
return _writeCallback != null;
}

private Runnable lockedFailWrite(Throwable x)
private Runnable lockedFailWrite(Throwable x, boolean alwaysFailFutureWrites)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new parameter is always false, so remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely convinced that we should never do this. But I will remove for now.

{
assert _request._lock.isHeldByCurrentThread();
// We always record the failure here, so even if there is no write, subsequent writes will fail.
if (_writeFailure == null)
_writeFailure = x;
else
ExceptionUtil.addSuppressedIfNotAssociated(_writeFailure, x);
if (alwaysFailFutureWrites)
_writeFailure = ExceptionUtil.combine(_writeFailure, x);
Callback writeCallback = _writeCallback;
gregw marked this conversation as resolved.
Show resolved Hide resolved
_writeCallback = null;
if (writeCallback == null)
return null;
if (!alwaysFailFutureWrites)
_writeFailure = ExceptionUtil.combine(_writeFailure, x);
return () -> HttpChannelState.failed(writeCallback, x);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -607,14 +608,8 @@ public boolean onIdleExpired(TimeoutException timeout)
{
if (_httpChannel.getRequest() == null)
return true;
Runnable task = _httpChannel.onIdleTimeout(timeout);

// TODO should we run the task directly here, even though that may block the scheduler?
// This may be preferable to not running an idle task that might free a thread in a fully consumed
// thread pool
if (task != null)
getExecutor().execute(task);
return false; // We've handle the exception
ThreadPool.mustExecute(getExecutor(), _httpChannel.onIdleTimeout(timeout));
return false;
}

@Override
Expand Down Expand Up @@ -688,9 +683,7 @@ public void failed(Throwable x)
Runnable task = _httpChannel.onFailure(x);
if (LOG.isDebugEnabled())
LOG.debug("demand failed {}", task, x);
if (task != null)
// Execute error path as invocation type is probably wrong.
getConnector().getExecutor().execute(task);
ThreadPool.mustExecute(getConnector().getExecutor(), task);
}

@Override
Expand Down Expand Up @@ -1044,9 +1037,7 @@ public void badMessage(HttpException failure)
_httpChannel.onRequest(new MetaData.Request(_parser.getBeginNanoTime(), stream._method, uri, stream._version, HttpFields.EMPTY));
}

Runnable task = _httpChannel.onFailure(_failure);
if (task != null)
getServer().getThreadPool().execute(task);
ThreadPool.mustExecute(getServer().getThreadPool(), _httpChannel.onFailure(_failure));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,7 @@ public boolean handle(Request request, Response response, Callback callback)
assertThat(demand.getCount(), is(1L));

Callback.Completable callback = new Callback.Completable();

// Writes are possible, unless a pending write is failed.
handling.get().write(false, null, callback);
assertTrue(callback.isDone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,7 @@ public static Throwable combine(Throwable t1, Throwable t2)
{
if (t1 == null)
return t2;
if (areNotAssociated(t1, t2))
t1.addSuppressed(t2);
addSuppressedIfNotAssociated(t1, t2);
return t1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.concurrent.Executor;

import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.LifeCycle;
Expand All @@ -32,31 +33,31 @@ public interface ThreadPool extends Executor
*
* @throws InterruptedException if thread was interrupted
*/
public void join() throws InterruptedException;
void join() throws InterruptedException;

/**
* @return The total number of threads currently in the pool
*/
@ManagedAttribute("number of threads in pool")
public int getThreads();
int getThreads();

/**
* @return The number of idle threads in the pool
*/
@ManagedAttribute("number of idle threads in pool")
public int getIdleThreads();
int getIdleThreads();

/**
* @return True if the pool is low on threads
*/
@ManagedAttribute("indicates the pool is low on available threads")
public boolean isLowOnThreads();
boolean isLowOnThreads();

/**
* <p>Specialized sub-interface of ThreadPool that allows to get/set
* the minimum and maximum number of threads of the pool.</p>
*/
public interface SizedThreadPool extends ThreadPool
interface SizedThreadPool extends ThreadPool
{
/**
* @return the minimum number of threads
Expand Down Expand Up @@ -87,4 +88,38 @@ default ThreadPoolBudget getThreadPoolBudget()
return null;
}
}

/**
* <p>Execute a task immediately without queueing. This may use a
* {@code ReservedThread}, a {@code Virtual Thread}, a call to {@link Invocable#invokeNonBlocking(Runnable)},
* a newly spawned thread, or direct execution.
*
* @param executor An executor that may be used
* @param task The task that must be executed.
*/
static void mustExecute(Executor executor, Runnable task)
lorban marked this conversation as resolved.
Show resolved Hide resolved
{
if (task == null)
return;

if (executor instanceof TryExecutor tryExecutor && tryExecutor.tryExecute(task))
return;

Executor virtual = VirtualThreads.getVirtualThreadsExecutor(executor);
if (virtual != null)
virtual.execute(task);
gregw marked this conversation as resolved.
Show resolved Hide resolved

switch (Invocable.getInvocationType(task))
{
case NON_BLOCKING -> task.run();
case EITHER -> Invocable.invokeNonBlocking(task);
default ->
{
if (Runtime.getRuntime().freeMemory() > (10 * 1024))
lorban marked this conversation as resolved.
Show resolved Hide resolved
new Thread(task).start();
else
task.run();
}
}
}
}
Loading