Skip to content

Commit

Permalink
Improve HTTP/1 response header parsing (#74393)
Browse files Browse the repository at this point in the history
* Improve HTTP/1 response header parsing

* Improve worst-case performance

* Account for line folds in FillForHeadersAsync

* PR feedback

* Extend header trickle test for line folds

* RIP goto

* Clarify the meaning of 'valueIter'

* Handle Scavenge zero-byte reads in test
  • Loading branch information
MihaZupan authored Oct 13, 2022
1 parent 15aeb77 commit d2d5ad3
Show file tree
Hide file tree
Showing 4 changed files with 515 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ namespace System.IO
/// <summary>Provides a stream whose implementation is supplied by delegates or by an inner stream.</summary>
internal sealed class DelegateDelegatingStream : DelegatingStream
{
public delegate int ReadSpanDelegate(Span<byte> buffer);

public static DelegateDelegatingStream NopDispose(Stream innerStream) =>
new DelegateDelegatingStream(innerStream)
{
Expand All @@ -27,6 +29,7 @@ public DelegateDelegatingStream(Stream innerStream) : base(innerStream) { }
public Func<long> GetPositionFunc { get; set; }
public Action<long> SetPositionFunc { get; set; }
public Func<byte[], int, int, int> ReadFunc { get; set; }
public ReadSpanDelegate ReadSpanFunc { get; set; }
public Func<byte[], int, int, CancellationToken, Task<int>> ReadAsyncArrayFunc { get; set; }
public Func<Memory<byte>, CancellationToken, ValueTask<int>> ReadAsyncMemoryFunc { get; set; }
public Func<long, SeekOrigin, long> SeekFunc { get; set; }
Expand All @@ -48,6 +51,7 @@ public DelegateDelegatingStream(Stream innerStream) : base(innerStream) { }
public override long Position => GetPositionFunc != null ? GetPositionFunc() : base.Position;

public override int Read(byte[] buffer, int offset, int count) => ReadFunc != null ? ReadFunc(buffer, offset, count) : base.Read(buffer, offset, count);
public override int Read(Span<byte> buffer) => ReadSpanFunc != null ? ReadSpanFunc(buffer) : base.Read(buffer);
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => ReadAsyncArrayFunc != null ? ReadAsyncArrayFunc(buffer, offset, count, cancellationToken) : base.ReadAsync(buffer, offset, count, cancellationToken);
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) => ReadAsyncMemoryFunc != null ? ReadAsyncMemoryFunc(buffer, cancellationToken) : base.ReadAsync(buffer, cancellationToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public override int Read(Span<byte> buffer)
}

// We're only here if we need more data to make forward progress.
_connection.Fill();
Fill();

// Now that we have more, see if we can get any response data, and if
// we can we're done.
Expand Down Expand Up @@ -210,7 +210,7 @@ private async ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToke
}

// We're only here if we need more data to make forward progress.
await _connection.FillAsync(async: true).ConfigureAwait(false);
await FillAsync().ConfigureAwait(false);

// Now that we have more, see if we can get any response data, and if
// we can we're done.
Expand Down Expand Up @@ -273,7 +273,7 @@ private async Task CopyToAsyncCore(Stream destination, CancellationToken cancell
return;
}

await _connection.FillAsync(async: true).ConfigureAwait(false);
await FillAsync().ConfigureAwait(false);
}
}
catch (Exception exc) when (CancellationHelper.ShouldWrapInOperationCanceledException(exc, cancellationToken))
Expand Down Expand Up @@ -323,7 +323,7 @@ private int ReadChunksFromConnectionBuffer(Span<byte> buffer, CancellationTokenR
Debug.Assert(_chunkBytesRemaining == 0, $"Expected {nameof(_chunkBytesRemaining)} == 0, got {_chunkBytesRemaining}");

// Read the chunk header line.
if (!_connection.TryReadNextChunkedLine(readingHeader: false, out currentLine))
if (!_connection.TryReadNextChunkedLine(out currentLine))
{
// Could not get a whole line, so we can't parse the chunk header.
return default;
Expand Down Expand Up @@ -379,7 +379,7 @@ private int ReadChunksFromConnectionBuffer(Span<byte> buffer, CancellationTokenR
case ParsingState.ExpectChunkTerminator:
Debug.Assert(_chunkBytesRemaining == 0, $"Expected {nameof(_chunkBytesRemaining)} == 0, got {_chunkBytesRemaining}");

if (!_connection.TryReadNextChunkedLine(readingHeader: false, out currentLine))
if (!_connection.TryReadNextChunkedLine(out currentLine))
{
return default;
}
Expand All @@ -395,38 +395,23 @@ private int ReadChunksFromConnectionBuffer(Span<byte> buffer, CancellationTokenR
case ParsingState.ConsumeTrailers:
Debug.Assert(_chunkBytesRemaining == 0, $"Expected {nameof(_chunkBytesRemaining)} == 0, got {_chunkBytesRemaining}");

while (true)
// Consume the receive buffer. If the stream is disposed, pass a null response to avoid
// processing headers for a connection returned to the pool.
if (_connection.ParseHeaders(IsDisposed ? null : _response, isFromTrailer: true))
{
if (!_connection.TryReadNextChunkedLine(readingHeader: true, out currentLine))
{
break;
}

if (currentLine.IsEmpty)
{
// Dispose of the registration and then check whether cancellation has been
// requested. This is necessary to make deterministic a race condition between
// cancellation being requested and unregistering from the token. Otherwise,
// it's possible cancellation could be requested just before we unregister and
// we then return a connection to the pool that has been or will be disposed
// (e.g. if a timer is used and has already queued its callback but the
// callback hasn't yet run).
cancellationRegistration.Dispose();
CancellationHelper.ThrowIfCancellationRequested(cancellationRegistration.Token);

_state = ParsingState.Done;
_connection.CompleteResponse();
_connection = null;

break;
}
// Parse the trailer.
else if (!IsDisposed)
{
// Make sure that we don't inadvertently consume trailing headers
// while draining a connection that's being returned back to the pool.
HttpConnection.ParseHeaderNameValue(_connection, currentLine, _response, isFromTrailer: true);
}
// Dispose of the registration and then check whether cancellation has been
// requested. This is necessary to make deterministic a race condition between
// cancellation being requested and unregistering from the token. Otherwise,
// it's possible cancellation could be requested just before we unregister and
// we then return a connection to the pool that has been or will be disposed
// (e.g. if a timer is used and has already queued its callback but the
// callback hasn't yet run).
cancellationRegistration.Dispose();
CancellationHelper.ThrowIfCancellationRequested(cancellationRegistration.Token);

_state = ParsingState.Done;
_connection.CompleteResponse();
_connection = null;
}

return default;
Expand Down Expand Up @@ -528,7 +513,7 @@ public override async ValueTask<bool> DrainAsync(int maxDrainBytes)
}
}

await _connection.FillAsync(async: true).ConfigureAwait(false);
await FillAsync().ConfigureAwait(false);
}
}
finally
Expand All @@ -537,6 +522,24 @@ public override async ValueTask<bool> DrainAsync(int maxDrainBytes)
cts?.Dispose();
}
}

private void Fill()
{
Debug.Assert(_connection is not null);
ValueTask fillTask = _state == ParsingState.ConsumeTrailers
? _connection.FillForHeadersAsync(async: false)
: _connection.FillAsync(async: false);
Debug.Assert(fillTask.IsCompleted);
fillTask.GetAwaiter().GetResult();
}

private ValueTask FillAsync()
{
Debug.Assert(_connection is not null);
return _state == ParsingState.ConsumeTrailers
? _connection.FillForHeadersAsync(async: true)
: _connection.FillAsync(async: true);
}
}
}
}
Loading

0 comments on commit d2d5ad3

Please sign in to comment.