Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use zero-byte reads in StreamCopier #1415

Merged
merged 5 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 135 additions & 104 deletions src/ReverseProxy/Forwarder/StreamCopier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

using System;
using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.Tracing;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -18,142 +20,171 @@ internal static class StreamCopier
// Based on performance investigations, see https://github.com/microsoft/reverse-proxy/pull/330#issuecomment-758851852.
private const int DefaultBufferSize = 65536;

private static readonly TimeSpan TimeBetweenTransferringEvents = TimeSpan.FromSeconds(1);

/// <inheritdoc/>
/// <remarks>
/// Based on <c>Microsoft.AspNetCore.Http.StreamCopyOperationInternal.CopyToAsync</c>.
/// See: <see href="https://github.com/dotnet/aspnetcore/blob/080660967b6043f731d4b7163af9e9e6047ef0c4/src/Http/Shared/StreamCopyOperationInternal.cs"/>.
/// </remarks>
public static async ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, IClock clock, ActivityCancellationTokenSource activityToken, CancellationToken cancellation)
public static ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, IClock clock, ActivityCancellationTokenSource activityToken, CancellationToken cancellation)
{
_ = input ?? throw new ArgumentNullException(nameof(input));
_ = output ?? throw new ArgumentNullException(nameof(output));

var telemetryEnabled = ForwarderTelemetry.Log.IsEnabled();
Debug.Assert(input is not null);
Debug.Assert(output is not null);
Debug.Assert(clock is not null);
Debug.Assert(activityToken is not null);

var buffer = ArrayPool<byte>.Shared.Rent(DefaultBufferSize);
var reading = true;
// Avoid capturing 'isRequest' and 'clock' in the state machine when telemetry is disabled
var telemetry = ForwarderTelemetry.Log.IsEnabled(EventLevel.Informational, EventKeywords.All)
? new StreamCopierTelemetry(isRequest, clock)
: null;

long contentLength = 0;
long iops = 0;
var readTime = TimeSpan.Zero;
var writeTime = TimeSpan.Zero;
var firstReadTime = TimeSpan.FromMilliseconds(-1);
return CopyAsync(input, output, telemetry, activityToken, cancellation);
}

private static async ValueTask<(StreamCopyResult, Exception?)> CopyAsync(Stream input, Stream output, StreamCopierTelemetry? telemetry, ActivityCancellationTokenSource activityToken, CancellationToken cancellation)
{
var buffer = ArrayPool<byte>.Shared.Rent(DefaultBufferSize);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting trade-off. This optimizes for the common case where there is data flowing, but would not get the zero-byte-read benefits until after the first read completes.

How does the perf compare to the alternative of always doing a zero-byte-read first?

Copy link
Member Author

@MihaZupan MihaZupan Nov 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The differences are within the margin of error.

It may be worth just always doing the zero-byte read first to improve the worst-case memory consumption.

On both HttpClient and AspNetCore's side, there is room to special-case zero-byte reads and save a few branches (e.g. skip doing a no-op zero-byte slice+copy+advance on underlying buffers), especially if we know that zero-byte reads are this common (1:1 with regular reads). But for YARP this is in the noise range.

Copy link
Member

@Tratcher Tratcher Nov 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You've still rented the buffer in advance. You'd need to remove this line to take advantage of the zero-byte-read.

edit oh, if the zbr does not complete sync then you release the buffer. That's probably an adequate pattern.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may end up renting the buffer and releasing it right away if no data is available on the first read. The overhead of that is negligible (also shouldn't be the common case) and it simplifies the logic for all the other cases.

