@@ -484,6 +484,7 @@ public void Dispose() { }
public System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> FlushAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Memory<byte> GetMemory(int sizeHint = 0) { throw null; }
public System.Span<byte> GetSpan(int sizeHint = 0) { throw null; }
public void Reset() { }
public System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> Write100ContinueAsync() { throw null; }
public System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> WriteChunkAsync(System.ReadOnlySpan<byte> buffer, System.Threading.CancellationToken cancellationToken) { throw null; }
public System.Threading.Tasks.Task WriteDataAsync(System.ReadOnlySpan<byte> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
@@ -954,6 +955,7 @@ public partial interface IHttpOutputProducer
System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> FlushAsync(System.Threading.CancellationToken cancellationToken);
System.Memory<byte> GetMemory(int sizeHint = 0);
System.Span<byte> GetSpan(int sizeHint = 0);
void Reset();
System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> Write100ContinueAsync();
System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> WriteChunkAsync(System.ReadOnlySpan<byte> data, System.Threading.CancellationToken cancellationToken);
System.Threading.Tasks.Task WriteDataAsync(System.ReadOnlySpan<byte> data, System.Threading.CancellationToken cancellationToken);
@@ -1297,6 +1299,7 @@ public void Dispose() { }
public System.Span<byte> GetSpan(int sizeHint = 0) { throw null; }
void Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.IHttpOutputAborter.Abort(Microsoft.AspNetCore.Connections.ConnectionAbortedException abortReason) { }
System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.IHttpOutputProducer.WriteChunkAsync(System.ReadOnlySpan<byte> data, System.Threading.CancellationToken cancellationToken) { throw null; }
public void Reset() { }
public System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> Write100ContinueAsync() { throw null; }
public System.Threading.Tasks.Task WriteChunkAsync(System.ReadOnlySpan<byte> span, System.Threading.CancellationToken cancellationToken) { throw null; }
public System.Threading.Tasks.Task WriteDataAsync(System.ReadOnlySpan<byte> data, System.Threading.CancellationToken cancellationToken) { throw null; }
213 changes: 199 additions & 14 deletions src/Servers/Kestrel/Core/src/Internal/Http/Http1OutputProducer.cs
@@ -3,6 +3,7 @@

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Threading;
@@ -26,6 +27,9 @@ public class Http1OutputProducer : IHttpOutputProducer, IHttpOutputAborter, IDis
// "0\r\n\r\n"
private static ReadOnlySpan<byte> EndChunkedResponseBytes => new byte[] { (byte)'0', (byte)'\r', (byte)'\n', (byte)'\r', (byte)'\n' };

private const int BeginChunkLengthMax = 5;
private const int EndChunkLength = 2;
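// (Presumably: a chunk payload that fits in a single pooled block needs at most three hex digits
// for its size, so the "xxx\r\n" prefix fits in BeginChunkLengthMax = 5 bytes, and EndChunkLength
// covers the "\r\n" that terminates each chunk.)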

private readonly string _connectionId;
private readonly ConnectionContext _connectionContext;
private readonly IKestrelTrace _log;
@@ -40,21 +44,28 @@ public class Http1OutputProducer : IHttpOutputProducer, IHttpOutputAborter, IDis
private bool _completed;
private bool _aborted;
private long _unflushedBytes;
private bool _autoChunk;

private readonly PipeWriter _pipeWriter;
private const int MemorySizeThreshold = 1024;
private const int BeginChunkLengthMax = 5;
private const int EndChunkLength = 2;
private IMemoryOwner<byte> _fakeMemoryOwner;

// Chunked responses need to be treated uniquely when using GetMemory + Advance.
// We need to know the size of the data written to the chunk before calling Advance on the
// PipeWriter, meaning we internally track how far we have advanced through a current chunk (_advancedBytesForChunk).
// Once write or flush is called, we modify the _currentChunkMemory to prepend the size of data written
// and append the end terminator.
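//
// Roughly, the leased chunk block is laid out as (illustrative only):
//   [ up to BeginChunkLengthMax bytes: "<hex-size>\r\n" ][ caller data via GetMemory/Advance ][ EndChunkLength bytes: "\r\n" ]
// e.g. a 26-byte write ends up framed as "1a\r\n" + payload + "\r\n", per HTTP/1.1 chunked encoding.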

private bool _autoChunk;
private int _advancedBytesForChunk;
private Memory<byte> _currentChunkMemory;
private bool _currentChunkMemoryUpdated;
private IMemoryOwner<byte> _fakeMemoryOwner;

// Fields used to buffer writes issued before the response has started (i.e. before StartAsync
// or the first write/flush commits the response headers).
// These should be cleared by the end of the request.
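// Roughly: GetMemory/GetSpan lease pooled segments into _currentSegment, Advance moves _position,
// and filled segments are parked in _completedSegments until WriteResponseHeaders drains them
// (chunk-framed when _autoChunk is set, raw otherwise) via WriteDataWrittenBeforeHeaders.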
private List<CompletedBuffer> _completedSegments;
private Memory<byte> _currentSegment;
private IMemoryOwner<byte> _currentSegmentOwner;
private int _position;
private bool _startCalled;

public Http1OutputProducer(
PipeWriter pipeWriter,
Expand Down Expand Up @@ -158,6 +169,10 @@ public Memory<byte> GetMemory(int sizeHint = 0)
{
return GetFakeMemory(sizeHint);
}
else if (!_startCalled)
{
return LeasedMemory(sizeHint);
}
else if (_autoChunk)
{
return GetChunkedMemory(sizeHint);
@@ -177,6 +192,10 @@ public Span<byte> GetSpan(int sizeHint = 0)
{
return GetFakeMemory(sizeHint).Span;
}
else if (!_startCalled)
{
return LeasedMemory(sizeHint).Span;
}
else if (_autoChunk)
{
return GetChunkedMemory(sizeHint).Span;
@@ -197,16 +216,23 @@ public void Advance(int bytes)
return;
}

if (_autoChunk)
if (!_startCalled)
{
if (bytes < 0)
if (bytes >= 0)
{
throw new ArgumentOutOfRangeException(nameof(bytes));
}
if (_currentSegment.Length - bytes < _position)
{
throw new ArgumentOutOfRangeException("Can't advance past buffer size.");
}

if (bytes + _advancedBytesForChunk > _currentChunkMemory.Length - BeginChunkLengthMax - EndChunkLength)
_position += bytes;
}
}
else if (_autoChunk)
{
if (_advancedBytesForChunk > _currentChunkMemory.Length - BeginChunkLengthMax - EndChunkLength - bytes)
{
throw new InvalidOperationException("Can't advance past buffer size.");
throw new ArgumentOutOfRangeException("Can't advance past buffer size.");
}
_advancedBytesForChunk += bytes;
}
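// Two bookkeeping paths for Advance: before the response starts, bytes land in the leased segment
// and bump _position; after the headers, with auto-chunking, they bump _advancedBytesForChunk so the
// chunk prefix and suffix can be written around them at commit time.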
@@ -238,6 +264,7 @@ public ValueTask<FlushResult> WriteChunkAsync(ReadOnlySpan<byte> buffer, Cancell
{
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
CommitChunkInternal(ref writer, buffer);
_unflushedBytes += writer.BytesCommitted;
}
}

Expand All @@ -260,7 +287,6 @@ private void CommitChunkInternal(ref BufferWriter<PipeWriter> writer, ReadOnlySp
}

writer.Commit();
_unflushedBytes += writer.BytesCommitted;
}

public void WriteResponseHeaders(int statusCode, string reasonPhrase, HttpResponseHeaders responseHeaders, bool autoChunk)
@@ -288,8 +314,52 @@ private void WriteResponseHeadersInternal(ref BufferWriter<PipeWriter> writer, i

writer.Commit();

_unflushedBytes += writer.BytesCommitted;
_autoChunk = autoChunk;
WriteDataWrittenBeforeHeaders(ref writer);
_unflushedBytes += writer.BytesCommitted;

_startCalled = true;
}
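// Ordering here: the status line and headers are committed first, then any body bytes buffered
// before the response started are appended (chunk-framed when autoChunk is set), and only then is
// _startCalled flipped so later GetMemory/GetSpan calls write through to the pipe directly.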

private void WriteDataWrittenBeforeHeaders(ref BufferWriter<PipeWriter> writer)
{
if (_completedSegments != null)
{
foreach (var segment in _completedSegments)
{
if (_autoChunk)
{
CommitChunkInternal(ref writer, segment.Span);
}
else
{
writer.Write(segment.Span);
writer.Commit();
}
segment.Return();
}

_completedSegments.Clear();
}

if (!_currentSegment.IsEmpty)
{
var segment = _currentSegment.Slice(0, _position);

if (_autoChunk)
{
CommitChunkInternal(ref writer, segment.Span);
}
else
{
writer.Write(segment.Span);
writer.Commit();
}

_position = 0;

DisposeCurrentSegment();
}
}
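// A minimal sketch of the app-level pattern this buffering supports (hypothetical caller code,
// names assumed):
//
//   var memory = response.BodyWriter.GetMemory();      // before headers: served by LeasedMemory
//   int written = Encoding.ASCII.GetBytes("hello", memory.Span);
//   response.BodyWriter.Advance(written);              // tracked in _position, not yet on the wire
//   await response.BodyWriter.FlushAsync();            // headers committed, buffered bytes drained above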

public void Dispose()
@@ -302,10 +372,28 @@ public void Dispose()
_fakeMemoryOwner = null;
}

// Call dispose on any memory that wasn't written.
if (_completedSegments != null)
{
foreach (var segment in _completedSegments)
{
segment.Return();
}
}

DisposeCurrentSegment();

CompletePipe();
}
}

private void DisposeCurrentSegment()
{
_currentSegmentOwner?.Dispose();
_currentSegmentOwner = null;
_currentSegment = default;
}

private void CompletePipe()
{
if (!_pipeWriterCompleted)
@@ -382,10 +470,21 @@ public ValueTask<FlushResult> FirstWriteChunkedAsync(int statusCode, string reas

CommitChunkInternal(ref writer, buffer);

_unflushedBytes += writer.BytesCommitted;

return FlushAsync(cancellationToken);
}
}

public void Reset()
{
Debug.Assert(_currentSegmentOwner == null);
Debug.Assert(_completedSegments == null || _completedSegments.Count == 0);
_autoChunk = false;
_startCalled = false;
_currentChunkMemoryUpdated = false;
}
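// Reset is presumably invoked between requests on a reused connection, so per-response state
// (_autoChunk, _startCalled, the chunk bookkeeping) does not leak into the next request; the
// asserts check that any leased pre-header buffers were already returned.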

private ValueTask<FlushResult> WriteAsync(
ReadOnlySpan<byte> buffer,
CancellationToken cancellationToken = default)
@@ -454,7 +553,7 @@ private Memory<byte> GetChunkedMemory(int sizeHint)
}

var memoryMaxLength = _currentChunkMemory.Length - BeginChunkLengthMax - EndChunkLength;
if (_advancedBytesForChunk >= memoryMaxLength - Math.Min(MemorySizeThreshold, sizeHint))
if (_advancedBytesForChunk >= memoryMaxLength - sizeHint && _advancedBytesForChunk > 0)
{
// Chunk is completely written, commit it to the pipe so GetMemory will return a new chunk of memory.
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
@@ -506,5 +605,91 @@ private Memory<byte> GetFakeMemory(int sizeHint)
}
return _fakeMemoryOwner.Memory;
}

private Memory<byte> LeasedMemory(int sizeHint)
{
EnsureCapacity(sizeHint);
return _currentSegment.Slice(_position);
}

private void EnsureCapacity(int sizeHint)
{
// _currentSegment is a Memory<byte>, so before any segment has been leased its Length is 0,
// remainingSize is non-positive, and we fall through to AddSegment below.
var remainingSize = _currentSegment.Length - _position;

// If the sizeHint is 0, any capacity will do
// Otherwise, the buffer must have enough space for the entire size hint, or we need to add a segment.
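// e.g. with a 4096-byte segment and _position == 4000: a sizeHint of 0 is satisfied (96 bytes remain),
// but a sizeHint of 512 falls through to AddSegment(512). (Illustrative numbers only.)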
if ((sizeHint == 0 && remainingSize > 0) || (sizeHint > 0 && remainingSize >= sizeHint))
{
// We have capacity in the current segment
return;
}

AddSegment(sizeHint);
}

private void AddSegment(int sizeHint = 0)
{
if (_currentSegment.Length != 0)
{
// We're adding a segment to the list
if (_completedSegments == null)
{
_completedSegments = new List<CompletedBuffer>();
}

// Position might be less than the segment length if there wasn't enough space to satisfy the sizeHint when
// GetMemory was called. In that case we'll take the current segment and call it "completed", but need to
// ignore any empty space in it.
_completedSegments.Add(new CompletedBuffer(_currentSegmentOwner, _currentSegment, _position));
}

if (sizeHint <= _memoryPool.MaxBufferSize)
{
// Get a new buffer using the minimum segment size, unless the size hint is larger than a single segment.
// Also, the size cannot be larger than the MaxBufferSize of the MemoryPool
var owner = _memoryPool.Rent(Math.Min(sizeHint, _memoryPool.MaxBufferSize));
_currentSegment = owner.Memory;
_currentSegmentOwner = owner;
}
else
{
_currentSegment = new byte[sizeHint];
_currentSegmentOwner = null;
}

_position = 0;
}
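// Note: when sizeHint exceeds MaxBufferSize the segment is a plain heap array with no owner,
// which is why DisposeCurrentSegment and CompletedBuffer.Return tolerate a null owner.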


/// <summary>
/// Holds a leased buffer and the number of bytes written to it. Keeps the IMemoryOwner alongside
/// the Memory so the segment can be returned to its pool once the buffered data has been flushed
/// (the owner is null for large segments allocated on the heap).
/// </summary>
private readonly struct CompletedBuffer
{
private readonly IMemoryOwner<byte> _memoryOwner;

public Memory<byte> Buffer { get; }
public int Length { get; }

public ReadOnlySpan<byte> Span => Buffer.Span.Slice(0, Length);

public CompletedBuffer(IMemoryOwner<byte> owner, Memory<byte> buffer, int length)
{
_memoryOwner = owner;

Buffer = buffer;
Length = length;
}

public void Return()
{
if (_memoryOwner != null)
{
_memoryOwner.Dispose();
}
}
}
}
}