Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP3: Fix issue with GOAWAY handling and implement graceful shutdown logic in Http3LoopbackServer #56134

Merged
merged 4 commits into from
Jul 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,36 @@ public override async Task<HttpRequestData> HandleRequestAsync(HttpStatusCode st

await stream.SendResponseAsync(statusCode, headers, content).ConfigureAwait(false);

// closing the connection here causes bytes written to streams to go missing.
// Regardless, we told the client we are closing so it shouldn't matter -- they should not use this connection anymore.
//await CloseAsync(H3_NO_ERROR).ConfigureAwait(false);
await WaitForClientDisconnectAsync();

return request;
}

// Wait for the client to close the connection, e.g. after we send a GOAWAY, or after the HttpClient is disposed.
public async Task WaitForClientDisconnectAsync()
{
while (true)
{
Http3LoopbackStream stream;

try
{
stream = await AcceptRequestStreamAsync().ConfigureAwait(false);
}
catch (QuicConnectionAbortedException abortException) when (abortException.ErrorCode == H3_NO_ERROR)
{
break;
}

using (stream)
{
await stream.AbortAndWaitForShutdownAsync(H3_REQUEST_REJECTED);
}
}

await CloseAsync(H3_NO_ERROR);
}

public override async Task WaitForCancellationAsync(bool ignoreIncomingData = true, int requestId = 0)
{
await GetOpenRequest(requestId).WaitForCancellationAsync(ignoreIncomingData).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,13 @@ public async Task WaitForCancellationAsync(bool ignoreIncomingData = true)
}
}

public async Task AbortAndWaitForShutdownAsync(long errorCode)
{
_stream.AbortRead(errorCode);
_stream.AbortWrite(errorCode);
await _stream.ShutdownCompleted();
}

