Skip to content
53 changes: 44 additions & 9 deletions src/Servers/Kestrel/Core/src/Internal/Http/Http1OutputProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ internal class Http1OutputProducer : IHttpOutputProducer, IHttpOutputAborter, ID
private readonly object _contextLock = new object();

private bool _pipeWriterCompleted;
private bool _completed;
private bool _aborted;
private long _unflushedBytes;
private int _currentMemoryPrefixBytes;
Expand All @@ -56,6 +55,7 @@ internal class Http1OutputProducer : IHttpOutputProducer, IHttpOutputAborter, ID
// and append the end terminator.

private bool _autoChunk;
private bool _suffixSent;
private int _advancedBytesForChunk;
private Memory<byte> _currentChunkMemory;
private bool _currentChunkMemoryUpdated;
Expand Down Expand Up @@ -111,7 +111,18 @@ public ValueTask<FlushResult> WriteDataToPipeAsync(ReadOnlySpan<byte> buffer, Ca

public ValueTask<FlushResult> WriteStreamSuffixAsync()
{
return WriteAsync(EndChunkedResponseBytes);
lock (_contextLock)
{
if (_suffixSent || !_autoChunk)
{
_suffixSent = true;
return FlushAsync();
}

_suffixSent = true;
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
return WriteAsyncInternal(ref writer, EndChunkedResponseBytes);
}
}

public ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -146,7 +157,7 @@ public ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = d

ValueTask<FlushResult> FlushAsyncChunked(Http1OutputProducer producer, CancellationToken token)
{
// Local function so in the common-path the stack space for BufferWriter isn't reservered and cleared when it isn't used.
// Local function so in the common-path the stack space for BufferWriter isn't reserved and cleared when it isn't used.

Debug.Assert(!producer._pipeWriterCompleted);
Debug.Assert(producer._autoChunk && producer._advancedBytesForChunk > 0);
Expand All @@ -169,7 +180,9 @@ public Memory<byte> GetMemory(int sizeHint = 0)
{
lock (_contextLock)
{
if (_completed)
ThrowIfSuffixSent();

if (_pipeWriterCompleted)
{
return GetFakeMemory(sizeHint);
}
Expand All @@ -192,7 +205,9 @@ public Span<byte> GetSpan(int sizeHint = 0)
{
lock (_contextLock)
{
if (_completed)
ThrowIfSuffixSent();

if (_pipeWriterCompleted)
{
return GetFakeMemory(sizeHint).Span;
}
Expand All @@ -215,7 +230,9 @@ public void Advance(int bytes)
{
lock (_contextLock)
{
if (_completed)
ThrowIfSuffixSent();

if (_pipeWriterCompleted)
{
return;
}
Expand Down Expand Up @@ -257,6 +274,8 @@ public ValueTask<FlushResult> WriteChunkAsync(ReadOnlySpan<byte> buffer, Cancell
{
lock (_contextLock)
{
ThrowIfSuffixSent();

if (_pipeWriterCompleted)
{
return default;
Expand Down Expand Up @@ -297,6 +316,8 @@ public void WriteResponseHeaders(int statusCode, string reasonPhrase, HttpRespon
{
lock (_contextLock)
{
ThrowIfSuffixSent();

if (_pipeWriterCompleted)
{
return;
Expand Down Expand Up @@ -404,7 +425,6 @@ private void CompletePipe()
{
_log.ConnectionDisconnect(_connectionId);
_pipeWriterCompleted = true;
_completed = true;
}
}

Expand All @@ -426,11 +446,11 @@ public void Abort(ConnectionAbortedException error)
}
}

public void Complete()
public void Stop()
{
lock (_contextLock)
{
_completed = true;
CompletePipe();
}
}

Expand All @@ -443,6 +463,8 @@ public ValueTask<FlushResult> FirstWriteAsync(int statusCode, string reasonPhras
{
lock (_contextLock)
{
ThrowIfSuffixSent();

if (_pipeWriterCompleted)
{
return default;
Expand All @@ -461,6 +483,8 @@ public ValueTask<FlushResult> FirstWriteChunkedAsync(int statusCode, string reas
{
lock (_contextLock)
{
ThrowIfSuffixSent();

if (_pipeWriterCompleted)
{
return default;
Expand All @@ -486,6 +510,7 @@ public void Reset()
// Cleared in sequential address ascending order
_currentMemoryPrefixBytes = 0;
_autoChunk = false;
_suffixSent = false;
_currentChunkMemoryUpdated = false;
_startCalled = false;
}
Expand All @@ -496,6 +521,8 @@ private ValueTask<FlushResult> WriteAsync(
{
lock (_contextLock)
{
ThrowIfSuffixSent();

if (_pipeWriterCompleted)
{
return default;
Expand Down Expand Up @@ -671,6 +698,14 @@ private void AddSegment(int sizeHint = 0)
_position = 0;
}

[StackTraceHidden]
private void ThrowIfSuffixSent()
{
if (_suffixSent)
{
throw new InvalidOperationException("Writing is not allowed after writer was completed.");
}
}

/// <summary>
/// Holds a byte[] from the pool and a size value. Basically a Memory but guaranteed to be backed by an ArrayPool byte[], so that we know we can return it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ async Task<Stream> IHttpUpgradeFeature.UpgradeAsync()

await FlushAsync();

return bodyControl.Upgrade();
return _bodyControl.Upgrade();
}

void IHttpRequestLifetimeFeature.Abort()
Expand Down
105 changes: 76 additions & 29 deletions src/Servers/Kestrel/Core/src/Internal/Http/HttpProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ internal abstract partial class HttpProtocol : IDefaultHttpContextContainer, IHt
private static readonly byte[] _bytesTransferEncodingChunked = Encoding.ASCII.GetBytes("\r\nTransfer-Encoding: chunked");
private static readonly byte[] _bytesServer = Encoding.ASCII.GetBytes("\r\nServer: " + Constants.ServerName);

protected BodyControl bodyControl;
protected BodyControl _bodyControl;
private Stack<KeyValuePair<Func<object, Task>, object>> _onStarting;
private Stack<KeyValuePair<Func<object, Task>, object>> _onCompleted;

Expand Down Expand Up @@ -315,18 +315,16 @@ DefaultHttpContext IDefaultHttpContextContainer.HttpContext

public void InitializeBodyControl(MessageBody messageBody)
{
if (bodyControl == null)
if (_bodyControl == null)
{
bodyControl = new BodyControl(bodyControl: this, this);
_bodyControl = new BodyControl(bodyControl: this, this);
}

(RequestBody, ResponseBody, RequestBodyPipeReader, ResponseBodyPipeWriter) = bodyControl.Start(messageBody);
(RequestBody, ResponseBody, RequestBodyPipeReader, ResponseBodyPipeWriter) = _bodyControl.Start(messageBody);
_requestStreamInternal = RequestBody;
_responseStreamInternal = ResponseBody;
}

public void StopBodies() => bodyControl.Stop();

// For testing
internal void ResetState()
{
Expand Down Expand Up @@ -497,7 +495,7 @@ protected void AbortRequest()

protected void PoisonRequestBodyStream(Exception abortReason)
{
bodyControl?.Abort(abortReason);
_bodyControl?.Abort(abortReason);
}

// Prevents the RequestAborted token from firing for the duration of the request.
Expand Down Expand Up @@ -666,7 +664,17 @@ private async Task ProcessRequests<TContext>(IHttpApplication<TContext> applicat

// At this point all user code that needs use to the request or response streams has completed.
// Using these streams in the OnCompleted callback is not allowed.
StopBodies();
try
{
await _bodyControl.StopAsync();
}
catch (Exception ex)
{
// BodyControl.StopAsync() can throw if the PipeWriter was completed prior to the application writing
// enough bytes to satisfy the specified Content-Length. This risks double-logging the exception,
// but this scenario generally indicates an app bug, so I don't want to risk not logging it.
ReportApplicationError(ex);
}

// 4XX responses are written by TryProduceInvalidRequestResponse during connection tear down.
if (_requestRejectedException == null)
Expand Down Expand Up @@ -1019,6 +1027,11 @@ protected Task TryProduceInvalidRequestResponse()

protected Task ProduceEnd()
{
if (HasResponseCompleted)
{
return Task.CompletedTask;
}

if (_requestRejectedException != null || _applicationException != null)
{
if (HasResponseStarted)
Expand Down Expand Up @@ -1052,18 +1065,21 @@ protected Task ProduceEnd()

private Task WriteSuffix()
{
if (HasResponseCompleted)
if (_autoChunk || _httpVersion == Http.HttpVersion.Http2)
{
return Task.CompletedTask;
// For the same reason we call CheckLastWrite() in Content-Length responses.
PreventRequestAbortedCancellation();
}

// _autoChunk should be checked after we are sure ProduceStart() has been called
// since ProduceStart() may set _autoChunk to true.
if (_autoChunk || _httpVersion == Http.HttpVersion.Http2)
var writeTask = Output.WriteStreamSuffixAsync();

if (!writeTask.IsCompletedSuccessfully)
{
return WriteSuffixAwaited();
return WriteSuffixAwaited(writeTask);
}

_requestProcessingStatus = RequestProcessingStatus.ResponseCompleted;

if (_keepAlive)
{
Log.ConnectionKeepAlive(ConnectionId);
Expand All @@ -1074,23 +1090,14 @@ private Task WriteSuffix()
Log.ConnectionHeadResponseBodyWrite(ConnectionId, _responseBytesWritten);
}

if (!HasFlushedHeaders)
{
_requestProcessingStatus = RequestProcessingStatus.ResponseCompleted;
return FlushAsyncInternal();
}

return Task.CompletedTask;
}

private async Task WriteSuffixAwaited()
private async Task WriteSuffixAwaited(ValueTask<FlushResult> writeTask)
{
// For the same reason we call CheckLastWrite() in Content-Length responses.
PreventRequestAbortedCancellation();

_requestProcessingStatus = RequestProcessingStatus.HeadersFlushed;

await Output.WriteStreamSuffixAsync();
await writeTask;

_requestProcessingStatus = RequestProcessingStatus.ResponseCompleted;

Expand Down Expand Up @@ -1405,11 +1412,11 @@ public void CancelPendingFlush()
Output.CancelPendingFlush();
}

public void Complete(Exception ex)
public Task CompleteAsync(Exception exception = null)
{
if (ex != null)
if (exception != null)
{
var wrappedException = new ConnectionAbortedException("The BodyPipe was completed with an exception.", ex);
var wrappedException = new ConnectionAbortedException("The BodyPipe was completed with an exception.", exception);
ReportApplicationError(wrappedException);

if (HasResponseStarted)
Expand All @@ -1418,7 +1425,47 @@ public void Complete(Exception ex)
}
}

Output.Complete();
// Finalize headers
if (!HasResponseStarted)
{
var onStartingTask = FireOnStarting();
if (!onStartingTask.IsCompletedSuccessfully)
{
return CompleteAsyncAwaited(onStartingTask);
}
}

// Flush headers, body, trailers...
if (!HasResponseCompleted)
{
if (!VerifyResponseContentLength(out var lengthException))
{
// Try to throw this exception from CompleteAsync() instead of CompleteAsyncAwaited() if possible,
// so it can be observed by BodyWriter.Complete(). If this isn't possible because an
// async OnStarting callback hadn't yet run, it's OK, since the Exception will be observed with
// the call to _bodyControl.StopAsync() in ProcessRequests().
throw lengthException;
}

return ProduceEnd();
}

return Task.CompletedTask;
}

private async Task CompleteAsyncAwaited(Task onStartingTask)
{
await onStartingTask;

if (!HasResponseCompleted)
{
if (!VerifyResponseContentLength(out var lengthException))
{
throw lengthException;
}

await ProduceEnd();
}
}

public ValueTask<FlushResult> WritePipeAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
Expand Down
Loading