Skip to content

Commit

Permalink
Fixes HttpClient Content.Source reads from arbitrary threads (#12203)
Browse files Browse the repository at this point in the history
* Reworked HttpReceiverOverHTTP state machine, in particular:
** Introduced a boolean parameter to parseAndFill() and parse(), that specifies whether to notify the application demand callback.
   This is necessary because reads may happen from any threads, and must not notify the application demand callback.
   Only when there is no data, and fill interest is set, then the application demand callback must be notified.
** Removed action field to avoid lambda allocation.
** Now the application is called directly from the parse() method.
** Reading -1 from the network drives the parser by calling again parse(), rather than the parser directly.
  This allows to have a central place to notify the response success event.

* Fixed FastCGI similarly to HTTP/1.1.
* Removed leftover of the multiplex implementation.

* Fixed test flakyness in `NetworkTrafficListenerTest`: consume the request content before sending the response.

* Follow up after #10880: only abort the request if there is request content in `AuthenticationProtocolHandler` and `RedirectProtocolHandler`.
  This avoids the rare case where the response arrives before the request thread has modified the request state, even if the request has been fully sent over the network, causing the request to be failed even if it should not.

* added `SerializedInvoker` assertions about current thread invoking.
* Name all SerializedInvoker instances for better troubleshooting.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
Co-authored-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
sbordet and lorban authored Aug 30, 2024
1 parent fb82a44 commit 1726c87
Show file tree
Hide file tree
Showing 22 changed files with 388 additions and 243 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public void onSuccess(Response response)
{
// The request may still be sending content, stop it.
Request request = response.getRequest();
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
if (request.getBody() != null)
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public void onSuccess(Response response)
{
// The request may still be sending content, stop it.
Request request = response.getRequest();
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
if (request.getBody() != null)
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public abstract class HttpReceiver
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiver.class);

private final SerializedInvoker invoker = new SerializedInvoker();
private final SerializedInvoker invoker = new SerializedInvoker(HttpReceiver.class);
private final HttpChannel channel;
private ResponseState responseState = ResponseState.IDLE;
private NotifiableContentSource contentSource;
Expand Down Expand Up @@ -332,21 +332,10 @@ protected void responseContentAvailable(HttpExchange exchange)
if (exchange.isResponseCompleteOrTerminated())
return;

responseContentAvailable();
contentSource.onDataAvailable();
});
}

/**
* Method to be invoked when response content is available to be read.
* <p>
* This method directly invokes the demand callback, assuming the caller
* is already serialized with other events.
*/
protected void responseContentAvailable()
{
contentSource.onDataAvailable();
}

/**
* Method to be invoked when the response is successful.
* <p>
Expand Down Expand Up @@ -720,6 +709,9 @@ public Content.Chunk read()

current = HttpReceiver.this.read(false);

if (LOG.isDebugEnabled())
LOG.debug("Read {} from {}", current, this);

try (AutoLock ignored = lock.lock())
{
if (currentChunk != null)
Expand All @@ -739,6 +731,7 @@ public void onDataAvailable()
{
if (LOG.isDebugEnabled())
LOG.debug("onDataAvailable on {}", this);
invoker.assertCurrentThreadInvoking();
// The onDataAvailable() method is only ever called
// by the invoker so avoid using the invoker again.
invokeDemandCallback(false);
Expand All @@ -763,6 +756,8 @@ private void processDemand()
if (LOG.isDebugEnabled())
LOG.debug("Processing demand on {}", this);

invoker.assertCurrentThreadInvoking();

Content.Chunk current;
try (AutoLock ignored = lock.lock())
{
Expand Down Expand Up @@ -802,9 +797,14 @@ private void invokeDemandCallback(boolean invoke)
try
{
if (invoke)
{
invoker.run(demandCallback);
}
else
{
invoker.assertCurrentThreadInvoking();
demandCallback.run();
}
}
catch (Throwable x)
{
Expand Down
Loading

0 comments on commit 1726c87

Please sign in to comment.