Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stack overflow in HttpStreamOverHttp3.consumeAvailable() #10562

Merged
merged 11 commits into from
Sep 27, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,14 @@ public boolean isCommitted()
@Override
public Throwable consumeAvailable()
{
return HttpStream.consumeAvailable(this, _httpChannel.getConnectionMetaData().getHttpConfiguration());
Throwable result = HttpStream.consumeAvailable(this, _httpChannel.getConnectionMetaData().getHttpConfiguration());
if (result != null)
{
if (_chunk != null)
_chunk.release();
_chunk = Content.Chunk.from(result, true);
}
return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,15 @@ public Throwable consumeAvailable()
{
if (tunnelSupport != null)
return null;
return HttpStream.consumeAvailable(this, _httpChannel.getConnectionMetaData().getHttpConfiguration());
Throwable result = HttpStream.consumeAvailable(this, _httpChannel.getConnectionMetaData().getHttpConfiguration());
if (result != null)
{
_trailer = null;
if (_chunk != null)
_chunk.release();
_chunk = Content.Chunk.from(result, true);
}
return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,14 @@ public Throwable consumeAvailable()
{
if (getTunnelSupport() != null)
return null;
return HttpStream.consumeAvailable(this, httpChannel.getConnectionMetaData().getHttpConfiguration());
Throwable result = HttpStream.consumeAvailable(this, httpChannel.getConnectionMetaData().getHttpConfiguration());
if (result != null)
{
if (chunk != null)
chunk.release();
chunk = Content.Chunk.from(result, true);
}
return result;
}

public boolean isIdle()
Expand Down Expand Up @@ -528,6 +535,12 @@ public void onIdleTimeout(TimeoutException failure, BiConsumer<Runnable, Boolean

public Runnable onFailure(Throwable failure)
{
try (AutoLock ignored = lock.lock())
{
if (chunk != null)
chunk.release();
chunk = Content.Chunk.from(failure, true);
}
lorban marked this conversation as resolved.
Show resolved Hide resolved
return httpChannel.onFailure(failure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -572,39 +572,49 @@ private int fillRequestBuffer()
_retainableByteBuffer = newBuffer;
}

if (isRequestBufferEmpty())
{
// Get a buffer
// We are not in a race here for the request buffer as we have not yet received a request,
// so there are not an possible legal threads calling #parseContent or #completed.
ByteBuffer requestBuffer = getRequestBuffer();
if (!isRequestBufferEmpty())
return _retainableByteBuffer.remaining();

// fill
try
{
int filled = getEndPoint().fill(requestBuffer);
if (filled == 0) // Do a retry on fill 0 (optimization for SSL connections)
filled = getEndPoint().fill(requestBuffer);
// Get a buffer
// We are not in a race here for the request buffer as we have not yet received a request,
// so there are not any possible legal threads calling #parseContent or #completed.
ByteBuffer requestBuffer = getRequestBuffer();

if (filled > 0)
bytesIn.add(filled);
else if (filled < 0)
_parser.atEOF();
// fill
try
{
int filled = getEndPoint().fill(requestBuffer);
if (filled == 0) // Do a retry on fill 0 (optimization for SSL connections)
filled = getEndPoint().fill(requestBuffer);

if (LOG.isDebugEnabled())
LOG.debug("{} filled {} {}", this, filled, _retainableByteBuffer);
if (LOG.isDebugEnabled())
LOG.debug("{} filled {} {}", this, filled, _retainableByteBuffer);

return filled;
if (filled > 0)
{
bytesIn.add(filled);
}
catch (IOException e)
else
{
if (LOG.isDebugEnabled())
LOG.debug("Unable to fill from endpoint {}", getEndPoint(), e);
_parser.atEOF();
return -1;
if (filled < 0)
_parser.atEOF();
releaseRequestBuffer();
}

return filled;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Unable to fill from endpoint {}", getEndPoint(), x);
_parser.atEOF();
if (_retainableByteBuffer != null)
{
_retainableByteBuffer.clear();
releaseRequestBuffer();
}
return -1;
}
return 0;
}

private boolean parseRequestBuffer()
Expand Down Expand Up @@ -1147,15 +1157,9 @@ public Throwable consumeAvailable()
if (result != null)
{
_generator.setPersistent(false);
// If HttpStream.consumeAvailable() returns an error, there may be unconsumed content left,
// so we must make sure the buffer is released and that the next chunk indicates the end of the stream.
if (_retainableByteBuffer != null)
{
_retainableByteBuffer.release();
_retainableByteBuffer = null;
}
if (_chunk == null)
_chunk = Content.Chunk.from(result, true);
if (_chunk != null)
_chunk.release();
_chunk = Content.Chunk.from(result, true);
}
lorban marked this conversation as resolved.
Show resolved Hide resolved
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private static boolean isLeakTrackingDisabled(TestInfo testInfo, String tagSubVa
isAnnotatedWithTagValue(testInfo.getTestClass().orElseThrow(), disableLeakTrackingTagValue);
if (disabled)
{
System.err.println("Not tracking leaks");
System.err.println("Not tracking " + tagSubValue + " leaks");
return true;
}

Expand All @@ -172,7 +172,7 @@ private static boolean isLeakTrackingDisabled(TestInfo testInfo, String tagSubVa
isAnnotatedWithTagValue(testInfo.getTestClass().orElseThrow(), disableLeakTrackingTagValue + ":" + transportName);
if (disabled)
{
System.err.println("Not tracking leaks for transport " + transportName);
System.err.println("Not tracking " + tagSubValue + " leaks for transport " + transportName);
return true;
}
}
Expand All @@ -181,7 +181,7 @@ private static boolean isLeakTrackingDisabled(TestInfo testInfo, String tagSubVa
isAnnotatedWithTagValue(testInfo.getTestClass().orElseThrow(), disableLeakTrackingTagValue + ":" + tagSubValue);
if (disabled)
{
System.err.println("Not tracking leaks for " + tagSubValue);
System.err.println("Not tracking " + tagSubValue + " leaks");
return true;
}

Expand All @@ -191,7 +191,7 @@ private static boolean isLeakTrackingDisabled(TestInfo testInfo, String tagSubVa
isAnnotatedWithTagValue(testInfo.getTestClass().orElseThrow(), disableLeakTrackingTagValue + ":" + tagSubValue + ":" + transportName);
if (disabled)
{
System.err.println("Not tracking leaks for " + tagSubValue + " using transport " + transportName);
System.err.println("Not tracking " + tagSubValue + " leaks for transport " + transportName);
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@

import org.eclipse.jetty.client.AsyncRequestContent;
import org.eclipse.jetty.client.BufferingResponseListener;
import org.eclipse.jetty.client.ByteBufferRequestContent;
import org.eclipse.jetty.client.BytesRequestContent;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.InputStreamRequestContent;
import org.eclipse.jetty.client.InputStreamResponseListener;
Expand All @@ -56,6 +58,7 @@
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CompletableTask;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.component.LifeCycle;
Expand All @@ -73,6 +76,7 @@
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -1193,6 +1197,92 @@ public boolean handle(Request request, org.eclipse.jetty.server.Response respons
assertTrue(clientLatch.await(timeoutInSeconds, TimeUnit.SECONDS));
}

@ParameterizedTest
@MethodSource("transports")
@Tag("DisableLeakTracking:server:H2")
@Tag("DisableLeakTracking:server:H2C")
@Tag("DisableLeakTracking:server:H3")
@Tag("DisableLeakTracking:server:FCGI")
public void testHttpStreamConsumeAvailableUponClientAbort(Transport transport) throws Exception
{
AtomicReference<org.eclipse.jetty.client.Request> clientRequestRef = new AtomicReference<>();

start(transport, new Handler.Abstract()
{
@Override
public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
new CompletableTask<>()
{
@Override
public void run()
{
while (true)
{
Content.Chunk chunk = request.read();
if (chunk == null)
{
request.demand(this);
return;
}
if (Content.Chunk.isFailure(chunk))
{
completeExceptionally(chunk.getFailure());
return;
}
chunk.release();
if (chunk.isLast())
{
complete(null);
return;
}

var r = clientRequestRef.getAndSet(null);
if (r != null)
{
// Abort the client request then give some time for the client's
// abort notification (e.g.: reset frame) to reach the server.
r.abort(new IllegalCallerException());
try
{
Thread.sleep(100);
}
catch (InterruptedException e)
{
completeExceptionally(e);
return;
}
}
}
}
}
.start()
.whenComplete((result, failure) ->
{
if (failure == null)
callback.succeeded();
else
callback.failed(failure);
});
return true;
}
});

byte[] data = new byte[16 * 1024 * 1024];
new Random().nextBytes(data);
ByteBufferRequestContent content = new ByteBufferRequestContent(ByteBuffer.wrap(data));

var request = client.newRequest(newURI(transport))
.body(content);
clientRequestRef.set(request);
Throwable throwable = new CompletableResponseListener(request)
.send()
.handle((r, t) -> t)
.get(5, TimeUnit.SECONDS);

assertInstanceOf(IllegalCallerException.class, throwable);
}

private record HandlerContext(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
}
Expand Down