From 1b02b8549618c0edd7022362617251d9d1c33152 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Sat, 16 Apr 2022 09:17:10 -0700 Subject: [PATCH] HTTP/2 output processing make over (#40925) This changes the HTTP/2's output processing to be use queues instead of locks (a channel to be precise). We're manually scheduling the Http2OutputProducer's intent to write instead of a write operation. This removes locks and allows the thread to do other useful work while processing in line for processing in a low allocation way. --- .../Http2/FlowControl/AwaitableProvider.cs | 94 --- .../Http2/FlowControl/OutputFlowControl.cs | 74 --- .../FlowControl/StreamOutputFlowControl.cs | 97 --- .../src/Internal/Http2/Http2Connection.cs | 67 +- .../src/Internal/Http2/Http2FrameWriter.cs | 593 +++++++++++------- .../src/Internal/Http2/Http2OutputProducer.cs | 456 ++++++++++---- .../Core/src/Internal/Http2/Http2Stream.cs | 14 +- .../src/Internal/Http2/Http2StreamContext.cs | 6 +- .../Internal/Infrastructure/KestrelTrace.cs | 32 + .../Core/test/Http2/Http2FrameWriterTests.cs | 2 +- .../Core/test/PooledStreamStackTests.cs | 3 +- .../Http2/Http2FrameWriterBenchmark.cs | 19 +- .../Kestrel/shared/test/TestContextFactory.cs | 4 +- .../Http2/Http2ConnectionTests.cs | 26 +- .../Http2/Http2StreamTests.cs | 6 +- .../Http2/Http2TimeoutTests.cs | 1 - 16 files changed, 773 insertions(+), 721 deletions(-) delete mode 100644 src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/AwaitableProvider.cs delete mode 100644 src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/OutputFlowControl.cs delete mode 100644 src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/StreamOutputFlowControl.cs diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/AwaitableProvider.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/AwaitableProvider.cs deleted file mode 100644 index 4ef7703b174b..000000000000 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/AwaitableProvider.cs +++ /dev/null @@ -1,94 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using System.Diagnostics; -using System.Threading.Tasks.Sources; - -namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl; - -internal abstract class AwaitableProvider -{ - public abstract ManualResetValueTaskSource GetAwaitable(); - public abstract void CompleteCurrent(); - public abstract int ActiveCount { get; } -} - -/// -/// Provider returns multiple awaitables. Awaitables are completed FIFO. -/// -internal class MultipleAwaitableProvider : AwaitableProvider -{ - private Queue>? _awaitableQueue; - private Queue>? _awaitableCache; - - public override void CompleteCurrent() - { - Debug.Assert(_awaitableQueue != null); - Debug.Assert(_awaitableCache != null); - - var awaitable = _awaitableQueue.Dequeue(); - awaitable.TrySetResult(null); - - // Add completed awaitable to the cache for reuse - _awaitableCache.Enqueue(awaitable); - } - - public override ManualResetValueTaskSource GetAwaitable() - { - if (_awaitableQueue == null) - { - _awaitableQueue = new Queue>(); - _awaitableCache = new Queue>(); - } - - // First attempt to reuse an existing awaitable in the queue - // to save allocating a new instance. - if (_awaitableCache!.TryDequeue(out var awaitable)) - { - // Reset previously used awaitable - Debug.Assert(awaitable.GetStatus() == ValueTaskSourceStatus.Succeeded, "Previous awaitable should have been completed."); - awaitable.Reset(); - } - else - { - awaitable = new ManualResetValueTaskSource(); - } - - _awaitableQueue.Enqueue(awaitable); - - return awaitable; - } - - public override int ActiveCount => _awaitableQueue?.Count ?? 0; -} - -/// -/// Provider has a single awaitable. -/// -internal class SingleAwaitableProvider : AwaitableProvider -{ - private ManualResetValueTaskSource? _awaitable; - - public override void CompleteCurrent() - { - Debug.Assert(_awaitable != null); - _awaitable.TrySetResult(null); - } - - public override ManualResetValueTaskSource GetAwaitable() - { - if (_awaitable == null) - { - _awaitable = new ManualResetValueTaskSource(); - } - else - { - Debug.Assert(_awaitable.GetStatus() == ValueTaskSourceStatus.Succeeded, "Previous awaitable should have been completed."); - _awaitable.Reset(); - } - - return _awaitable; - } - - public override int ActiveCount => _awaitable != null && _awaitable.GetStatus() != ValueTaskSourceStatus.Succeeded ? 1 : 0; -} diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/OutputFlowControl.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/OutputFlowControl.cs deleted file mode 100644 index 7570dead8843..000000000000 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/OutputFlowControl.cs +++ /dev/null @@ -1,74 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using System.Diagnostics; - -namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl; - -internal class OutputFlowControl -{ - private FlowControl _flow; - private readonly AwaitableProvider _awaitableProvider; - - public OutputFlowControl(AwaitableProvider awaitableProvider, uint initialWindowSize) - { - _flow = new FlowControl(initialWindowSize); - _awaitableProvider = awaitableProvider; - } - - public int Available => _flow.Available; - public bool IsAborted => _flow.IsAborted; - - public ManualResetValueTaskSource AvailabilityAwaitable - { - get - { - Debug.Assert(!_flow.IsAborted, $"({nameof(AvailabilityAwaitable)} accessed after abort."); - Debug.Assert(_flow.Available <= 0, $"({nameof(AvailabilityAwaitable)} accessed with {Available} bytes available."); - - return _awaitableProvider.GetAwaitable(); - } - } - - public void Reset(uint initialWindowSize) - { - // When output flow control is reused the client window size needs to be reset. - // The client might have changed the window size before the stream is reused. - _flow = new FlowControl(initialWindowSize); - Debug.Assert(_awaitableProvider.ActiveCount == 0, "Queue should have been emptied by the previous stream."); - } - - public void Advance(int bytes) - { - _flow.Advance(bytes); - } - - // bytes can be negative when SETTINGS_INITIAL_WINDOW_SIZE decreases mid-connection. - // This can also cause Available to become negative which MUST be allowed. - // https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.2 - public bool TryUpdateWindow(int bytes) - { - if (_flow.TryUpdateWindow(bytes)) - { - while (_flow.Available > 0 && _awaitableProvider.ActiveCount > 0) - { - _awaitableProvider.CompleteCurrent(); - } - - return true; - } - - return false; - } - - public void Abort() - { - // Make sure to set the aborted flag before running any continuations. - _flow.Abort(); - - while (_awaitableProvider.ActiveCount > 0) - { - _awaitableProvider.CompleteCurrent(); - } - } -} diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/StreamOutputFlowControl.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/StreamOutputFlowControl.cs deleted file mode 100644 index 100a0cb34548..000000000000 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/FlowControl/StreamOutputFlowControl.cs +++ /dev/null @@ -1,97 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using System.Diagnostics; -using System.Threading.Tasks.Sources; - -namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl; - -internal class StreamOutputFlowControl -{ - private readonly OutputFlowControl _connectionLevelFlowControl; - private readonly OutputFlowControl _streamLevelFlowControl; - - private ManualResetValueTaskSource? _currentConnectionLevelAwaitable; - private int _currentConnectionLevelAwaitableVersion; - - public StreamOutputFlowControl(OutputFlowControl connectionLevelFlowControl, uint initialWindowSize) - { - _connectionLevelFlowControl = connectionLevelFlowControl; - _streamLevelFlowControl = new OutputFlowControl(new SingleAwaitableProvider(), initialWindowSize); - } - - public int Available => Math.Min(_connectionLevelFlowControl.Available, _streamLevelFlowControl.Available); - - public bool IsAborted => _connectionLevelFlowControl.IsAborted || _streamLevelFlowControl.IsAborted; - - public void Reset(uint initialWindowSize) - { - _streamLevelFlowControl.Reset(initialWindowSize); - if (_currentConnectionLevelAwaitable != null) - { - Debug.Assert(_currentConnectionLevelAwaitable.GetStatus() == ValueTaskSourceStatus.Succeeded, "Should have been completed by the previous stream."); - _currentConnectionLevelAwaitable = null; - _currentConnectionLevelAwaitableVersion = -1; - } - } - - public void Advance(int bytes) - { - _connectionLevelFlowControl.Advance(bytes); - _streamLevelFlowControl.Advance(bytes); - } - - public int AdvanceUpToAndWait(long bytes, out ValueTask availabilityTask) - { - var leastAvailableFlow = _connectionLevelFlowControl.Available < _streamLevelFlowControl.Available - ? _connectionLevelFlowControl : _streamLevelFlowControl; - - // This cast is safe because leastAvailableFlow.Available is an int. - var actual = (int)Math.Clamp(leastAvailableFlow.Available, 0, bytes); - - // Make sure to advance prior to accessing AvailabilityAwaitable. - _connectionLevelFlowControl.Advance(actual); - _streamLevelFlowControl.Advance(actual); - - availabilityTask = default; - _currentConnectionLevelAwaitable = null; - _currentConnectionLevelAwaitableVersion = -1; - - if (actual < bytes) - { - var awaitable = leastAvailableFlow.AvailabilityAwaitable; - - if (leastAvailableFlow == _connectionLevelFlowControl) - { - _currentConnectionLevelAwaitable = awaitable; - _currentConnectionLevelAwaitableVersion = awaitable.Version; - } - - availabilityTask = new ValueTask(awaitable, awaitable.Version); - } - - return actual; - } - - // The connection-level update window is updated independently. - // https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.1 - public bool TryUpdateWindow(int bytes) - { - return _streamLevelFlowControl.TryUpdateWindow(bytes); - } - - public void Abort() - { - _streamLevelFlowControl.Abort(); - - // If this stream is waiting on a connection-level window update, complete this stream's - // connection-level awaitable so the stream abort is observed immediately. - // This could complete an awaitable still sitting in the connection-level awaitable queue, - // but this is safe because completing it again will just no-op. - if (_currentConnectionLevelAwaitable != null && - _currentConnectionLevelAwaitable.Version == _currentConnectionLevelAwaitableVersion) - { - _currentConnectionLevelAwaitable.TrySetResult(null); - } - } -} diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs index 5f05596a101e..68eaf98be9b8 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs @@ -36,13 +36,10 @@ internal partial class Http2Connection : IHttp2StreamLifetimeHandler, IHttpStrea private readonly HttpConnectionContext _context; private readonly Http2FrameWriter _frameWriter; private readonly Pipe _input; - private readonly Pipe _output; private readonly Task _inputTask; - private readonly Task _outputTask; private readonly int _minAllocBufferSize; private readonly HPackDecoder _hpackDecoder; private readonly InputFlowControl _inputFlowControl; - private readonly OutputFlowControl _outputFlowControl = new OutputFlowControl(new MultipleAwaitableProvider(), Http2PeerSettings.DefaultInitialWindowSize); private readonly Http2PeerSettings _serverSettings = new Http2PeerSettings(); private readonly Http2PeerSettings _clientSettings = new Http2PeerSettings(); @@ -74,6 +71,7 @@ internal partial class Http2Connection : IHttp2StreamLifetimeHandler, IHttpStrea internal readonly Http2KeepAlive? _keepAlive; internal readonly Dictionary _streams = new Dictionary(); internal PooledStreamStack StreamPool; + internal Action? _onStreamCompleted; public Http2Connection(HttpConnectionContext context) { @@ -86,18 +84,6 @@ public Http2Connection(HttpConnectionContext context) _context.InitialExecutionContext = ExecutionContext.Capture(); _input = new Pipe(GetInputPipeOptions()); - _output = new Pipe(GetOutputPipeOptions()); - - _frameWriter = new Http2FrameWriter( - _output.Writer, - context.ConnectionContext, - this, - _outputFlowControl, - context.TimeoutControl, - httpLimits.MinResponseDataRate, - context.ConnectionId, - context.MemoryPool, - context.ServiceContext); _minAllocBufferSize = context.MemoryPool.GetMinimumAllocSize(); @@ -126,7 +112,17 @@ public Http2Connection(HttpConnectionContext context) _scheduleInline = context.ServiceContext.Scheduler == PipeScheduler.Inline; _inputTask = CopyPipeAsync(_context.Transport.Input, _input.Writer); - _outputTask = CopyPipeAsync(_output.Reader, _context.Transport.Output); + + _frameWriter = new Http2FrameWriter( + context.Transport.Output, + context.ConnectionContext, + this, + (int)Math.Min(MaxTrackedStreams, int.MaxValue), + context.TimeoutControl, + httpLimits.MinResponseDataRate, + context.ConnectionId, + context.MemoryPool, + context.ServiceContext); } public string ConnectionId => _context.ConnectionId; @@ -378,10 +374,9 @@ public async Task ProcessRequestsAsync(IHttpApplication appl finally { Input.Complete(); - _output.Writer.Complete(); _context.Transport.Input.CancelPendingRead(); await _inputTask; - await _outputTask; + await _frameWriter.ShutdownAsync(); } } } @@ -762,8 +757,7 @@ private Http2StreamContext CreateHttp2StreamContext() _clientSettings, _serverSettings, _frameWriter, - _inputFlowControl, - _outputFlowControl); + _inputFlowControl); streamContext.TimeoutControl = _context.TimeoutControl; streamContext.InitialExecutionContext = _context.InitialExecutionContext; @@ -1236,6 +1230,7 @@ void IHttp2StreamLifetimeHandler.OnStreamCompleted(Http2Stream stream) { _completedStreams.Enqueue(stream); _streamCompletionAwaitable.Complete(); + _onStreamCompleted?.Invoke(stream); } private void UpdateCompletedStreams() @@ -1662,38 +1657,6 @@ public void DecrementActiveClientStreamCount() minimumSegmentSize: _context.MemoryPool.GetMinimumSegmentSize(), useSynchronizationContext: false); - private PipeOptions GetOutputPipeOptions() - { - // Never write inline because we do not want to hold Http2FramerWriter._writeLock for potentially expensive TLS - // write operations. This essentially doubles the MaxResponseBufferSize for HTTP/2 connections compared to - // HTTP/1.x. This seems reasonable given HTTP/2's support for many concurrent streams per connection. We don't - // want every write to return an incomplete ValueTask now that we're dispatching TLS write operations which - // would likely happen with a pauseWriterThreshold of 1, but we still need to respect connection back pressure. - var pauseWriterThreshold = _context.ServiceContext.ServerOptions.Limits.MaxResponseBufferSize switch - { - // null means that we have no back pressure - null => 0, - // 0 = no buffering so we need to configure the pipe so the writer waits on the reader directly - 0 => 1, - long limit => limit, - }; - - var resumeWriterThreshold = pauseWriterThreshold switch - { - // The resumeWriterThreshold must be at least 1 to ever resume after pausing. - 1 => 1, - long limit => limit / 2, - }; - - return new PipeOptions(pool: _context.MemoryPool, - readerScheduler: _context.ServiceContext.Scheduler, - writerScheduler: PipeScheduler.Inline, - pauseWriterThreshold: pauseWriterThreshold, - resumeWriterThreshold: resumeWriterThreshold, - minimumSegmentSize: _context.MemoryPool.GetMinimumSegmentSize(), - useSynchronizationContext: false); - } - private async Task CopyPipeAsync(PipeReader reader, PipeWriter writer) { Exception? error = null; diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs index cec7e6d53566..d7a86bd31423 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs @@ -6,9 +6,9 @@ using System.Diagnostics; using System.IO.Pipelines; using System.Net.Http.HPack; +using System.Threading.Channels; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; -using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers; @@ -26,13 +26,13 @@ internal class Http2FrameWriter private readonly ConcurrentPipeWriter _outputWriter; private readonly BaseConnectionContext _connectionContext; private readonly Http2Connection _http2Connection; - private readonly OutputFlowControl _connectionOutputFlowControl; private readonly string _connectionId; private readonly KestrelTrace _log; private readonly ITimeoutControl _timeoutControl; private readonly MinDataRate? _minResponseDataRate; private readonly TimingPipeFlusher _flusher; private readonly DynamicHPackEncoder _hpackEncoder; + private readonly Channel _channel; // This is only set to true by tests. private readonly bool _scheduleInline; @@ -44,11 +44,18 @@ internal class Http2FrameWriter private bool _completed; private bool _aborted; + private readonly object _windowUpdateLock = new(); + private long _connectionWindow; + private readonly Queue _waitingForMoreConnectionWindow = new(); + // This is the stream that consumed the last set of connection window + private Http2OutputProducer? _lastWindowConsumer; + private readonly Task _writeQueueTask; + public Http2FrameWriter( PipeWriter outputPipeWriter, BaseConnectionContext connectionContext, Http2Connection http2Connection, - OutputFlowControl connectionOutputFlowControl, + int maxStreamsPerConnection, ITimeoutControl timeoutControl, MinDataRate? minResponseDataRate, string connectionId, @@ -59,7 +66,6 @@ public Http2FrameWriter( _outputWriter = new ConcurrentPipeWriter(outputPipeWriter, memoryPool, _writeLock); _connectionContext = connectionContext; _http2Connection = http2Connection; - _connectionOutputFlowControl = connectionOutputFlowControl; _connectionId = connectionId; _log = serviceContext.Log; _timeoutControl = timeoutControl; @@ -72,6 +78,218 @@ public Http2FrameWriter( _scheduleInline = serviceContext.Scheduler == PipeScheduler.Inline; _hpackEncoder = new DynamicHPackEncoder(serviceContext.ServerOptions.AllowResponseHeaderCompression); + + // This is bounded by the maximum number of concurrent Http2Streams per Http2Connection. + // This isn't the same as SETTINGS_MAX_CONCURRENT_STREAMS, but typically double (with a floor of 100) + // which is the max number of Http2Streams that can end up in the Http2Connection._streams dictionary. + // + // Setting a lower limit of SETTINGS_MAX_CONCURRENT_STREAMS might be sufficient because a stream shouldn't + // be rescheduling itself after being completed or canceled, but we're going with the more conservative limit + // in case there's some logic scheduling completed or canceled streams unnecessarily. + _channel = Channel.CreateBounded(new BoundedChannelOptions(maxStreamsPerConnection) + { + AllowSynchronousContinuations = _scheduleInline, + SingleReader = true + }); + + _connectionWindow = Http2PeerSettings.DefaultInitialWindowSize; + + _writeQueueTask = Task.Run(WriteToOutputPipe); + } + + public void Schedule(Http2OutputProducer producer) + { + if (!_channel.Writer.TryWrite(producer)) + { + // It should not be possible to exceed the bound of the channel. + var ex = new ConnectionAbortedException("HTTP/2 connection exceeded the output operations maximum queue size."); + _log.Http2QueueOperationsExceeded(_connectionId, ex); + _http2Connection.Abort(ex); + } + } + + private async Task WriteToOutputPipe() + { + while (await _channel.Reader.WaitToReadAsync()) + { + // We need to handle the case where aborts can be scheduled while this loop is running and might be on the way to complete + // the reader. + while (_channel.Reader.TryRead(out var producer) && !producer.CompletedResponse) + { + try + { + var reader = producer.PipeReader; + var stream = producer.Stream; + + // We don't need to check the result because it's either + // - true because we have a result + // - false because we're flushing headers + reader.TryRead(out var readResult); + var buffer = readResult.Buffer; + + // Check the stream window + var (actual, remainingStream) = producer.ConsumeStreamWindow(buffer.Length); + + // Now check the connection window + (actual, var remainingConnection) = ConsumeConnectionWindow(actual); + + // Write what we can + if (actual < buffer.Length) + { + buffer = buffer.Slice(0, actual); + } + + // Stash the unobserved state, we're going to mark this snapshot as observed + var observed = producer.UnobservedState; + var currentState = producer.CurrentState; + + // Avoid boxing the enum (though the JIT optimizes this eventually) + static bool HasStateFlag(Http2OutputProducer.State state, Http2OutputProducer.State flags) + => (state & flags) == flags; + + // Check if we need to write headers + var flushHeaders = HasStateFlag(observed, Http2OutputProducer.State.FlushHeaders) && !HasStateFlag(currentState, Http2OutputProducer.State.FlushHeaders); + + (var hasMoreData, var reschedule, currentState, var waitingForWindowUpdates) = producer.ObserveDataAndState(buffer.Length, observed); + + var aborted = HasStateFlag(currentState, Http2OutputProducer.State.Aborted); + var completed = HasStateFlag(currentState, Http2OutputProducer.State.Completed) && !hasMoreData; + + FlushResult flushResult = default; + + // There are 2 cases where we abort: + // 1. We're not complete but we got the abort. + // 2. We're complete and there's no more response data to be written. + if ((aborted && !completed) || (aborted && completed && actual == 0 && stream.ResponseTrailers is null or { Count: 0 })) + { + // Response body is aborted, complete reader for this output producer. + if (flushHeaders) + { + // write headers + WriteResponseHeaders(stream.StreamId, stream.StatusCode, Http2HeadersFrameFlags.NONE, (HttpResponseHeaders)stream.ResponseHeaders); + } + + if (actual > 0) + { + // If we got here it means we're going to cancel the write. Restore any consumed bytes to the connection window. + lock (_windowUpdateLock) + { + _connectionWindow += actual; + } + } + } + else if (completed && stream.ResponseTrailers is { Count: > 0 }) + { + // Output is ending and there are trailers to write + // Write any remaining content then write trailers and there's no + // flow control back pressure being applied (hasMoreData) + + stream.ResponseTrailers.SetReadOnly(); + stream.DecrementActiveClientStreamCount(); + + // It is faster to write data and trailers together. Locking once reduces lock contention. + flushResult = await WriteDataAndTrailersAsync(stream, buffer, flushHeaders, stream.ResponseTrailers); + } + else if (completed && producer.AppCompletedWithNoResponseBodyOrTrailers) + { + if (buffer.Length != 0) + { + _log.Http2UnexpectedDataRemaining(stream.StreamId, _connectionId); + } + else + { + stream.DecrementActiveClientStreamCount(); + + // Headers have already been written and there is no other content to write + flushResult = await FlushAsync(stream, flushHeaders, outputAborter: null, cancellationToken: default); + } + } + else + { + var endStream = completed; + + if (endStream) + { + stream.DecrementActiveClientStreamCount(); + } + + flushResult = await WriteDataAsync(stream, buffer, buffer.Length, endStream, flushHeaders); + } + + if (producer.IsTimingWrite) + { + _timeoutControl.StopTimingWrite(); + } + + reader.AdvanceTo(buffer.End); + + if (completed || aborted) + { + await reader.CompleteAsync(); + + await producer.CompleteResponseAsync(); + } + // We're not going to schedule this again if there's no remaining window. + // When the window update is sent, the producer will be re-queued if needed. + else if (hasMoreData && !aborted && !waitingForWindowUpdates) + { + // We have no more connection window, put this producer in a queue waiting for it to + // a window update to resume the connection. + if (remainingConnection == 0) + { + lock (_windowUpdateLock) + { + // In order to make scheduling more fair we want to make sure that streams that have data get a chance to run in a round robin manner. + // To do this we will store the producer that consumed the window in a field and put it to the back of the queue. + if (actual != 0 && _lastWindowConsumer is null) + { + _lastWindowConsumer = producer; + } + else + { + _waitingForMoreConnectionWindow.Enqueue(producer); + } + } + + producer.SetWaitingForWindowUpdates(); + + // Include waiting for window updates in timing writes + if (_minResponseDataRate != null) + { + producer.IsTimingWrite = true; + _timeoutControl.StartTimingWrite(); + } + } + else if (remainingStream > 0) + { + // Move this stream to the back of the queue so we're being fair to the other streams that have data + producer.Schedule(); + } + else + { + producer.SetWaitingForWindowUpdates(); + + // Include waiting for window updates in timing writes + if (_minResponseDataRate != null) + { + producer.IsTimingWrite = true; + _timeoutControl.StartTimingWrite(); + } + } + } + else if (reschedule) + { + producer.Schedule(); + } + } + catch (Exception ex) + { + _log.Http2UnexpectedConnectionQueueError(_connectionId, ex); + } + } + } + + _log.Http2ConnectionQueueProcessingCompleted(_connectionId); } public void UpdateMaxHeaderTableSize(uint maxHeaderTableSize) @@ -104,11 +322,18 @@ public void Complete() } _completed = true; - _connectionOutputFlowControl.Abort(); + AbortConnectionFlowControl(); _outputWriter.Abort(); } } + public Task ShutdownAsync() + { + _channel.Writer.TryComplete(); + + return _writeQueueTask; + } + public void Abort(ConnectionAbortedException error) { lock (_writeLock) @@ -125,7 +350,7 @@ public void Abort(ConnectionAbortedException error) } } - public ValueTask FlushAsync(IHttpOutputAborter? outputAborter, CancellationToken cancellationToken) + private ValueTask FlushAsync(Http2Stream stream, bool writeHeaders, IHttpOutputAborter? outputAborter, CancellationToken cancellationToken) { lock (_writeLock) { @@ -134,6 +359,12 @@ public ValueTask FlushAsync(IHttpOutputAborter? outputAborter, Canc return default; } + if (writeHeaders) + { + // write headers + WriteResponseHeadersUnsynchronized(stream.StreamId, stream.StatusCode, Http2HeadersFrameFlags.END_STREAM, (HttpResponseHeaders)stream.ResponseHeaders); + } + var bytesWritten = _unflushedBytes; _unflushedBytes = 0; @@ -172,19 +403,8 @@ public ValueTask Write100ContinueAsync(int streamId) | Padding (*) ... +---------------------------------------------------------------+ */ - public void WriteResponseHeaders(Http2Stream stream, int statusCode, bool endStream, HttpResponseHeaders headers) + public void WriteResponseHeaders(int streamId, int statusCode, Http2HeadersFrameFlags headerFrameFlags, HttpResponseHeaders headers) { - Http2HeadersFrameFlags headerFrameFlags; - if (endStream) - { - headerFrameFlags = Http2HeadersFrameFlags.END_STREAM; - stream.DecrementActiveClientStreamCount(); - } - else - { - headerFrameFlags = Http2HeadersFrameFlags.NONE; - } - lock (_writeLock) { if (_completed) @@ -192,28 +412,33 @@ public void WriteResponseHeaders(Http2Stream stream, int statusCode, bool endStr return; } - try - { - _headersEnumerator.Initialize(headers); - _outgoingFrame.PrepareHeaders(headerFrameFlags, stream.StreamId); - var buffer = _headerEncodingBuffer.AsSpan(); - var done = HPackHeaderWriter.BeginEncodeHeaders(statusCode, _hpackEncoder, _headersEnumerator, buffer, out var payloadLength); - FinishWritingHeaders(stream.StreamId, payloadLength, done); - } - // Any exception from the HPack encoder can leave the dynamic table in a corrupt state. - // Since we allow custom header encoders we don't know what type of exceptions to expect. - catch (Exception ex) - { - _log.HPackEncodingError(_connectionId, stream.StreamId, ex); - _http2Connection.Abort(new ConnectionAbortedException(ex.Message, ex)); - throw new InvalidOperationException(ex.Message, ex); // Report the error to the user if this was the first write. - } + WriteResponseHeadersUnsynchronized(streamId, statusCode, headerFrameFlags, headers); } } - public ValueTask WriteResponseTrailersAsync(Http2Stream stream, HttpResponseTrailers headers) + private void WriteResponseHeadersUnsynchronized(int streamId, int statusCode, Http2HeadersFrameFlags headerFrameFlags, HttpResponseHeaders headers) { - stream.DecrementActiveClientStreamCount(); + try + { + _headersEnumerator.Initialize(headers); + _outgoingFrame.PrepareHeaders(headerFrameFlags, streamId); + var buffer = _headerEncodingBuffer.AsSpan(); + var done = HPackHeaderWriter.BeginEncodeHeaders(statusCode, _hpackEncoder, _headersEnumerator, buffer, out var payloadLength); + FinishWritingHeaders(streamId, payloadLength, done); + } + // Any exception from the HPack encoder can leave the dynamic table in a corrupt state. + // Since we allow custom header encoders we don't know what type of exceptions to expect. + catch (Exception ex) + { + _log.HPackEncodingError(_connectionId, streamId, ex); + _http2Connection.Abort(new ConnectionAbortedException(ex.Message, ex)); + } + } + + private ValueTask WriteDataAndTrailersAsync(Http2Stream stream, in ReadOnlySequence data, bool writeHeaders, HttpResponseTrailers headers) + { + // The Length property of a ReadOnlySequence can be expensive, so we cache the value. + var dataLength = data.Length; lock (_writeLock) { @@ -222,19 +447,31 @@ public ValueTask WriteResponseTrailersAsync(Http2Stream stream, Htt return default; } + var streamId = stream.StreamId; + + if (writeHeaders) + { + WriteResponseHeadersUnsynchronized(streamId, stream.StatusCode, Http2HeadersFrameFlags.NONE, (HttpResponseHeaders)stream.ResponseHeaders); + } + + if (dataLength > 0) + { + WriteDataUnsynchronized(streamId, data, dataLength, endStream: false); + } + try { _headersEnumerator.Initialize(headers); - _outgoingFrame.PrepareHeaders(Http2HeadersFrameFlags.END_STREAM, stream.StreamId); + _outgoingFrame.PrepareHeaders(Http2HeadersFrameFlags.END_STREAM, streamId); var buffer = _headerEncodingBuffer.AsSpan(); var done = HPackHeaderWriter.BeginEncodeHeaders(_hpackEncoder, _headersEnumerator, buffer, out var payloadLength); - FinishWritingHeaders(stream.StreamId, payloadLength, done); + FinishWritingHeaders(streamId, payloadLength, done); } // Any exception from the HPack encoder can leave the dynamic table in a corrupt state. // Since we allow custom header encoders we don't know what type of exceptions to expect. catch (Exception ex) { - _log.HPackEncodingError(_connectionId, stream.StreamId, ex); + _log.HPackEncodingError(_connectionId, streamId, ex); _http2Connection.Abort(new ConnectionAbortedException(ex.Message, ex)); } @@ -271,78 +508,6 @@ private void FinishWritingHeaders(int streamId, int payloadLength, bool done) } } - public ValueTask WriteDataAsync(Http2Stream stream, StreamOutputFlowControl flowControl, in ReadOnlySequence data, bool endStream, bool firstWrite, bool forceFlush) - { - // Logic in this method is replicated in WriteDataAndTrailersAsync. - // Changes here may need to be mirrored in WriteDataAndTrailersAsync. - - // The Length property of a ReadOnlySequence can be expensive, so we cache the value. - var dataLength = data.Length; - - lock (_writeLock) - { - if (_completed || flowControl.IsAborted) - { - return default; - } - - // Zero-length data frames are allowed to be sent immediately even if there is no space available in the flow control window. - // https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.1 - if (dataLength != 0 && dataLength > flowControl.Available) - { - return WriteDataAsync(stream, flowControl, data, dataLength, endStream, firstWrite); - } - - // This cast is safe since if dataLength would overflow an int, it's guaranteed to be greater than the available flow control window. - flowControl.Advance((int)dataLength); - WriteDataUnsynchronized(stream, data, dataLength, endStream); - - if (forceFlush) - { - return TimeFlushUnsynchronizedAsync(); - } - - return default; - } - } - - public ValueTask WriteDataAndTrailersAsync(Http2Stream stream, StreamOutputFlowControl flowControl, in ReadOnlySequence data, bool firstWrite, HttpResponseTrailers headers) - { - // This method combines WriteDataAsync and WriteResponseTrailers. - // Changes here may need to be mirrored in WriteDataAsync. - - // The Length property of a ReadOnlySequence can be expensive, so we cache the value. - var dataLength = data.Length; - - lock (_writeLock) - { - if (_completed || flowControl.IsAborted) - { - return default; - } - - // Zero-length data frames are allowed to be sent immediately even if there is no space available in the flow control window. - // https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.1 - if (dataLength != 0 && dataLength > flowControl.Available) - { - return WriteDataAndTrailersAsyncCore(this, stream, flowControl, data, dataLength, firstWrite, headers); - } - - // This cast is safe since if dataLength would overflow an int, it's guaranteed to be greater than the available flow control window. - flowControl.Advance((int)dataLength); - WriteDataUnsynchronized(stream, data, dataLength, endStream: false); - - return WriteResponseTrailersAsync(stream, headers); - } - - static async ValueTask WriteDataAndTrailersAsyncCore(Http2FrameWriter writer, Http2Stream stream, StreamOutputFlowControl flowControl, ReadOnlySequence data, long dataLength, bool firstWrite, HttpResponseTrailers headers) - { - await writer.WriteDataAsync(stream, flowControl, data, dataLength, endStream: false, firstWrite); - - return await writer.WriteResponseTrailersAsync(stream, headers); - } - } - /* Padding is not implemented +---------------+ |Pad Length? (8)| @@ -352,12 +517,12 @@ static async ValueTask WriteDataAndTrailersAsyncCore(Http2FrameWrit | Padding (*) ... +---------------------------------------------------------------+ */ - private void WriteDataUnsynchronized(Http2Stream stream, in ReadOnlySequence data, long dataLength, bool endStream) + private void WriteDataUnsynchronized(int streamId, in ReadOnlySequence data, long dataLength, bool endStream) { Debug.Assert(dataLength == data.Length); // Note padding is not implemented - _outgoingFrame.PrepareData(stream.StreamId); + _outgoingFrame.PrepareData(streamId); if (dataLength > _maxFrameSize) // Minus padding { @@ -365,7 +530,16 @@ private void WriteDataUnsynchronized(Http2Stream stream, in ReadOnlySequence data, long dataLen do { var currentData = remainingData.Slice(0, dataPayloadLength); + _outgoingFrame.PayloadLength = dataPayloadLength; // Plus padding - WriteDataUnsynchronizedCore(stream, endStream: false, dataPayloadLength, currentData); + WriteHeaderUnsynchronized(); + + currentData.CopyTo(_outputWriter); // Plus padding dataLength -= dataPayloadLength; @@ -391,132 +568,85 @@ void TrimAndWriteDataUnsynchronized(in ReadOnlySequence data, long dataLen } while (dataLength > dataPayloadLength); - WriteDataUnsynchronizedCore(stream, endStream, dataLength, remainingData); - - // Plus padding - } - - void WriteDataUnsynchronizedCore(Http2Stream stream, bool endStream, long dataLength, in ReadOnlySequence data) - { - Debug.Assert(dataLength == data.Length); - if (endStream) { _outgoingFrame.DataFlags |= Http2DataFrameFlags.END_STREAM; - - // When writing data, must decrement active stream count after flow control availability is checked. - // If active stream count becomes zero while a graceful shutdown is in progress then the input side of connection is closed. - // This is a problem if a large amount of data is being written. The server must keep processing incoming WINDOW_UPDATE frames. - // No WINDOW_UPDATE frames means response write could hit flow control and hang. - // Decrement also has to happen before writing END_STREAM to client to avoid race over active stream count. - stream.DecrementActiveClientStreamCount(); } - // It can be expensive to get length from ROS. Use already available value. _outgoingFrame.PayloadLength = (int)dataLength; // Plus padding WriteHeaderUnsynchronized(); - data.CopyTo(_outputWriter); + remainingData.CopyTo(_outputWriter); + + // Plus padding } } - private async ValueTask WriteDataAsync(Http2Stream stream, StreamOutputFlowControl flowControl, ReadOnlySequence data, long dataLength, bool endStream, bool firstWrite) + private ValueTask WriteDataAsync(Http2Stream stream, ReadOnlySequence data, long dataLength, bool endStream, bool writeHeaders) { - FlushResult flushResult = default; + var writeTask = default(ValueTask); - while (dataLength > 0) + lock (_writeLock) { - ValueTask availabilityTask; - var writeTask = default(ValueTask); - - lock (_writeLock) + if (_completed) { - if (_completed || flowControl.IsAborted) - { - break; - } - - // Observe HTTP/2 backpressure - var actual = flowControl.AdvanceUpToAndWait(dataLength, out availabilityTask); - - var shouldFlush = false; - - if (actual > 0) - { - if (actual < dataLength) - { - WriteDataUnsynchronized(stream, data.Slice(0, actual), actual, endStream: false); - data = data.Slice(actual); - dataLength -= actual; - } - else - { - WriteDataUnsynchronized(stream, data, actual, endStream); - dataLength = 0; - } - - // Don't call FlushAsync() with the min data rate, since we time this write while also accounting for - // flow control induced backpressure below. - shouldFlush = true; - } - else if (firstWrite) - { - // If we're facing flow control induced backpressure on the first write for a given stream's response body, - // we make sure to flush the response headers immediately. - shouldFlush = true; - } - - if (shouldFlush) - { - if (_minResponseDataRate != null) - { - // Call BytesWrittenToBuffer before FlushAsync() to make testing easier, otherwise the Flush can cause test code to run before the timeout - // control updates and if the test checks for a timeout it can fail - _timeoutControl.BytesWrittenToBuffer(_minResponseDataRate, _unflushedBytes); - } + return ValueTask.FromResult(default); + } - _unflushedBytes = 0; + var shouldFlush = false; - writeTask = _flusher.FlushAsync(); - } + if (writeHeaders) + { + WriteResponseHeadersUnsynchronized(stream.StreamId, stream.StatusCode, Http2HeadersFrameFlags.NONE, (HttpResponseHeaders)stream.ResponseHeaders); - firstWrite = false; + shouldFlush = true; } - // Avoid timing writes that are already complete. This is likely to happen during the last iteration. - if (availabilityTask.IsCompleted && writeTask.IsCompleted) + if (dataLength > 0 || endStream) { - continue; + WriteDataUnsynchronized(stream.StreamId, data, dataLength, endStream); + + shouldFlush = true; } if (_minResponseDataRate != null) { - _timeoutControl.StartTimingWrite(); + // Call BytesWrittenToBuffer before FlushAsync() to make testing easier, otherwise the Flush can cause test code to run before the timeout + // control updates and if the test checks for a timeout it can fail + _timeoutControl.BytesWrittenToBuffer(_minResponseDataRate, _unflushedBytes); } - // This awaitable releases continuations in FIFO order when the window updates. - // It should be very rare for a continuation to run without any availability. - if (!availabilityTask.IsCompleted) + if (shouldFlush) { - await availabilityTask; - } + _unflushedBytes = 0; - flushResult = await writeTask; - - if (_minResponseDataRate != null) - { - _timeoutControl.StopTimingWrite(); + writeTask = _flusher.FlushAsync(); } } - if (!_scheduleInline) + if (writeTask.IsCompletedSuccessfully) { - // Ensure that the application continuation isn't executed inline by ProcessWindowUpdateFrameAsync. - await ThreadPoolAwaitable.Instance; + return new(writeTask.Result); } - return flushResult; + return FlushAsyncAwaited(writeTask, _timeoutControl, _minResponseDataRate); + + static async ValueTask FlushAsyncAwaited(ValueTask writeTask, ITimeoutControl timeoutControl, MinDataRate? minResponseDataRate) + { + if (minResponseDataRate != null) + { + timeoutControl.StartTimingWrite(); + } + + var flushResult = await writeTask; + + if (minResponseDataRate != null) + { + timeoutControl.StopTimingWrite(); + } + return flushResult; + } } /* https://tools.ietf.org/html/rfc7540#section-6.9 @@ -725,27 +855,62 @@ private ValueTask TimeFlushUnsynchronizedAsync() return _flusher.FlushAsync(_minResponseDataRate, bytesWritten); } - public bool TryUpdateConnectionWindow(int bytes) + private (long, long) ConsumeConnectionWindow(long bytes) { - lock (_writeLock) + lock (_windowUpdateLock) { - return _connectionOutputFlowControl.TryUpdateWindow(bytes); + var actual = Math.Min(bytes, _connectionWindow); + _connectionWindow -= actual; + return (actual, _connectionWindow); } } - public bool TryUpdateStreamWindow(StreamOutputFlowControl flowControl, int bytes) + private void AbortConnectionFlowControl() { - lock (_writeLock) + lock (_windowUpdateLock) { - return flowControl.TryUpdateWindow(bytes); + if (_lastWindowConsumer is { } producer) + { + _lastWindowConsumer = null; + + // Put the consumer of the connection window last + _waitingForMoreConnectionWindow.Enqueue(producer); + } + + while (_waitingForMoreConnectionWindow.TryDequeue(out producer)) + { + // Abort the stream + producer.Stop(); + } } } - public void AbortPendingStreamDataWrites(StreamOutputFlowControl flowControl) + public bool TryUpdateConnectionWindow(int bytes) { - lock (_writeLock) + lock (_windowUpdateLock) { - flowControl.Abort(); + var maxUpdate = Http2PeerSettings.MaxWindowSize - _connectionWindow; + + if (bytes > maxUpdate) + { + return false; + } + + _connectionWindow += bytes; + + if (_lastWindowConsumer is { } producer) + { + _lastWindowConsumer = null; + + // Put the consumer of the connection window last + _waitingForMoreConnectionWindow.Enqueue(producer); + } + + while (_waitingForMoreConnectionWindow.TryDequeue(out producer)) + { + producer.ScheduleResumeFromWindowUpdate(); + } } + return true; } } diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs index 0cabbd36bd36..6440b2f16d19 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs @@ -4,66 +4,61 @@ using System.Buffers; using System.Diagnostics; using System.IO.Pipelines; -using System.Threading.Tasks.Sources; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Internal; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; -using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers; -using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2; -internal class Http2OutputProducer : IHttpOutputProducer, IHttpOutputAborter, IValueTaskSource, IDisposable +internal class Http2OutputProducer : IHttpOutputProducer, IHttpOutputAborter, IDisposable { private int StreamId => _stream.StreamId; private readonly Http2FrameWriter _frameWriter; private readonly TimingPipeFlusher _flusher; private readonly KestrelTrace _log; - // This should only be accessed via the FrameWriter. The connection-level output flow control is protected by the - // FrameWriter's connection-level write lock. - private readonly StreamOutputFlowControl _flowControl; private readonly MemoryPool _memoryPool; private readonly Http2Stream _stream; private readonly object _dataWriterLock = new object(); private readonly Pipe _pipe; private readonly ConcurrentPipeWriter _pipeWriter; private readonly PipeReader _pipeReader; - private readonly ManualResetValueTaskSource _resetAwaitable = new ManualResetValueTaskSource(); private IMemoryOwner? _fakeMemoryOwner; private byte[]? _fakeMemory; private bool _startedWritingDataFrames; - private bool _streamCompleted; + private bool _completeScheduled; private bool _suffixSent; - private bool _streamEnded; + private bool _appCompletedWithNoResponseBodyOrTrailers; private bool _writerComplete; + private bool _isScheduled; // Internal for testing - internal Task _dataWriteProcessingTask; internal bool _disposed; - /// The core logic for the IValueTaskSource implementation. - private ManualResetValueTaskSourceCore _responseCompleteTaskSource = new ManualResetValueTaskSourceCore { RunContinuationsAsynchronously = true }; // mutable struct, do not make this readonly + private long _unconsumedBytes; + private long _streamWindow; - // This object is itself usable as a backing source for ValueTask. Since there's only ever one awaiter - // for this object's state transitions at a time, we allow the object to be awaited directly. All functionality - // associated with the implementation is just delegated to the ManualResetValueTaskSourceCore. - private ValueTask GetWaiterTask() => new ValueTask(this, _responseCompleteTaskSource.Version); - ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _responseCompleteTaskSource.GetStatus(token); - void IValueTaskSource.OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _responseCompleteTaskSource.OnCompleted(continuation, state, token, flags); - FlushResult IValueTaskSource.GetResult(short token) => _responseCompleteTaskSource.GetResult(token); + // For scheduling changes that don't affect the number of bytes written to the pipe, we need another state. + private State _unobservedState; - public Http2OutputProducer(Http2Stream stream, Http2StreamContext context, StreamOutputFlowControl flowControl) + // This reflects the current state of the output, the current state becomes the unobserved state after it has been observed. + private State _currentState; + private bool _completedResponse; + private bool _requestProcessingComplete; + private bool _waitingForWindowUpdates; + private Http2ErrorCode? _resetErrorCode; + + public Http2OutputProducer(Http2Stream stream, Http2StreamContext context) { _stream = stream; _frameWriter = context.FrameWriter; - _flowControl = flowControl; _memoryPool = context.MemoryPool; _log = context.ServiceContext.Log; + var scheduleInline = context.ServiceContext.Scheduler == PipeScheduler.Inline; - _pipe = CreateDataPipe(_memoryPool); + _pipe = CreateDataPipe(_memoryPool, scheduleInline); _pipeWriter = new ConcurrentPipeWriter(_pipe.Writer, _memoryPool, _dataWriterLock); _pipeReader = _pipe.Reader; @@ -72,28 +67,129 @@ public Http2OutputProducer(Http2Stream stream, Http2StreamContext context, Strea // The minimum output data rate is enforced at the connection level by Http2FrameWriter. _flusher = new TimingPipeFlusher(timeoutControl: null, _log); _flusher.Initialize(_pipeWriter); + _streamWindow = context.ClientPeerSettings.InitialWindowSize; + } + + public Http2Stream Stream => _stream; + public PipeReader PipeReader => _pipeReader; + + public bool IsTimingWrite { get; set; } + + public bool AppCompletedWithNoResponseBodyOrTrailers => _appCompletedWithNoResponseBodyOrTrailers; - _dataWriteProcessingTask = ProcessDataWrites(); + public bool CompletedResponse + { + get + { + lock (_dataWriterLock) + { + return _completedResponse; + } + } + } + + // Useful for debugging the scheduling state in the debugger + internal (int, long, State, State, long) SchedulingState => (Stream.StreamId, _unconsumedBytes, _unobservedState, _currentState, _streamWindow); + + public State UnobservedState + { + get + { + lock (_dataWriterLock) + { + return _unobservedState; + } + } + } + + public State CurrentState + { + get + { + lock (_dataWriterLock) + { + return _currentState; + } + } + } + + // Added bytes to the queue. + // Returns a bool that represents whether we should schedule this producer to write + // the enqueued bytes + private void EnqueueDataWrite(long bytes) + { + lock (_dataWriterLock) + { + _unconsumedBytes += bytes; + } + } + + // Determines if we should schedule this producer to observe + // any state changes made. + private void EnqueueStateUpdate(State state) + { + lock (_dataWriterLock) + { + _unobservedState |= state; + } } - public void StreamReset() + public void SetWaitingForWindowUpdates() + { + lock (_dataWriterLock) + { + _waitingForWindowUpdates = true; + } + } + + // Removes consumed bytes from the queue. + // Returns a bool that represents whether we should schedule this producer to write + // the remaining bytes. + internal (bool hasMoreData, bool reschedule, State currentState, bool waitingForWindowUpdates) ObserveDataAndState(long bytes, State state) + { + lock (_dataWriterLock) + { + _isScheduled = false; + _unobservedState &= ~state; + _currentState |= state; + _unconsumedBytes -= bytes; + return (_unconsumedBytes > 0, _unobservedState != State.None, _currentState, _waitingForWindowUpdates); + } + } + + // Consumes bytes from the stream's window and returns the remaining bytes and actual bytes consumed + internal (long actual, long remaining) ConsumeStreamWindow(long bytes) + { + lock (_dataWriterLock) + { + var actual = Math.Min(bytes, _streamWindow); + _streamWindow -= actual; + return (actual, _streamWindow); + } + } + + public void StreamReset(uint initialWindowSize) { - // Data background task must still be running. - Debug.Assert(!_dataWriteProcessingTask.IsCompleted); // Response should have been completed. - Debug.Assert(_responseCompleteTaskSource.GetStatus(_responseCompleteTaskSource.Version) == ValueTaskSourceStatus.Succeeded); + Debug.Assert(_completedResponse); - _streamEnded = false; + _appCompletedWithNoResponseBodyOrTrailers = false; _suffixSent = false; _startedWritingDataFrames = false; - _streamCompleted = false; + _completeScheduled = false; _writerComplete = false; _pipe.Reset(); _pipeWriter.Reset(); - _responseCompleteTaskSource.Reset(); - // Trigger the data process task to resume - _resetAwaitable.SetResult(null); + _streamWindow = initialWindowSize; + _unconsumedBytes = 0; + _unobservedState = State.None; + _currentState = State.None; + _completedResponse = false; + _requestProcessingComplete = false; + _waitingForWindowUpdates = false; + _resetErrorCode = null; + IsTimingWrite = false; } public void Complete() @@ -109,8 +205,20 @@ public void Complete() Stop(); - // Make sure the writing side is completed. - _pipeWriter.Complete(); + if (!_completeScheduled) + { + EnqueueStateUpdate(State.Completed); + + // Make sure the writing side is completed. + _pipeWriter.Complete(); + + Schedule(); + } + else + { + // Make sure the writing side is completed. + _pipeWriter.Complete(); + } if (_fakeMemoryOwner != null) { @@ -135,7 +243,7 @@ void IHttpOutputAborter.Abort(ConnectionAbortedException abortReason) void IHttpOutputAborter.OnInputOrOutputCompleted() { - _stream.ResetAndAbort(new ConnectionAbortedException($"{nameof(Http2OutputProducer)}.{nameof(ProcessDataWrites)} has completed."), Http2ErrorCode.INTERNAL_ERROR); + _stream.ResetAndAbort(new ConnectionAbortedException($"{nameof(Http2OutputProducer)} has completed."), Http2ErrorCode.INTERNAL_ERROR); } public ValueTask FlushAsync(CancellationToken cancellationToken) @@ -148,7 +256,8 @@ public ValueTask FlushAsync(CancellationToken cancellationToken) lock (_dataWriterLock) { ThrowIfSuffixSentOrCompleted(); - if (_streamCompleted) + + if (_completeScheduled) { return new ValueTask(new FlushResult(false, true)); } @@ -157,15 +266,50 @@ public ValueTask FlushAsync(CancellationToken cancellationToken) { // If there's already been response data written to the stream, just wait for that. Any header // should be in front of the data frames in the connection pipe. Trailers could change things. - return _flusher.FlushAsync(this, cancellationToken); + var task = _flusher.FlushAsync(this, cancellationToken); + + Schedule(); + + return task; } else { - // Flushing the connection pipe ensures headers already in the pipe are flushed even if no data - // frames have been written. - return _frameWriter.FlushAsync(this, cancellationToken); + Schedule(); + + return default; + } + } + } + + public void Schedule() + { + lock (_dataWriterLock) + { + // Lock here + if (_isScheduled) + { + return; } + + _isScheduled = true; } + + _frameWriter.Schedule(this); + } + + public void ScheduleResumeFromWindowUpdate() + { + if (_completedResponse) + { + return; + } + + lock (_dataWriterLock) + { + _waitingForWindowUpdates = false; + } + + Schedule(); } public ValueTask Write100ContinueAsync() @@ -174,7 +318,7 @@ public ValueTask Write100ContinueAsync() { ThrowIfSuffixSentOrCompleted(); - if (_streamCompleted) + if (_completeScheduled) { return default; } @@ -189,7 +333,7 @@ public void WriteResponseHeaders(int statusCode, string? reasonPhrase, HttpRespo { // The HPACK header compressor is stateful, if we compress headers for an aborted stream we must send them. // Optimize for not compressing or sending them. - if (_streamCompleted) + if (_completeScheduled) { return; } @@ -201,11 +345,12 @@ public void WriteResponseHeaders(int statusCode, string? reasonPhrase, HttpRespo // The headers will be the final frame if: // 1. There is no content // 2. There is no trailing HEADERS frame. - _streamEnded = appCompleted - && !_startedWritingDataFrames - && (_stream.ResponseTrailers == null || _stream.ResponseTrailers.Count == 0); + if (appCompleted && !_startedWritingDataFrames && (_stream.ResponseTrailers == null || _stream.ResponseTrailers.Count == 0)) + { + _appCompletedWithNoResponseBodyOrTrailers = true; + } - _frameWriter.WriteResponseHeaders(_stream, statusCode, _streamEnded, responseHeaders); + EnqueueStateUpdate(State.FlushHeaders); } } @@ -222,7 +367,7 @@ public Task WriteDataAsync(ReadOnlySpan data, CancellationToken cancellati // This length check is important because we don't want to set _startedWritingDataFrames unless a data // frame will actually be written causing the headers to be flushed. - if (_streamCompleted || data.Length == 0) + if (_completeScheduled || data.Length == 0) { return Task.CompletedTask; } @@ -230,7 +375,14 @@ public Task WriteDataAsync(ReadOnlySpan data, CancellationToken cancellati _startedWritingDataFrames = true; _pipeWriter.Write(data); - return _flusher.FlushAsync(this, cancellationToken).GetAsTask(); + + EnqueueDataWrite(data.Length); + + var task = _flusher.FlushAsync(this, cancellationToken).GetAsTask(); + + Schedule(); + + return task; } } @@ -238,16 +390,21 @@ public ValueTask WriteStreamSuffixAsync() { lock (_dataWriterLock) { - if (_streamCompleted) + if (_completeScheduled) { - return GetWaiterTask(); + return ValueTask.FromResult(default); } - _streamCompleted = true; + _completeScheduled = true; _suffixSent = true; + EnqueueStateUpdate(State.Completed); + _pipeWriter.Complete(); - return GetWaiterTask(); + + Schedule(); + + return ValueTask.FromResult(default); } } @@ -255,8 +412,14 @@ public ValueTask WriteRstStreamAsync(Http2ErrorCode error) { lock (_dataWriterLock) { - // Always send the reset even if the response body is _completed. The request body may not have completed yet. Stop(); + // We queued the stream to complete but didn't complete the response yet + if (_completeScheduled && !_completedResponse) + { + // Set the error so that we can write the RST when the response completes. + _resetErrorCode = error; + return default; + } return _frameWriter.WriteRstStreamAsync(StreamId, error); } @@ -268,7 +431,7 @@ public void Advance(int bytes) { ThrowIfSuffixSentOrCompleted(); - if (_streamCompleted) + if (_completeScheduled) { return; } @@ -276,6 +439,8 @@ public void Advance(int bytes) _startedWritingDataFrames = true; _pipeWriter.Advance(bytes); + + EnqueueDataWrite(bytes); } } @@ -285,7 +450,7 @@ public Span GetSpan(int sizeHint = 0) { ThrowIfSuffixSentOrCompleted(); - if (_streamCompleted) + if (_completeScheduled) { return GetFakeMemory(sizeHint).Span; } @@ -300,7 +465,7 @@ public Memory GetMemory(int sizeHint = 0) { ThrowIfSuffixSentOrCompleted(); - if (_streamCompleted) + if (_completeScheduled) { return GetFakeMemory(sizeHint); } @@ -313,7 +478,7 @@ public void CancelPendingFlush() { lock (_dataWriterLock) { - if (_streamCompleted) + if (_completeScheduled) { return; } @@ -335,7 +500,7 @@ public ValueTask WriteDataToPipeAsync(ReadOnlySpan data, Canc // This length check is important because we don't want to set _startedWritingDataFrames unless a data // frame will actually be written causing the headers to be flushed. - if (_streamCompleted || data.Length == 0) + if (_completeScheduled || data.Length == 0) { return new ValueTask(new FlushResult(false, true)); } @@ -343,7 +508,13 @@ public ValueTask WriteDataToPipeAsync(ReadOnlySpan data, Canc _startedWritingDataFrames = true; _pipeWriter.Write(data); - return _flusher.FlushAsync(this, cancellationToken); + + EnqueueDataWrite(data.Length); + var task = _flusher.FlushAsync(this, cancellationToken); + + Schedule(); + + return task; } } @@ -371,16 +542,20 @@ public void Stop() { lock (_dataWriterLock) { - if (_streamCompleted) + _waitingForWindowUpdates = false; + + if (_completeScheduled && _completedResponse) { + // We can overschedule as long as we haven't yet completed the response. This is important because + // we may need to abort the stream if it's waiting for a window update. return; } - _streamCompleted = true; + _completeScheduled = true; - _pipeReader.CancelPendingRead(); + EnqueueStateUpdate(State.Aborted); - _frameWriter.AbortPendingStreamDataWrites(_flowControl); + Schedule(); } } @@ -388,86 +563,52 @@ public void Reset() { } - private async Task ProcessDataWrites() + internal void OnRequestProcessingEnded() { - // ProcessDataWrites runs for the lifetime of the Http2OutputProducer, and is designed to be reused by multiple streams. - // When Http2OutputProducer is no longer used (e.g. a stream is aborted and will no longer be used, or the connection is closed) - // it should be disposed so ProcessDataWrites exits. Not disposing won't cause a memory leak in release builds, but in debug - // builds active tasks are rooted on Task.s_currentActiveTasks. Dispose could be removed in the future when active tasks are - // tracked by a weak reference. See https://github.com/dotnet/runtime/issues/26565 - do + lock (_dataWriterLock) { - FlushResult flushResult = default; - ReadResult readResult = default; - try + if (_requestProcessingComplete) { - do - { - var firstWrite = true; + // Noop, we're done + return; + } - readResult = await _pipeReader.ReadAsync(); + _requestProcessingComplete = true; - if (readResult.IsCanceled) - { - // Response body is aborted, break and complete reader. - break; - } - else if (readResult.IsCompleted && _stream.ResponseTrailers?.Count > 0) - { - // Output is ending and there are trailers to write - // Write any remaining content then write trailers + if (_completedResponse) + { + Stream.CompleteStream(errored: false); + } + } + } - _stream.ResponseTrailers.SetReadOnly(); + internal ValueTask CompleteResponseAsync() + { + lock (_dataWriterLock) + { + if (_completedResponse) + { + // This should never be called twice + return default; + } - if (readResult.Buffer.Length > 0) - { - // It is faster to write data and trailers together. Locking once reduces lock contention. - flushResult = await _frameWriter.WriteDataAndTrailersAsync(_stream, _flowControl, readResult.Buffer, firstWrite, _stream.ResponseTrailers); - } - else - { - flushResult = await _frameWriter.WriteResponseTrailersAsync(_stream, _stream.ResponseTrailers); - } - } - else if (readResult.IsCompleted && _streamEnded) - { - if (readResult.Buffer.Length != 0) - { - ThrowUnexpectedState(); - } + _completedResponse = true; - // Headers have already been written and there is no other content to write - flushResult = await _frameWriter.FlushAsync(outputAborter: null, cancellationToken: default); - } - else - { - var endStream = readResult.IsCompleted; - flushResult = await _frameWriter.WriteDataAsync(_stream, _flowControl, readResult.Buffer, endStream, firstWrite, forceFlush: true); - } + ValueTask task = default; - firstWrite = false; - _pipeReader.AdvanceTo(readResult.Buffer.End); - } while (!readResult.IsCompleted); - } - catch (Exception ex) + if (_resetErrorCode is { } error) { - _log.LogCritical(ex, nameof(Http2OutputProducer) + "." + nameof(ProcessDataWrites) + " observed an unexpected exception."); + // If we have an error code to write, write it now that we're done with the response. + // Always send the reset even if the response body is completed. The request body may not have completed yet. + task = _frameWriter.WriteRstStreamAsync(StreamId, error); } - await _pipeReader.CompleteAsync(); - - // Signal via WriteStreamSuffixAsync to the stream that output has finished. - // Stream state will move to RequestProcessingStatus.ResponseCompleted - _responseCompleteTaskSource.SetResult(flushResult); - - // Wait here for the stream to be reset or disposed. - await new ValueTask(_resetAwaitable, _resetAwaitable.Version); - _resetAwaitable.Reset(); - } while (!_disposed); + if (_requestProcessingComplete) + { + Stream.CompleteStream(errored: false); + } - static void ThrowUnexpectedState() - { - throw new InvalidOperationException(nameof(Http2OutputProducer) + "." + nameof(ProcessDataWrites) + " observed an unexpected state where the streams output ended with data still remaining in the pipe."); + return task; } } @@ -515,6 +656,40 @@ internal Memory GetFakeMemory(int minSize) } } + public bool TryUpdateStreamWindow(int bytes) + { + var schedule = false; + + lock (_dataWriterLock) + { + var maxUpdate = Http2PeerSettings.MaxWindowSize - _streamWindow; + + if (bytes > maxUpdate) + { + return false; + } + + schedule = UpdateStreamWindow(bytes); + } + + if (schedule) + { + ScheduleResumeFromWindowUpdate(); + } + + return true; + + // Adds more bytes to the stream's window + // Returns a bool that represents whether we should schedule this producer to write + // the remaining bytes. + bool UpdateStreamWindow(long bytes) + { + var wasDepleted = _streamWindow <= 0; + _streamWindow += bytes; + return wasDepleted && _streamWindow > 0 && _unconsumedBytes > 0; + } + } + [StackTraceHidden] private void ThrowIfSuffixSentOrCompleted() { @@ -541,14 +716,17 @@ private static void ThrowWriterComplete() throw new InvalidOperationException("Cannot write to response after the request has completed."); } - private static Pipe CreateDataPipe(MemoryPool pool) + private static Pipe CreateDataPipe(MemoryPool pool, bool scheduleInline) => new Pipe(new PipeOptions ( pool: pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.ThreadPool, - pauseWriterThreshold: 1, - resumeWriterThreshold: 1, + // The unit tests rely on inline scheduling and the ability to control individual writes + // and assert individual frames. Setting the thresholds to 1 avoids frames being coaleased together + // and allows the test to assert them individually. + pauseWriterThreshold: scheduleInline ? 1 : 4096, + resumeWriterThreshold: scheduleInline ? 1 : 2048, useSynchronizationContext: false, minimumSegmentSize: pool.GetMinimumSegmentSize() )); @@ -560,8 +738,14 @@ public void Dispose() return; } _disposed = true; + } - // Set awaitable after disposed is true to ensure ProcessDataWrites exits successfully. - _resetAwaitable.SetResult(null); + [Flags] + public enum State + { + None = 0, + FlushHeaders = 1, + Aborted = 2, + Completed = 4 } } diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Stream.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Stream.cs index 54f4e14a0c5e..c69f40bfa2df 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Stream.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Stream.cs @@ -21,7 +21,6 @@ internal abstract partial class Http2Stream : HttpProtocol, IThreadPoolWorkItem, private Http2StreamContext _context = default!; private Http2OutputProducer _http2Output = default!; private StreamInputFlowControl _inputFlowControl = default!; - private StreamOutputFlowControl _outputFlowControl = default!; private Http2MessageBody? _messageBody; private bool _decrementCalled; @@ -56,11 +55,7 @@ public void Initialize(Http2StreamContext context) context.ServerPeerSettings.InitialWindowSize, context.ServerPeerSettings.InitialWindowSize / 2); - _outputFlowControl = new StreamOutputFlowControl( - context.ConnectionOutputFlowControl, - context.ClientPeerSettings.InitialWindowSize); - - _http2Output = new Http2OutputProducer(this, context, _outputFlowControl); + _http2Output = new Http2OutputProducer(this, context); RequestBodyPipe = CreateRequestBodyPipe(); @@ -69,8 +64,7 @@ public void Initialize(Http2StreamContext context) else { _inputFlowControl.Reset(); - _outputFlowControl.Reset(context.ClientPeerSettings.InitialWindowSize); - _http2Output.StreamReset(); + _http2Output.StreamReset(context.ClientPeerSettings.InitialWindowSize); RequestBodyPipe.Reset(); } } @@ -129,7 +123,7 @@ protected override void OnReset() protected override void OnRequestProcessingEnded() { - CompleteStream(errored: false); + _http2Output.OnRequestProcessingEnded(); } public void CompleteStream(bool errored) @@ -506,7 +500,7 @@ public void OnDataRead(int bytesRead) public bool TryUpdateOutputWindow(int bytes) { - return _context.FrameWriter.TryUpdateStreamWindow(_outputFlowControl, bytes); + return _http2Output.TryUpdateStreamWindow(bytes); } public void AbortRstStreamReceived() diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2StreamContext.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2StreamContext.cs index d1e6c674615c..506a3d5fa835 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2StreamContext.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2StreamContext.cs @@ -25,8 +25,7 @@ public Http2StreamContext( Http2PeerSettings clientPeerSettings, Http2PeerSettings serverPeerSettings, Http2FrameWriter frameWriter, - InputFlowControl connectionInputFlowControl, - OutputFlowControl connectionOutputFlowControl) : base(connectionId, protocols, altSvcHeader, connectionContext: null!, serviceContext, connectionFeatures, memoryPool, localEndPoint, remoteEndPoint) + InputFlowControl connectionInputFlowControl) : base(connectionId, protocols, altSvcHeader, connectionContext: null!, serviceContext, connectionFeatures, memoryPool, localEndPoint, remoteEndPoint) { StreamId = streamId; StreamLifetimeHandler = streamLifetimeHandler; @@ -34,7 +33,6 @@ public Http2StreamContext( ServerPeerSettings = serverPeerSettings; FrameWriter = frameWriter; ConnectionInputFlowControl = connectionInputFlowControl; - ConnectionOutputFlowControl = connectionOutputFlowControl; } public IHttp2StreamLifetimeHandler StreamLifetimeHandler { get; } @@ -42,7 +40,5 @@ public Http2StreamContext( public Http2PeerSettings ServerPeerSettings { get; } public Http2FrameWriter FrameWriter { get; } public InputFlowControl ConnectionInputFlowControl { get; } - public OutputFlowControl ConnectionOutputFlowControl { get; } - public int StreamId { get; set; } } diff --git a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/KestrelTrace.cs b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/KestrelTrace.cs index c8df0403ac4e..f0335df43005 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/KestrelTrace.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/KestrelTrace.cs @@ -298,6 +298,38 @@ public void Http2MaxConcurrentStreamsReached(string connectionId) Http2MaxConcurrentStreamsReached(_http2Logger, connectionId); } + [LoggerMessage(60, LogLevel.Critical, @"Connection id ""{ConnectionId}"" exceeded the output operations maximum queue size.", EventName = "Http2QueueOperationsExceeded")] + private static partial void Http2QueueOperationsExceeded(ILogger logger, string connectionId, ConnectionAbortedException ex); + + public void Http2QueueOperationsExceeded(string connectionId, ConnectionAbortedException ex) + { + Http2QueueOperationsExceeded(_http2Logger, connectionId, ex); + } + + [LoggerMessage(61, LogLevel.Critical, @"Stream {StreamId} on connection id ""{ConnectionId}"" observed an unexpected state where the streams output ended with data still remaining in the pipe.", EventName = "Http2UnexpectedDataRemaining")] + private static partial void Http2UnexpectedDataRemaining(ILogger logger, int streamId, string connectionId); + + public void Http2UnexpectedDataRemaining(int streamId, string connectionId) + { + Http2UnexpectedDataRemaining(_http2Logger, streamId, connectionId); + } + + [LoggerMessage(62, LogLevel.Debug, @"The connection queue processing loop for {ConnectionId} completed.", EventName = "Http2ConnectionQueueProcessingCompleted")] + private static partial void Http2ConnectionQueueProcessingCompleted(ILogger logger, string connectionId); + + public void Http2ConnectionQueueProcessingCompleted(string connectionId) + { + Http2ConnectionQueueProcessingCompleted(_http2Logger, connectionId); + } + + [LoggerMessage(63, LogLevel.Critical, @"The event loop in connection {ConnectionId} failed unexpectedly.", EventName = "Http2UnexpectedConnectionQueueError")] + private static partial void Http2UnexpectedConnectionQueueError(ILogger logger, string connectionId, Exception ex); + + public void Http2UnexpectedConnectionQueueError(string connectionId, Exception ex) + { + Http2UnexpectedConnectionQueueError(_http2Logger, connectionId, ex); + } + [LoggerMessage(41, LogLevel.Warning, "One or more of the following response headers have been removed because they are invalid for HTTP/2 and HTTP/3 responses: 'Connection', 'Transfer-Encoding', 'Keep-Alive', 'Upgrade' and 'Proxy-Connection'.", EventName = "InvalidResponseHeaderRemoved")] private static partial void InvalidResponseHeaderRemoved(ILogger logger); diff --git a/src/Servers/Kestrel/Core/test/Http2/Http2FrameWriterTests.cs b/src/Servers/Kestrel/Core/test/Http2/Http2FrameWriterTests.cs index ffb169170a6a..91d2a1c58734 100644 --- a/src/Servers/Kestrel/Core/test/Http2/Http2FrameWriterTests.cs +++ b/src/Servers/Kestrel/Core/test/Http2/Http2FrameWriterTests.cs @@ -56,7 +56,7 @@ public async Task WriteWindowUpdate_UnsetsReservedBit() private Http2FrameWriter CreateFrameWriter(Pipe pipe) { var serviceContext = TestContextFactory.CreateServiceContext(new KestrelServerOptions()); - return new Http2FrameWriter(pipe.Writer, null, null, null, null, null, null, _dirtyMemoryPool, serviceContext); + return new Http2FrameWriter(pipe.Writer, null, null, 1, null, null, null, _dirtyMemoryPool, serviceContext); } [Fact] diff --git a/src/Servers/Kestrel/Core/test/PooledStreamStackTests.cs b/src/Servers/Kestrel/Core/test/PooledStreamStackTests.cs index efb2581a2ae5..af2d924c23f2 100644 --- a/src/Servers/Kestrel/Core/test/PooledStreamStackTests.cs +++ b/src/Servers/Kestrel/Core/test/PooledStreamStackTests.cs @@ -110,8 +110,7 @@ private static Http2Stream CreateStream(int streamId, long expirati clientPeerSettings: new Http2PeerSettings(), serverPeerSettings: new Http2PeerSettings(), frameWriter: null!, - connectionInputFlowControl: null!, - connectionOutputFlowControl: null! + connectionInputFlowControl: null! ); return new Http2Stream(new DummyApplication(), context) diff --git a/src/Servers/Kestrel/perf/Microbenchmarks/Http2/Http2FrameWriterBenchmark.cs b/src/Servers/Kestrel/perf/Microbenchmarks/Http2/Http2FrameWriterBenchmark.cs index bb5d2d7d8b1e..a80186a0af60 100644 --- a/src/Servers/Kestrel/perf/Microbenchmarks/Http2/Http2FrameWriterBenchmark.cs +++ b/src/Servers/Kestrel/perf/Microbenchmarks/Http2/Http2FrameWriterBenchmark.cs @@ -20,7 +20,6 @@ public class Http2FrameWriterBenchmark private Pipe _pipe; private Http2FrameWriter _frameWriter; private HttpResponseHeaders _responseHeaders; - private Http2Stream _stream; [GlobalSetup] public void GlobalSetup() @@ -39,15 +38,13 @@ public void GlobalSetup() new NullPipeWriter(), connectionContext: null, http2Connection: null, - new OutputFlowControl(new SingleAwaitableProvider(), initialWindowSize: int.MaxValue), + maxStreamsPerConnection: 1, timeoutControl: null, minResponseDataRate: null, "TestConnectionId", _memoryPool, serviceContext); - _stream = new MockHttp2Stream(TestContextFactory.CreateHttp2StreamContext(streamId: 0)); - _responseHeaders = new HttpResponseHeaders(); var headers = (IHeaderDictionary)_responseHeaders; headers.ContentType = "application/json"; @@ -57,7 +54,7 @@ public void GlobalSetup() [Benchmark] public void WriteResponseHeaders() { - _frameWriter.WriteResponseHeaders(_stream, 200, endStream: true, _responseHeaders); + _frameWriter.WriteResponseHeaders(streamId: 0, 200, Http2HeadersFrameFlags.END_STREAM, _responseHeaders); } [GlobalCleanup] @@ -66,16 +63,4 @@ public void Dispose() _pipe.Writer.Complete(); _memoryPool?.Dispose(); } - - private class MockHttp2Stream : Http2Stream - { - public MockHttp2Stream(Http2StreamContext context) - { - Initialize(context); - } - - public override void Execute() - { - } - } } diff --git a/src/Servers/Kestrel/shared/test/TestContextFactory.cs b/src/Servers/Kestrel/shared/test/TestContextFactory.cs index 14df1153cbd8..3e3ed65b8ebb 100644 --- a/src/Servers/Kestrel/shared/test/TestContextFactory.cs +++ b/src/Servers/Kestrel/shared/test/TestContextFactory.cs @@ -141,7 +141,6 @@ public static Http2StreamContext CreateHttp2StreamContext( Http2PeerSettings serverPeerSettings = null, Http2FrameWriter frameWriter = null, InputFlowControl connectionInputFlowControl = null, - OutputFlowControl connectionOutputFlowControl = null, ITimeoutControl timeoutControl = null) { var context = new Http2StreamContext @@ -159,8 +158,7 @@ public static Http2StreamContext CreateHttp2StreamContext( clientPeerSettings: clientPeerSettings ?? new Http2PeerSettings(), serverPeerSettings: serverPeerSettings ?? new Http2PeerSettings(), frameWriter: frameWriter, - connectionInputFlowControl: connectionInputFlowControl, - connectionOutputFlowControl: connectionOutputFlowControl + connectionInputFlowControl: connectionInputFlowControl ); context.TimeoutControl = timeoutControl; diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2ConnectionTests.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2ConnectionTests.cs index 55fafea7826e..cf1b6fdc7f40 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2ConnectionTests.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2ConnectionTests.cs @@ -424,9 +424,6 @@ await ExpectAsync(Http2FrameType.HEADERS, withStreamId: 1); await StopConnectionAsync(expectedLastStreamId: 1, ignoreNonGoAwayFrames: false); - - var output = (Http2OutputProducer)stream.Output; - await output._dataWriteProcessingTask.DefaultTimeout(); } [Fact] @@ -567,17 +564,22 @@ await InitializeConnectionAsync(async context => throw new InvalidOperationException("Put the stream into an invalid state by throwing after writing to response."); }); + var streamCompletedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _connection._onStreamCompleted = _ => streamCompletedTcs.TrySetResult(); + await StartStreamAsync(1, _browserRequestHeaders, endStream: true); var stream = _connection._streams[1]; serverTcs.SetResult(); + // Wait for the stream to be completed + await streamCompletedTcs.Task; + // TriggerTick will trigger the stream to be returned to the pool so we can assert it TriggerTick(); var output = (Http2OutputProducer)stream.Output; Assert.True(output._disposed); - await output._dataWriteProcessingTask.DefaultTimeout(); // Stream is not returned to the pool Assert.Equal(0, _connection.StreamPool.Count); @@ -3804,6 +3806,7 @@ await ExpectAsync(Http2FrameType.DATA, withFlags: (byte)Http2DataFrameFlags.END_STREAM, withStreamId: 3); + TriggerTick(); await WaitForConnectionStopAsync(expectedLastStreamId: 3, ignoreNonGoAwayFrames: false); await _closedStateReached.Task.DefaultTimeout(); } @@ -4168,7 +4171,7 @@ await InitializeConnectionAsync(async context => for (var i = 0; i < expectedFullFrameCountBeforeBackpressure; i++) { await expectingDataSem.WaitAsync(); - Assert.True(context.Response.Body.WriteAsync(_maxData, 0, _maxData.Length).IsCompleted); + await context.Response.Body.WriteAsync(_maxData, 0, _maxData.Length); } await expectingDataSem.WaitAsync(); @@ -4720,6 +4723,7 @@ await ExpectAsync(Http2FrameType.DATA, withFlags: (byte)Http2DataFrameFlags.END_STREAM, withStreamId: 1); + TriggerTick(); await _closedStateReached.Task.DefaultTimeout(); VerifyGoAway(await ReceiveFrameAsync(), 1, Http2ErrorCode.NO_ERROR); } @@ -4740,7 +4744,7 @@ public async Task AcceptNewStreamsDuringClosingConnection() await StartStreamAsync(3, _browserRequestHeaders, endStream: false); await SendDataAsync(1, _helloBytes, true); - var f = await ExpectAsync(Http2FrameType.HEADERS, + await ExpectAsync(Http2FrameType.HEADERS, withLength: 32, withFlags: (byte)Http2HeadersFrameFlags.END_HEADERS, withStreamId: 1); @@ -4766,6 +4770,8 @@ await ExpectAsync(Http2FrameType.DATA, withFlags: (byte)Http2DataFrameFlags.END_STREAM, withStreamId: 3); + TriggerTick(); + await WaitForConnectionStopAsync(expectedLastStreamId: 3, ignoreNonGoAwayFrames: false); } @@ -4790,14 +4796,14 @@ public async Task IgnoreNewStreamsDuringClosedConnection() } [Fact] - public void IOExceptionDuringFrameProcessingIsNotLoggedHigherThanDebug() + public async Task IOExceptionDuringFrameProcessingIsNotLoggedHigherThanDebug() { CreateConnection(); var ioException = new IOException(); _pair.Application.Output.Complete(ioException); - Assert.Equal(TaskStatus.RanToCompletion, _connection.ProcessRequestsAsync(new DummyApplication(_noopApplication)).Status); + await _connection.ProcessRequestsAsync(new DummyApplication(_noopApplication)).DefaultTimeout(); Assert.All(LogMessages, w => Assert.InRange(w.LogLevel, LogLevel.Trace, LogLevel.Debug)); @@ -4808,14 +4814,14 @@ public void IOExceptionDuringFrameProcessingIsNotLoggedHigherThanDebug() } [Fact] - public void UnexpectedExceptionDuringFrameProcessingLoggedAWarning() + public async Task UnexpectedExceptionDuringFrameProcessingLoggedAWarning() { CreateConnection(); var exception = new Exception(); _pair.Application.Output.Complete(exception); - Assert.Equal(TaskStatus.RanToCompletion, _connection.ProcessRequestsAsync(new DummyApplication(_noopApplication)).Status); + await _connection.ProcessRequestsAsync(new DummyApplication(_noopApplication)).DefaultTimeout(); var logMessage = LogMessages.Single(m => m.LogLevel >= LogLevel.Information); diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2StreamTests.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2StreamTests.cs index 52569848c45a..d1262d61c1e5 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2StreamTests.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2StreamTests.cs @@ -3103,15 +3103,11 @@ public async Task ResponseWithHeadersTooLarge_AbortsConnection() await InitializeConnectionAsync(async context => { context.Response.Headers["too_long"] = new string('a', (int)Http2PeerSettings.DefaultMaxFrameSize); - var ex = await Assert.ThrowsAsync(() => context.Response.WriteAsync("Hello World")).DefaultTimeout(); - appFinished.TrySetResult(ex.InnerException.Message); + await context.Response.WriteAsync("Hello World"); }); await StartStreamAsync(1, _browserRequestHeaders, endStream: true); - var message = await appFinished.Task.DefaultTimeout(); - Assert.Equal(SR.net_http_hpack_encode_failure, message); - // Just the StatusCode gets written before aborting in the continuation frame await ExpectAsync(Http2FrameType.HEADERS, withLength: 32, diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs index dca2d0321b1b..7c47189caf86 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs @@ -314,7 +314,6 @@ private class EchoAppWithNotification public async Task RunApp(HttpContext context) { - await context.Response.Body.FlushAsync(); var buffer = new byte[Http2PeerSettings.MinAllowedMaxFrameSize]; int received;