From d4884898ec507c5fcf230f2206987803ece06727 Mon Sep 17 00:00:00 2001 From: Miha Zupan Date: Mon, 7 Aug 2023 14:39:14 +0200 Subject: [PATCH 1/2] Simplify Http2Connection shutdown/dispose logic --- .../SocketsHttpHandler/Http2Connection.cs | 85 ++++--------------- .../SocketsHttpHandler/HttpConnectionPool.cs | 9 -- 2 files changed, 17 insertions(+), 77 deletions(-) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs index 2b2a1d01d8902e..f96ce45dc26f59 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs @@ -64,18 +64,15 @@ internal sealed partial class Http2Connection : HttpConnectionBase // (1) We received a GOAWAY frame from the server // (2) We have exhaustead StreamIds (i.e. _nextStream == MaxStreamId) // (3) A connection-level error occurred, in which case _abortException below is set. + // (4) The connection is being disposed. + // Requests currently in flight will continue to be processed. + // When all requests have completed, the connection will be torn down. private bool _shutdown; - private TaskCompletionSource? _shutdownWaiter; // If this is set, the connection is aborting due to an IO failure (IOException) or a protocol violation (Http2ProtocolException). // _shutdown above is true, and requests in flight have been (or are being) failed. private Exception? _abortException; - // This means that the user (i.e. the connection pool) has disposed us and will not submit further requests. - // Requests currently in flight will continue to be processed. - // When all requests have completed, the connection will be torn down. - private bool _disposed; - private const int MaxStreamId = int.MaxValue; // Temporary workaround for request burst handling on connection start. @@ -255,51 +252,23 @@ public async ValueTask SetupAsync(CancellationToken cancellationToken) _ = ProcessOutgoingFramesAsync(); } - // This will complete when the connection begins to shut down and cannot be used anymore, or if it is disposed. - public ValueTask WaitForShutdownAsync() - { - lock (SyncObject) - { - Debug.Assert(!_disposed, "As currently used, we don't expect to call this after disposing and we don't handle the ODE"); - ObjectDisposedException.ThrowIf(_disposed, this); - - if (_shutdown) - { - Debug.Assert(_shutdownWaiter is null); - return default; - } - - _shutdownWaiter ??= new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - - return new ValueTask(_shutdownWaiter.Task); - } - } - private void Shutdown() { if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(_shutdown)}={_shutdown}, {nameof(_abortException)}={_abortException}"); Debug.Assert(Monitor.IsEntered(SyncObject)); - SignalAvailableStreamsWaiter(false); - SignalShutdownWaiter(); - - // Note _shutdown could already be set, but that's fine. - _shutdown = true; - } - - private void SignalShutdownWaiter() - { - if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(_shutdownWaiter)}?={_shutdownWaiter is not null}"); + if (!_shutdown) + { + _pool.InvalidateHttp2Connection(this); + SignalAvailableStreamsWaiter(false); - Debug.Assert(Monitor.IsEntered(SyncObject)); + _shutdown = true; - if (_shutdownWaiter is not null) - { - Debug.Assert(!_disposed); - Debug.Assert(!_shutdown); - _shutdownWaiter.SetResult(); - _shutdownWaiter = null; + if (_streamsInUse == 0) + { + FinalTeardown(); + } } } @@ -307,9 +276,6 @@ public bool TryReserveStream() { lock (SyncObject) { - Debug.Assert(!_disposed, "As currently used, we don't expect to call this after disposing and we don't handle the ODE"); - ObjectDisposedException.ThrowIf(_disposed, this); - if (_shutdown) { return false; @@ -353,7 +319,7 @@ public void ReleaseStream() { MarkConnectionAsIdle(); - if (_disposed) + if (_shutdown) { FinalTeardown(); } @@ -367,9 +333,6 @@ public Task WaitForAvailableStreamsAsync() { lock (SyncObject) { - Debug.Assert(!_disposed, "As currently used, we don't expect to call this after disposing and we don't handle the ODE"); - ObjectDisposedException.ThrowIf(_disposed, this); - Debug.Assert(_availableStreamsWaiter is null, "As used currently, shouldn't already have a waiter"); if (_shutdown) @@ -396,7 +359,6 @@ private void SignalAvailableStreamsWaiter(bool result) if (_availableStreamsWaiter is not null) { - Debug.Assert(!_disposed); Debug.Assert(!_shutdown); _availableStreamsWaiter.SetResult(result); _availableStreamsWaiter = null; @@ -1213,7 +1175,7 @@ private Task PerformWriteAsync(int writeBytes, T state, Func, // We must be trying to send something asynchronously (like RST_STREAM or a PING or a SETTINGS ACK) and it has raced with the connection tear down. // As such, it should not matter that we were not able to actually send the frame. // But just in case, throw ObjectDisposedException. Asynchronous callers will ignore the failure. - Debug.Assert(_disposed && _streamsInUse == 0); + Debug.Assert(_shutdown && _streamsInUse == 0); return Task.FromException(new ObjectDisposedException(nameof(Http2Connection))); } @@ -1342,7 +1304,7 @@ private Task SendRstStreamAsync(int streamId, Http2ProtocolErrorCode errorCode) internal void HeartBeat() { - if (_disposed) + if (_shutdown) return; try @@ -1880,7 +1842,7 @@ private void FinalTeardown() { if (NetEventSource.Log.IsEnabled()) Trace(""); - Debug.Assert(_disposed); + Debug.Assert(_shutdown); Debug.Assert(_streamsInUse == 0); GC.SuppressFinalize(this); @@ -1901,20 +1863,7 @@ public override void Dispose() { lock (SyncObject) { - if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(_disposed)}={_disposed}, {nameof(_streamsInUse)}={_streamsInUse}"); - - if (!_disposed) - { - SignalAvailableStreamsWaiter(false); - SignalShutdownWaiter(); - - _disposed = true; - - if (_streamsInUse == 0) - { - FinalTeardown(); - } - } + Shutdown(); } } diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs index cf40ee0fedd8f7..163a9492ab41cf 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs @@ -742,17 +742,8 @@ private async Task AddHttp2ConnectionAsync(RequestQueue.QueueI if (connection is not null) { - // Register for shutdown notification. - // Do this before we return the connection to the pool, because that may result in it being disposed. - ValueTask shutdownTask = connection.WaitForShutdownAsync(); - // Add the new connection to the pool. ReturnHttp2Connection(connection, isNewConnection: true, queueItem.Waiter); - - // Wait for connection shutdown. - await shutdownTask.ConfigureAwait(false); - - InvalidateHttp2Connection(connection); } else { From 2315a0ff3a1bc6b15dc8576e874ac60418c37949 Mon Sep 17 00:00:00 2001 From: Miha Zupan Date: Tue, 8 Aug 2023 15:17:32 +0200 Subject: [PATCH 2/2] Add a test for SocketsHttpHandler disposal mid request --- .../System/IO/DelegateDelegatingStream.cs | 5 ++ .../SocketsHttpHandlerTest.Cancellation.cs | 62 +++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/src/libraries/Common/tests/System/IO/DelegateDelegatingStream.cs b/src/libraries/Common/tests/System/IO/DelegateDelegatingStream.cs index 53de599d9b1fae..e801eb9c071bfa 100644 --- a/src/libraries/Common/tests/System/IO/DelegateDelegatingStream.cs +++ b/src/libraries/Common/tests/System/IO/DelegateDelegatingStream.cs @@ -37,6 +37,8 @@ public DelegateDelegatingStream(Stream innerStream) : base(innerStream) { } public Action WriteFunc { get; set; } public Func WriteAsyncArrayFunc { get; set; } public Func, CancellationToken, ValueTask> WriteAsyncMemoryFunc { get; set; } + public Action CopyToFunc { get; set; } + public Func CopyToAsyncFunc { get; set; } public Action DisposeFunc { get; set; } public Func DisposeAsyncFunc { get; set; } @@ -62,6 +64,9 @@ public DelegateDelegatingStream(Stream innerStream) : base(innerStream) { } public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => WriteAsyncArrayFunc != null ? WriteAsyncArrayFunc(buffer, offset, count, cancellationToken) : base.WriteAsync(buffer, offset, count, cancellationToken); public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) => WriteAsyncMemoryFunc != null ? WriteAsyncMemoryFunc(buffer, cancellationToken) : base.WriteAsync(buffer, cancellationToken); + public override void CopyTo(Stream destination, int bufferSize) { if (CopyToFunc != null) CopyToFunc(destination, bufferSize); else base.CopyTo(destination, bufferSize); } + public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) => CopyToAsyncFunc != null ? CopyToAsyncFunc(destination, bufferSize, cancellationToken) : base.CopyToAsync(destination, bufferSize, cancellationToken); + protected override void Dispose(bool disposing) { if (DisposeFunc != null) DisposeFunc(disposing); else base.Dispose(disposing); } public override ValueTask DisposeAsync() => DisposeAsyncFunc != null ? DisposeAsyncFunc() : base.DisposeAsync(); } diff --git a/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.Cancellation.cs b/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.Cancellation.cs index 6adf4a5da924ed..c793a1d55d6e76 100644 --- a/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.Cancellation.cs +++ b/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.Cancellation.cs @@ -412,5 +412,67 @@ protected override Task SerializeToStreamAsync(Stream stream, TransportContext c return base.SerializeToStreamAsync(stream, context); } } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task RequestSent_HandlerDisposed_RequestIsUnaffected(bool post) + { + byte[] postContent = "Hello world"u8.ToArray(); + + TaskCompletionSource serverReceivedRequest = new(TaskCreationOptions.RunContinuationsAsynchronously); + + await LoopbackServerFactory.CreateClientAndServerAsync(async uri => + { + using HttpClientHandler handler = CreateHttpClientHandler(); + using HttpClient client = CreateHttpClient(handler); + + using HttpRequestMessage request = CreateRequest(post ? HttpMethod.Post : HttpMethod.Get, uri, UseVersion); + + if (post) + { + request.Content = new StreamContent(new DelegateDelegatingStream(new MemoryStream()) + { + CanSeekFunc = () => false, + CopyToFunc = (destination, _) => + { + destination.Flush(); + Assert.True(serverReceivedRequest.Task.Wait(TestHelper.PassingTestTimeout)); + destination.Write(postContent); + }, + CopyToAsyncFunc = async (destination, _, ct) => + { + await destination.FlushAsync(ct); + await serverReceivedRequest.Task.WaitAsync(ct); + await destination.WriteAsync(postContent, ct); + } + }); + } + + Task clientTask = client.SendAsync(TestAsync, request); + await serverReceivedRequest.Task.WaitAsync(TestHelper.PassingTestTimeout); + + handler.Dispose(); + await Task.Delay(1); // Give any potential disposal/cancellation some time to propagate + + await clientTask; + }, + async server => + { + await server.AcceptConnectionAsync(async connection => + { + await connection.ReadRequestDataAsync(readBody: false); + serverReceivedRequest.SetResult(); + + if (post) + { + byte[] received = await connection.ReadRequestBodyAsync(); + Assert.Equal(postContent, received); + } + + await connection.SendResponseAsync(); + }); + }); + } } }