Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes HttpClient Content.Source reads from arbitrary threads #12203

Merged
merged 8 commits into from
Aug 30, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public void onSuccess(Response response)
{
// The request may still be sending content, stop it.
Request request = response.getRequest();
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
if (request.getBody() != null)
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public void onSuccess(Response response)
{
// The request may still be sending content, stop it.
Request request = response.getRequest();
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
if (request.getBody() != null)
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public abstract class HttpReceiver
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiver.class);

private final SerializedInvoker invoker = new SerializedInvoker();
private final SerializedInvoker invoker = new SerializedInvoker(HttpReceiver.class);
private final HttpChannel channel;
private ResponseState responseState = ResponseState.IDLE;
private NotifiableContentSource contentSource;
Expand Down Expand Up @@ -332,21 +332,10 @@ protected void responseContentAvailable(HttpExchange exchange)
if (exchange.isResponseCompleteOrTerminated())
return;

responseContentAvailable();
contentSource.onDataAvailable();
});
}

/**
* Method to be invoked when response content is available to be read.
* <p>
* This method directly invokes the demand callback, assuming the caller
* is already serialized with other events.
*/
protected void responseContentAvailable()
{
contentSource.onDataAvailable();
}

/**
* Method to be invoked when the response is successful.
* <p>
Expand Down Expand Up @@ -720,6 +709,9 @@ public Content.Chunk read()

current = HttpReceiver.this.read(false);

if (LOG.isDebugEnabled())
LOG.debug("Read {} from {}", current, this);

try (AutoLock ignored = lock.lock())
{
if (currentChunk != null)
Expand All @@ -739,6 +731,7 @@ public void onDataAvailable()
{
if (LOG.isDebugEnabled())
LOG.debug("onDataAvailable on {}", this);
invoker.assertCurrentThreadInvoking();
// The onDataAvailable() method is only ever called
// by the invoker so avoid using the invoker again.
invokeDemandCallback(false);
Expand All @@ -763,6 +756,8 @@ private void processDemand()
if (LOG.isDebugEnabled())
LOG.debug("Processing demand on {}", this);

invoker.assertCurrentThreadInvoking();

Content.Chunk current;
try (AutoLock ignored = lock.lock())
{
Expand Down Expand Up @@ -802,9 +797,14 @@ private void invokeDemandCallback(boolean invoke)
try
{
if (invoke)
{
invoker.run(demandCallback);
}
else
{
invoker.assertCurrentThreadInvoking();
demandCallback.run();
}
}
catch (Throwable x)
{
Expand Down
Loading
Loading