Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify Http2Connection shutdown/dispose logic #90094

Merged
merged 2 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public DelegateDelegatingStream(Stream innerStream) : base(innerStream) { }
public Action<byte[], int, int> WriteFunc { get; set; }
public Func<byte[], int, int, CancellationToken, Task> WriteAsyncArrayFunc { get; set; }
public Func<ReadOnlyMemory<byte>, CancellationToken, ValueTask> WriteAsyncMemoryFunc { get; set; }
public Action<Stream, int> CopyToFunc { get; set; }
public Func<Stream, int, CancellationToken, Task> CopyToAsyncFunc { get; set; }
public Action<bool> DisposeFunc { get; set; }
public Func<ValueTask> DisposeAsyncFunc { get; set; }

Expand All @@ -62,6 +64,9 @@ public DelegateDelegatingStream(Stream innerStream) : base(innerStream) { }
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => WriteAsyncArrayFunc != null ? WriteAsyncArrayFunc(buffer, offset, count, cancellationToken) : base.WriteAsync(buffer, offset, count, cancellationToken);
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) => WriteAsyncMemoryFunc != null ? WriteAsyncMemoryFunc(buffer, cancellationToken) : base.WriteAsync(buffer, cancellationToken);

public override void CopyTo(Stream destination, int bufferSize) { if (CopyToFunc != null) CopyToFunc(destination, bufferSize); else base.CopyTo(destination, bufferSize); }
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) => CopyToAsyncFunc != null ? CopyToAsyncFunc(destination, bufferSize, cancellationToken) : base.CopyToAsync(destination, bufferSize, cancellationToken);

protected override void Dispose(bool disposing) { if (DisposeFunc != null) DisposeFunc(disposing); else base.Dispose(disposing); }
public override ValueTask DisposeAsync() => DisposeAsyncFunc != null ? DisposeAsyncFunc() : base.DisposeAsync();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,15 @@ internal sealed partial class Http2Connection : HttpConnectionBase
// (1) We received a GOAWAY frame from the server
// (2) We have exhaustead StreamIds (i.e. _nextStream == MaxStreamId)
// (3) A connection-level error occurred, in which case _abortException below is set.
// (4) The connection is being disposed.
// Requests currently in flight will continue to be processed.
// When all requests have completed, the connection will be torn down.
private bool _shutdown;
private TaskCompletionSource? _shutdownWaiter;

// If this is set, the connection is aborting due to an IO failure (IOException) or a protocol violation (Http2ProtocolException).
// _shutdown above is true, and requests in flight have been (or are being) failed.
private Exception? _abortException;

// This means that the user (i.e. the connection pool) has disposed us and will not submit further requests.
// Requests currently in flight will continue to be processed.
// When all requests have completed, the connection will be torn down.
private bool _disposed;

private const int MaxStreamId = int.MaxValue;

// Temporary workaround for request burst handling on connection start.
Expand Down Expand Up @@ -255,61 +252,30 @@ public async ValueTask SetupAsync(CancellationToken cancellationToken)
_ = ProcessOutgoingFramesAsync();
}

// This will complete when the connection begins to shut down and cannot be used anymore, or if it is disposed.
public ValueTask WaitForShutdownAsync()
{
lock (SyncObject)
{
Debug.Assert(!_disposed, "As currently used, we don't expect to call this after disposing and we don't handle the ODE");
ObjectDisposedException.ThrowIf(_disposed, this);

if (_shutdown)
{
Debug.Assert(_shutdownWaiter is null);
return default;
}

_shutdownWaiter ??= new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

return new ValueTask(_shutdownWaiter.Task);
}
}

private void Shutdown()
{
if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(_shutdown)}={_shutdown}, {nameof(_abortException)}={_abortException}");

Debug.Assert(Monitor.IsEntered(SyncObject));

SignalAvailableStreamsWaiter(false);
SignalShutdownWaiter();

// Note _shutdown could already be set, but that's fine.
_shutdown = true;
}

private void SignalShutdownWaiter()
{
if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(_shutdownWaiter)}?={_shutdownWaiter is not null}");
if (!_shutdown)
{
_pool.InvalidateHttp2Connection(this);
SignalAvailableStreamsWaiter(false);

Debug.Assert(Monitor.IsEntered(SyncObject));
_shutdown = true;

if (_shutdownWaiter is not null)
{
Debug.Assert(!_disposed);
Debug.Assert(!_shutdown);
_shutdownWaiter.SetResult();
_shutdownWaiter = null;
if (_streamsInUse == 0)
{
FinalTeardown();
}
}
}