var read = 0;
try
{
var lastTime = TimeSpan.Zero;
var nextTransferringEvent = TimeSpan.Zero;

if (telemetryEnabled)
{
ForwarderTelemetry.Log.ForwarderStage(isRequest ? ForwarderStage.RequestContentTransferStart : ForwarderStage.ResponseContentTransferStart);

lastTime = clock.GetStopwatchTime();
nextTransferringEvent = lastTime + TimeBetweenTransferringEvents;
}

while (true)
{
if (cancellation.IsCancellationRequested)
{
return (StreamCopyResult.Canceled, new OperationCanceledException(cancellation));
}
read = 0;

reading = true;
var read = 0;
try
// Issue a zero-byte read to the input stream to defer buffer allocation until data is available.
// Note that if the underlying stream does not supporting blocking on zero byte reads, then this will
// complete immediately and won't save any memory, but will still function correctly.
var zeroByteReadTask = input.ReadAsync(Memory<byte>.Empty, cancellation);
if (zeroByteReadTask.IsCompletedSuccessfully)
{
read = await input.ReadAsync(buffer.AsMemory(), cancellation);

// Success, reset the activity monitor.
activityToken.ResetTimeout();
// Consume the ValueTask's result in case it is backed by an IValueTaskSource
_ = zeroByteReadTask.Result;
}
finally
else
{
if (telemetryEnabled)
{
contentLength += read;
iops++;

var readStop = clock.GetStopwatchTime();
var currentReadTime = readStop - lastTime;
lastTime = readStop;
readTime += currentReadTime;
if (firstReadTime.Ticks < 0)
{
firstReadTime = currentReadTime;
}
}
// Take care not to return the same buffer to the pool twice in case zeroByteReadTask throws
var bufferToReturn = buffer;
buffer = null;
ArrayPool<byte>.Shared.Return(bufferToReturn);

await zeroByteReadTask;

buffer = ArrayPool<byte>.Shared.Rent(DefaultBufferSize);
}

read = await input.ReadAsync(buffer.AsMemory(), cancellation);

telemetry?.AfterRead(read);

// Success, reset the activity monitor.
activityToken.ResetTimeout();

// End of the source stream.
if (read == 0)
{
return (StreamCopyResult.Success, null);
}

if (cancellation.IsCancellationRequested)
{
return (StreamCopyResult.Canceled, new OperationCanceledException(cancellation));
}
await output.WriteAsync(buffer.AsMemory(0, read), cancellation);

reading = false;
try
{
await output.WriteAsync(buffer.AsMemory(0, read), cancellation);
telemetry?.AfterWrite();

// Success, reset the activity monitor.
activityToken.ResetTimeout();
}
finally
{
if (telemetryEnabled)
{
var writeStop = clock.GetStopwatchTime();
writeTime += writeStop - lastTime;
lastTime = writeStop;
if (lastTime >= nextTransferringEvent)
{
ForwarderTelemetry.Log.ContentTransferring(
isRequest,
contentLength,
iops,
readTime.Ticks,
writeTime.Ticks);

// Avoid attributing the time taken by logging ContentTransferring to the next read call
lastTime = clock.GetStopwatchTime();
nextTransferringEvent = lastTime + TimeBetweenTransferringEvents;
}
}
}
// Success, reset the activity monitor.
activityToken.ResetTimeout();
}
}
catch (OperationCanceledException oex)
{
return (StreamCopyResult.Canceled, oex);
}
catch (Exception ex)
{
return (reading ? StreamCopyResult.InputError : StreamCopyResult.OutputError, ex);
if (read == 0)
{
telemetry?.AfterRead(0);
}
else
{
telemetry?.AfterWrite();
}

var result = ex is OperationCanceledException ? StreamCopyResult.Canceled :
(read == 0 ? StreamCopyResult.InputError : StreamCopyResult.OutputError);

return (result, ex);
}
finally
{
// We can afford the perf impact of clearArray == true since we only do this twice per request.
ArrayPool<byte>.Shared.Return(buffer, clearArray: true);
if (buffer is not null)
{
ArrayPool<byte>.Shared.Return(buffer);
}

telemetry?.Stop();
}
}