public async Task<(long? frameType, byte[] payload)> ReadFrameAsync()
{
long? frameType = await ReadIntegerAsync().ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ private void OnServerGoAway(long lastProcessedStreamId)

lock (SyncObj)
{
if (lastProcessedStreamId > _lastProcessedStreamId)
if (_lastProcessedStreamId != -1 && lastProcessedStreamId > _lastProcessedStreamId)
{
// Server can send multiple GOAWAY frames.
// Spec says a server MUST NOT increase the stream ID in subsequent GOAWAYs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ internal override QuicStreamProvider OpenBidirectionalStream()

internal MockStream OpenStream(long streamId, bool bidirectional)
{
CheckDisposed();

ConnectionState? state = _state;
if (state is null)
{
Expand Down Expand Up @@ -274,12 +276,15 @@ internal override async ValueTask<QuicStreamProvider> AcceptStreamAsync(Cancella
catch (ChannelClosedException)
{
long errorCode = _isClient ? state._serverErrorCode : state._clientErrorCode;
throw new QuicConnectionAbortedException(errorCode);
throw (errorCode == -1) ? new QuicOperationAbortedException() : new QuicConnectionAbortedException(errorCode);
}
}

internal override ValueTask CloseAsync(long errorCode, CancellationToken cancellationToken = default)
{
// TODO: We should abort local streams (and signal the peer to do likewise)
// Currently, we are not tracking the streams associated with this connection.

ConnectionState? state = _state;
if (state is not null)
{
Expand All @@ -292,10 +297,12 @@ internal override ValueTask CloseAsync(long errorCode, CancellationToken cancell
if (_isClient)
{
state._clientErrorCode = errorCode;
DrainAcceptQueue(-1, errorCode);
}
else
{
state._serverErrorCode = errorCode;
DrainAcceptQueue(errorCode, -1);
}
}

Expand All @@ -312,19 +319,37 @@ private void CheckDisposed()
}
}

private void DrainAcceptQueue(long outboundErrorCode, long inboundErrorCode)
{
ConnectionState? state = _state;
if (state is not null)
{
// TODO: We really only need to do the complete and drain once, but it doesn't really hurt to do it twice.
state._clientInitiatedStreamChannel.Writer.TryComplete();
while (state._clientInitiatedStreamChannel.Reader.TryRead(out MockStream.StreamState? streamState))
{
streamState._outboundReadErrorCode = streamState._outboundWriteErrorCode = outboundErrorCode;
streamState._inboundStreamBuffer?.AbortRead();
streamState._outboundStreamBuffer?.EndWrite();
}

state._serverInitiatedStreamChannel.Writer.TryComplete();
while (state._serverInitiatedStreamChannel.Reader.TryRead(out MockStream.StreamState? streamState))
{
streamState._inboundReadErrorCode = streamState._inboundWriteErrorCode = inboundErrorCode;
streamState._outboundStreamBuffer?.AbortRead();
streamState._inboundStreamBuffer?.EndWrite();
}
}
}

private void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
ConnectionState? state = _state;
if (state is not null)
{
Channel<MockStream.StreamState> streamChannel = _isClient ? state._clientInitiatedStreamChannel : state._serverInitiatedStreamChannel;
streamChannel.Writer.Complete();
}

DrainAcceptQueue(-1, -1);

PeerStreamLimit? streamLimit = LocalStreamLimit;
if (streamLimit is not null)
Expand Down Expand Up @@ -448,6 +473,7 @@ public ConnectionState(SslApplicationProtocol applicationProtocol)
_applicationProtocol = applicationProtocol;
_clientInitiatedStreamChannel = Channel.CreateUnbounded<MockStream.StreamState>();
_serverInitiatedStreamChannel = Channel.CreateUnbounded<MockStream.StreamState>();
_clientErrorCode = _serverErrorCode = -1;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ internal override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellati
long errorCode = _isInitiator ? _streamState._inboundReadErrorCode : _streamState._outboundReadErrorCode;
if (errorCode != 0)
{
throw new QuicStreamAbortedException(errorCode);
throw (errorCode == -1) ? new QuicOperationAbortedException() : new QuicStreamAbortedException(errorCode);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,146 @@ public async Task TestConnect()
Assert.Equal(ApplicationProtocol.ToString(), serverConnection.NegotiatedApplicationProtocol.ToString());
}

private static async Task<QuicStream> OpenAndUseStreamAsync(QuicConnection c)
{
QuicStream s = c.OpenBidirectionalStream();

// This will pend
await s.ReadAsync(new byte[1]);

return s;
}

[Fact]
[ActiveIssue("https://github.com/dotnet/runtime/issues/55242", TestPlatforms.Linux)]
public async Task AcceptStream_ConnectionAborted_ByClient_Throws()
public async Task CloseAsync_WithPendingAcceptAndConnect_PendingAndSubsequentThrowOperationAbortedException()
{
using var sync = new SemaphoreSlim(0);

await RunClientServer(
async clientConnection =>
{
await clientConnection.CloseAsync(ExpectedErrorCode);
await sync.WaitAsync();
},
async serverConnection =>
{
// Pend operations before the client closes.
Task<QuicStream> acceptTask = serverConnection.AcceptStreamAsync().AsTask();
Assert.False(acceptTask.IsCompleted);
Task<QuicStream> connectTask = OpenAndUseStreamAsync(serverConnection);
Assert.False(connectTask.IsCompleted);

await serverConnection.CloseAsync(ExpectedErrorCode);

sync.Release();

// Pending ops should fail
await Assert.ThrowsAsync<QuicOperationAbortedException>(() => acceptTask);
await Assert.ThrowsAsync<QuicOperationAbortedException>(() => connectTask);

// Subsequent attempts should fail
// TODO: Which exception is correct?
if (IsMockProvider)
{
await Assert.ThrowsAsync<ObjectDisposedException>(async () => await serverConnection.AcceptStreamAsync());
await Assert.ThrowsAsync<ObjectDisposedException>(async () => await OpenAndUseStreamAsync(serverConnection));
}
else
{
await Assert.ThrowsAsync<QuicOperationAbortedException>(async () => await serverConnection.AcceptStreamAsync());

// TODO: ActiveIssue https://github.com/dotnet/runtime/issues/56133
// MsQuic fails with System.Net.Quic.QuicException: Failed to open stream to peer. Error Code: INVALID_STATE
//await Assert.ThrowsAsync<QuicOperationAbortedException>(async () => await OpenAndUseStreamAsync(serverConnection));
await Assert.ThrowsAsync<QuicException>(() => OpenAndUseStreamAsync(serverConnection));
}
});
}

[Fact]
[ActiveIssue("https://github.com/dotnet/runtime/issues/55242", TestPlatforms.Linux)]
public async Task Dispose_WithPendingAcceptAndConnect_PendingAndSubsequentThrowOperationAbortedException()
{
using var sync = new SemaphoreSlim(0);

await RunClientServer(
async clientConnection =>
{
await sync.WaitAsync();
},
async serverConnection =>
{
// Pend operations before the client closes.
Task<QuicStream> acceptTask = serverConnection.AcceptStreamAsync().AsTask();
Assert.False(acceptTask.IsCompleted);
Task<QuicStream> connectTask = OpenAndUseStreamAsync(serverConnection);
Assert.False(connectTask.IsCompleted);

serverConnection.Dispose();

sync.Release();

// Pending ops should fail
await Assert.ThrowsAsync<QuicOperationAbortedException>(() => acceptTask);
await Assert.ThrowsAsync<QuicOperationAbortedException>(() => connectTask);

// Subsequent attempts should fail
// TODO: Should these be QuicOperationAbortedException, to match above? Or vice-versa?
await Assert.ThrowsAsync<ObjectDisposedException>(async () => await serverConnection.AcceptStreamAsync());
await Assert.ThrowsAsync<ObjectDisposedException>(async () => await OpenAndUseStreamAsync(serverConnection));
});
}

[Fact]
[ActiveIssue("https://github.com/dotnet/runtime/issues/55242", TestPlatforms.Linux)]
public async Task ConnectionClosedByPeer_WithPendingAcceptAndConnect_PendingAndSubsequentThrowConnectionAbortedException()
{
if (IsMockProvider)
{
return;
}

using var sync = new SemaphoreSlim(0);

await RunClientServer(
async clientConnection =>
{
await sync.WaitAsync();
QuicConnectionAbortedException ex = await Assert.ThrowsAsync<QuicConnectionAbortedException>(() => serverConnection.AcceptStreamAsync().AsTask());

await clientConnection.CloseAsync(ExpectedErrorCode);
},
async serverConnection =>
{
// Pend operations before the client closes.
Task<QuicStream> acceptTask = serverConnection.AcceptStreamAsync().AsTask();
Assert.False(acceptTask.IsCompleted);
Task<QuicStream> connectTask = OpenAndUseStreamAsync(serverConnection);
Assert.False(connectTask.IsCompleted);

sync.Release();

// Pending ops should fail
QuicConnectionAbortedException ex;

ex = await Assert.ThrowsAsync<QuicConnectionAbortedException>(() => acceptTask);
Assert.Equal(ExpectedErrorCode, ex.ErrorCode);
ex = await Assert.ThrowsAsync<QuicConnectionAbortedException>(() => connectTask);
Assert.Equal(ExpectedErrorCode, ex.ErrorCode);

// Subsequent attempts should fail
ex = await Assert.ThrowsAsync<QuicConnectionAbortedException>(() => serverConnection.AcceptStreamAsync().AsTask());
Assert.Equal(ExpectedErrorCode, ex.ErrorCode);
// TODO: ActiveIssue https://github.com/dotnet/runtime/issues/56133
// MsQuic fails with System.Net.Quic.QuicException: Failed to open stream to peer. Error Code: INVALID_STATE
if (IsMsQuicProvider)
{
await Assert.ThrowsAsync<QuicException>(() => OpenAndUseStreamAsync(serverConnection));
}
else
{
ex = await Assert.ThrowsAsync<QuicConnectionAbortedException>(() => OpenAndUseStreamAsync(serverConnection));
Assert.Equal(ExpectedErrorCode, ex.ErrorCode);
}
});
}

Expand All @@ -79,7 +202,7 @@ private static async Task DoReads(QuicStream reader, int readCount)
[InlineData(10)]
public async Task CloseAsync_WithOpenStream_LocalAndPeerStreamsFailWithQuicOperationAbortedException(int writesBeforeClose)
{
if (typeof(T) == typeof(MockProviderFactory))
if (IsMockProvider)
{
return;
}
Expand Down Expand Up @@ -122,7 +245,7 @@ await RunClientServer(
[InlineData(10)]
public async Task Dispose_WithOpenLocalStream_LocalStreamFailsWithQuicOperationAbortedException(int writesBeforeClose)
{
if (typeof(T) == typeof(MockProviderFactory))
if (IsMockProvider)
{
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ public abstract class QuicTestBase<T>
public static QuicImplementationProvider ImplementationProvider { get; } = s_factory.GetProvider();
public static bool IsSupported => ImplementationProvider.IsSupported;

public static bool IsMockProvider => typeof(T) == typeof(MockProviderFactory);
public static bool IsMsQuicProvider => typeof(T) == typeof(MsQuicProviderFactory);

public static SslApplicationProtocol ApplicationProtocol { get; } = new SslApplicationProtocol("quictest");

public X509Certificate2 ServerCertificate = System.Net.Test.Common.Configuration.Certificates.GetServerCertificate();
Expand Down