public bool TryReserveStream()
{
lock (SyncObject)
{
Debug.Assert(!_disposed, "As currently used, we don't expect to call this after disposing and we don't handle the ODE");
ObjectDisposedException.ThrowIf(_disposed, this);

if (_shutdown)
{
return false;
Expand Down Expand Up @@ -353,7 +319,7 @@ public void ReleaseStream()
{
MarkConnectionAsIdle();

if (_disposed)
if (_shutdown)
{
FinalTeardown();
}
Expand All @@ -367,9 +333,6 @@ public Task<bool> WaitForAvailableStreamsAsync()
{
lock (SyncObject)
{
Debug.Assert(!_disposed, "As currently used, we don't expect to call this after disposing and we don't handle the ODE");
ObjectDisposedException.ThrowIf(_disposed, this);

Debug.Assert(_availableStreamsWaiter is null, "As used currently, shouldn't already have a waiter");

if (_shutdown)
Expand All @@ -396,7 +359,6 @@ private void SignalAvailableStreamsWaiter(bool result)

if (_availableStreamsWaiter is not null)
{
Debug.Assert(!_disposed);
Debug.Assert(!_shutdown);
_availableStreamsWaiter.SetResult(result);
_availableStreamsWaiter = null;
Expand Down Expand Up @@ -1213,7 +1175,7 @@ private Task PerformWriteAsync<T>(int writeBytes, T state, Func<T, Memory<byte>,
// We must be trying to send something asynchronously (like RST_STREAM or a PING or a SETTINGS ACK) and it has raced with the connection tear down.
// As such, it should not matter that we were not able to actually send the frame.
// But just in case, throw ObjectDisposedException. Asynchronous callers will ignore the failure.
Debug.Assert(_disposed && _streamsInUse == 0);
Debug.Assert(_shutdown && _streamsInUse == 0);
return Task.FromException(new ObjectDisposedException(nameof(Http2Connection)));
}

Expand Down Expand Up @@ -1342,7 +1304,7 @@ private Task SendRstStreamAsync(int streamId, Http2ProtocolErrorCode errorCode)

internal void HeartBeat()
{
if (_disposed)
if (_shutdown)
return;

try
Expand Down Expand Up @@ -1880,7 +1842,7 @@ private void FinalTeardown()
{
if (NetEventSource.Log.IsEnabled()) Trace("");

Debug.Assert(_disposed);
Debug.Assert(_shutdown);
Debug.Assert(_streamsInUse == 0);

GC.SuppressFinalize(this);
Expand All @@ -1901,20 +1863,7 @@ public override void Dispose()
{
lock (SyncObject)
{
if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(_disposed)}={_disposed}, {nameof(_streamsInUse)}={_streamsInUse}");

if (!_disposed)
{
SignalAvailableStreamsWaiter(false);
SignalShutdownWaiter();

_disposed = true;

if (_streamsInUse == 0)
{
FinalTeardown();
}
}
Shutdown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,17 +742,8 @@ private async Task AddHttp2ConnectionAsync(RequestQueue<Http2Connection?>.QueueI

if (connection is not null)
{
// Register for shutdown notification.
// Do this before we return the connection to the pool, because that may result in it being disposed.
ValueTask shutdownTask = connection.WaitForShutdownAsync();

// Add the new connection to the pool.
ReturnHttp2Connection(connection, isNewConnection: true, queueItem.Waiter);

// Wait for connection shutdown.
await shutdownTask.ConfigureAwait(false);

InvalidateHttp2Connection(connection);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,5 +412,67 @@ protected override Task SerializeToStreamAsync(Stream stream, TransportContext c
return base.SerializeToStreamAsync(stream, context);
}
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task RequestSent_HandlerDisposed_RequestIsUnaffected(bool post)
{
byte[] postContent = "Hello world"u8.ToArray();

TaskCompletionSource serverReceivedRequest = new(TaskCreationOptions.RunContinuationsAsynchronously);

await LoopbackServerFactory.CreateClientAndServerAsync(async uri =>
{
using HttpClientHandler handler = CreateHttpClientHandler();
using HttpClient client = CreateHttpClient(handler);

using HttpRequestMessage request = CreateRequest(post ? HttpMethod.Post : HttpMethod.Get, uri, UseVersion);

if (post)
{
request.Content = new StreamContent(new DelegateDelegatingStream(new MemoryStream())
{
CanSeekFunc = () => false,
CopyToFunc = (destination, _) =>
{
destination.Flush();
Assert.True(serverReceivedRequest.Task.Wait(TestHelper.PassingTestTimeout));
destination.Write(postContent);
},
CopyToAsyncFunc = async (destination, _, ct) =>
{
await destination.FlushAsync(ct);
await serverReceivedRequest.Task.WaitAsync(ct);
await destination.WriteAsync(postContent, ct);
}
});
}

Task<HttpResponseMessage> clientTask = client.SendAsync(TestAsync, request);
await serverReceivedRequest.Task.WaitAsync(TestHelper.PassingTestTimeout);

handler.Dispose();
await Task.Delay(1); // Give any potential disposal/cancellation some time to propagate

await clientTask;
},
async server =>
{
await server.AcceptConnectionAsync(async connection =>
{
await connection.ReadRequestDataAsync(readBody: false);
serverReceivedRequest.SetResult();

if (post)
{
byte[] received = await connection.ReadRequestBodyAsync();
Assert.Equal(postContent, received);
}

await connection.SendResponseAsync();
});
});
}
}
}