private sealed class StreamCopierTelemetry
{
private static readonly TimeSpan _timeBetweenTransferringEvents = TimeSpan.FromSeconds(1);

private readonly bool _isRequest;
private readonly IClock _clock;
private long _contentLength;
private long _iops;
private TimeSpan _readTime;
private TimeSpan _writeTime;
private TimeSpan _firstReadTime;
private TimeSpan _lastTime;
private TimeSpan _nextTransferringEvent;

public StreamCopierTelemetry(bool isRequest, IClock clock)
{
_isRequest = isRequest;
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
_firstReadTime = new TimeSpan(-1);

if (telemetryEnabled)
ForwarderTelemetry.Log.ForwarderStage(isRequest ? ForwarderStage.RequestContentTransferStart : ForwarderStage.ResponseContentTransferStart);

_lastTime = clock.GetStopwatchTime();
_nextTransferringEvent = _lastTime + _timeBetweenTransferringEvents;
}

public void AfterRead(int read)
{
_contentLength += read;
_iops++;

var readStop = _clock.GetStopwatchTime();
var currentReadTime = readStop - _lastTime;
_lastTime = readStop;
_readTime += currentReadTime;
if (_firstReadTime.Ticks < 0)
{
ForwarderTelemetry.Log.ContentTransferred(
isRequest,
contentLength,
iops,
readTime.Ticks,
writeTime.Ticks,
Math.Max(0, firstReadTime.Ticks));
_firstReadTime = currentReadTime;
}
}

public void AfterWrite()
{
var writeStop = _clock.GetStopwatchTime();
_writeTime += writeStop - _lastTime;
_lastTime = writeStop;

if (writeStop >= _nextTransferringEvent)
{
ForwarderTelemetry.Log.ContentTransferring(
_isRequest,
_contentLength,
_iops,
_readTime.Ticks,
_writeTime.Ticks);

// Avoid attributing the time taken by logging ContentTransferring to the next read call
_lastTime = _clock.GetStopwatchTime();
_nextTransferringEvent = _lastTime + _timeBetweenTransferringEvents;
}
}

public void Stop()
{
ForwarderTelemetry.Log.ContentTransferred(
_isRequest,
_contentLength,
_iops,
_readTime.Ticks,
_writeTime.Ticks,
Math.Max(0, _firstReadTime.Ticks));
}
}
}
16 changes: 12 additions & 4 deletions test/ReverseProxy.Tests/Forwarder/HttpForwarderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,7 @@ public async Task RequestWithBody_KeptAliveByActivity()
httpContext.Request.Method = "POST";
httpContext.Request.Body = new CallbackReadStream(async (memory, ct) =>
{
if (reads >= expectedReads)
if (memory.Length == 0 || reads >= expectedReads)
{
return 0;
}
Expand Down Expand Up @@ -1501,7 +1501,7 @@ public async Task ResponseBodyCancelled_502()
Assert.Empty(httpContext.Response.Headers);
var errorFeature = httpContext.Features.Get<IForwarderErrorFeature>();
Assert.Equal(ForwarderError.ResponseBodyCanceled, errorFeature.Error);
Assert.IsType<OperationCanceledException>(errorFeature.Exception);
Assert.IsType<TaskCanceledException>(errorFeature.Exception);

AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error);
events.AssertContainProxyStages(hasRequestContent: false);
Expand Down Expand Up @@ -1542,7 +1542,7 @@ public async Task ResponseBodyCancelledAfterStart_Aborted()
Assert.Equal("bytes", httpContext.Response.Headers[HeaderNames.AcceptRanges]);
var errorFeature = httpContext.Features.Get<IForwarderErrorFeature>();
Assert.Equal(ForwarderError.ResponseBodyCanceled, errorFeature.Error);
Assert.IsType<OperationCanceledException>(errorFeature.Exception);
Assert.IsType<TaskCanceledException>(errorFeature.Exception);

AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error);
events.AssertContainProxyStages(hasRequestContent: false);
Expand Down Expand Up @@ -2299,6 +2299,11 @@ public ThrowStream(bool throwOnFirstRead = true)

public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
if (buffer.Length == 0)
{
return new ValueTask<int>(0);
}

cancellationToken.ThrowIfCancellationRequested();
if (_firstRead && !ThrowOnFirstRead)
{
Expand Down Expand Up @@ -2456,7 +2461,10 @@ public OnCompletedReadStream(Action onCompleted)

public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
OnCompleted();
if (buffer.Length != 0)
{
OnCompleted();
}
return new ValueTask<int>(0);
}
}
Expand Down
9 changes: 7 additions & 2 deletions test/ReverseProxy.Tests/Forwarder/StreamCopierTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ public async Task Cancelled_Reported(bool isRequest)

using var cts = ActivityCancellationTokenSource.Rent(TimeSpan.FromSeconds(10), CancellationToken.None);
cts.Cancel();
var (result, error) = await StreamCopier.CopyAsync(isRequest, source, destination, new Clock(), cts, cts.Token);
var (result, error) = await StreamCopier.CopyAsync(isRequest, source, destination, new ManualClock(), cts, cts.Token);
Assert.Equal(StreamCopyResult.Canceled, result);
Assert.IsAssignableFrom<OperationCanceledException>(error);

AssertContentTransferred(events, isRequest,
contentLength: 0,
iops: 0,
iops: 1,
firstReadTime: TimeSpan.Zero,
readTime: TimeSpan.Zero,
writeTime: TimeSpan.Zero);
Expand Down Expand Up @@ -306,6 +306,11 @@ public SlowStream(Stream innerStream, ManualClock clock, TimeSpan waitTime)

public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
if (buffer.Length == 0)
{
return new ValueTask<int>(0);
}

_clock.AdvanceClockBy(_waitTime);
return base.ReadAsync(buffer.Slice(0, Math.Min(buffer.Length, MaxBytesPerRead)), cancellationToken);
}
Expand Down
10 changes: 10 additions & 0 deletions test/ReverseProxy.Tests/Forwarder/StreamCopyHttpContentTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ public async Task SerializeToStreamAsync_RespectsContentCancellation()

var source = new ReadDelegatingStream(new MemoryStream(), async (buffer, cancellation) =>
{
if (buffer.Length == 0)
{
return 0;
}

Assert.False(cancellation.IsCancellationRequested);
await tcs.Task;
Assert.True(cancellation.IsCancellationRequested);
Expand All @@ -160,6 +165,11 @@ public async Task SerializeToStreamAsync_CanBeCanceledExternally()

var source = new ReadDelegatingStream(new MemoryStream(), async (buffer, cancellation) =>
{
if (buffer.Length == 0)
{
return 0;
}

Assert.False(cancellation.IsCancellationRequested);
await tcs.Task;
Assert.True(cancellation.IsCancellationRequested);
Expand Down