From 844c88ed0a8bcf03bd3c71a152b1d940b0d527c3 Mon Sep 17 00:00:00 2001 From: ManickaP Date: Sat, 15 Jul 2023 14:59:35 +0200 Subject: [PATCH 01/21] Added asserts to send buffer helper --- .../src/System/Net/Quic/Internal/MsQuicBuffers.cs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicBuffers.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicBuffers.cs index 594245a1cb723d..ba4a5a448b0199 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicBuffers.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicBuffers.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Collections.Generic; +using System.Diagnostics; using System.Runtime.InteropServices; using Microsoft.Quic; @@ -32,8 +33,8 @@ private void FreeNativeMemory() { QUIC_BUFFER* buffers = _buffers; _buffers = null; - NativeMemory.Free(buffers); _count = 0; + NativeMemory.Free(buffers); } private void Reserve(int count) @@ -48,6 +49,10 @@ private void Reserve(int count) private void SetBuffer(int index, ReadOnlyMemory buffer) { + Debug.Assert(index < _count); + Debug.Assert(_buffers[index].Buffer is null); + Debug.Assert(_buffers[index].Length == 0); + _buffers[index].Buffer = (byte*)NativeMemory.Alloc((nuint)buffer.Length, (nuint)sizeof(byte)); _buffers[index].Length = (uint)buffer.Length; buffer.Span.CopyTo(_buffers[index].Span); @@ -93,8 +98,8 @@ public void Reset() } byte* buffer = _buffers[i].Buffer; _buffers[i].Buffer = null; - NativeMemory.Free(buffer); _buffers[i].Length = 0; + NativeMemory.Free(buffer); } } From ddb8ee7cab0241cfb8cb0776a907b29abc8fca4b Mon Sep 17 00:00:00 2001 From: ManickaP Date: Sat, 15 Jul 2023 15:02:18 +0200 Subject: [PATCH 02/21] Postpone confirming the last RECEIVE event until the data are read --- .../src/System/Net/Quic/QuicStream.cs | 12 ++- .../tests/FunctionalTests/QuicStreamTests.cs | 84 +++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs index 6165f2085cb5f0..6a3b18039a0f69 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs @@ -293,6 +293,13 @@ public override async ValueTask ReadAsync(Memory buffer, Cancellation if (complete) { _receiveTcs.TrySetResult(final: true); + unsafe + { + // Confirm the last data which came with the FIN flag. + MsQuicApi.Api.StreamReceiveComplete( + _handle, + (ulong)lastReceiveSize); + } } // Unblock the next await to end immediately, i.e. there were/are any data in the buffer. @@ -516,6 +523,9 @@ private unsafe int HandleEventReceive(ref RECEIVE_DATA data) (int)data.TotalBufferLength, data.Flags.HasFlag(QUIC_RECEIVE_FLAGS.FIN)); + // If we copied all the data and also received FIN flag, postpone the confirmation of the data until they are consumed. + bool lastReceive = (totalCopied == data.TotalBufferLength) && data.Flags.HasFlag(QUIC_RECEIVE_FLAGS.FIN); + if (totalCopied < data.TotalBufferLength) { Volatile.Write(ref _receivedNeedsEnable, 1); @@ -524,7 +534,7 @@ private unsafe int HandleEventReceive(ref RECEIVE_DATA data) _receiveTcs.TrySetResult(); data.TotalBufferLength = totalCopied; - return (_receiveBuffers.HasCapacity() && Interlocked.CompareExchange(ref _receivedNeedsEnable, 0, 1) == 1) ? QUIC_STATUS_CONTINUE : QUIC_STATUS_SUCCESS; + return (_receiveBuffers.HasCapacity() && Interlocked.CompareExchange(ref _receivedNeedsEnable, 0, 1) == 1) ? QUIC_STATUS_CONTINUE : lastReceive ? QUIC_STATUS_PENDING : QUIC_STATUS_SUCCESS; } private unsafe int HandleEventSendComplete(ref SEND_COMPLETE_DATA data) { diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs index e220decb1bd400..cc0070cb3322a7 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs @@ -1208,5 +1208,89 @@ async ValueTask ReleaseOnReadsClosedAsync() } ); } + + [Theory] + [InlineData(1, true, true)] + [InlineData(1024, true, true)] + [InlineData(1024*1024*1024, true, true)] + [InlineData(1, true, false)] + [InlineData(1024, true, false)] + [InlineData(1024*1024*1024, true, false)] + [InlineData(1, false, true)] + [InlineData(1024, false, true)] + [InlineData(1024*1024*1024, false, true)] + [InlineData(1, false, false)] + [InlineData(1024, false, false)] + [InlineData(1024*1024*1024, false, false)] + public async Task ReadsClosedFinishes_ConnectionClose(int payloadSize, bool closeServer, bool closeConnection) + { + using var logger = new TestUtilities.TestEventListener(_output, "Private.InternalDiagnostics.System.Net.Quic"); + await RunClientServer( + serverFunction: async connection => + { + long expectedErrorCode = closeConnection ? DefaultCloseErrorCodeClient : DefaultStreamErrorCodeClient; + QuicError expectedError = closeConnection ? QuicError.ConnectionAborted : QuicError.StreamAborted; + await using QuicStream stream = await connection.AcceptInboundStreamAsync(); + await stream.WriteAsync(new byte[payloadSize], completeWrites: true); + if (closeServer) + { + expectedError = QuicError.OperationAborted; + if (closeConnection) + { + expectedErrorCode = DefaultCloseErrorCodeServer; + await connection.DisposeAsync(); + } + else + { + expectedErrorCode = DefaultStreamErrorCodeServer; + await stream.DisposeAsync(); + } + } + + _output.WriteLine($"Server {stream} ReadsClosed={stream.ReadsClosed.Status}"); + var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.ReadsClosed); + if (expectedError == QuicError.OperationAborted) + { + Assert.Null(ex.ApplicationErrorCode); + } + else + { + Assert.Equal(expectedErrorCode, ex.ApplicationErrorCode); + } + }, + clientFunction: async connection => + { + long expectedErrorCode = closeConnection ? DefaultCloseErrorCodeServer : DefaultStreamErrorCodeServer; + QuicError expectedError = closeConnection ? QuicError.ConnectionAborted : QuicError.StreamAborted; + await using QuicStream stream = await connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional); + await stream.WriteAsync(new byte[payloadSize], completeWrites: true); + if (!closeServer) + { + expectedError = QuicError.OperationAborted; + if (closeConnection) + { + expectedErrorCode = DefaultCloseErrorCodeClient; + await connection.DisposeAsync(); + } + else + { + expectedErrorCode = DefaultStreamErrorCodeClient; + await stream.DisposeAsync(); + } + } + + _output.WriteLine($"Client {stream} ReadsClosed={stream.ReadsClosed.Status}"); + var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.ReadsClosed); + if (expectedError == QuicError.OperationAborted) + { + Assert.Null(ex.ApplicationErrorCode); + } + else + { + Assert.Equal(expectedErrorCode, ex.ApplicationErrorCode); + } + } + ); + } } } From bd5a627effde6978b8048f248ffb02061e94b8b5 Mon Sep 17 00:00:00 2001 From: ManickaP Date: Mon, 24 Jul 2023 15:59:44 +0200 Subject: [PATCH 03/21] Removed lock --- .../src/System/Net/Quic/QuicStream.cs | 66 +++++++------------ 1 file changed, 22 insertions(+), 44 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs index 6a3b18039a0f69..4a65964a4cc6cc 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs @@ -109,7 +109,6 @@ public sealed partial class QuicStream } }; private MsQuicBuffers _sendBuffers = new MsQuicBuffers(); - private readonly object _sendBuffersLock = new object(); private readonly long _defaultErrorCode; @@ -341,7 +340,7 @@ public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationTo /// The region of memory to write data from. /// The token to monitor for cancellation requests. The default value is . /// Notifies the peer about gracefully closing the write side, i.e.: sends FIN flag with the data. - public ValueTask WriteAsync(ReadOnlyMemory buffer, bool completeWrites, CancellationToken cancellationToken = default) + public async ValueTask WriteAsync(ReadOnlyMemory buffer, bool completeWrites, CancellationToken cancellationToken = default) { ObjectDisposedException.ThrowIf(_disposed == 1, this); @@ -355,11 +354,11 @@ public ValueTask WriteAsync(ReadOnlyMemory buffer, bool completeWrites, Ca NetEventSource.Info(this, $"{this} Stream writing memory of '{buffer.Length}' bytes while {(completeWrites ? "completing" : "not completing")} writes."); } - if (_sendTcs.IsCompleted && cancellationToken.IsCancellationRequested) + if (_sendTcs.IsCompleted) { // Special case exception type for pre-canceled token while we've already transitioned to a final state and don't need to abort write. // It must happen before we try to get the value task, since the task source is versioned and each instance must be awaited. - return ValueTask.FromCanceled(cancellationToken); + cancellationToken.ThrowIfCancellationRequested(); } // Concurrent call, this one lost the race. @@ -371,7 +370,8 @@ public ValueTask WriteAsync(ReadOnlyMemory buffer, bool completeWrites, Ca // No need to call anything since we already have a result, most likely an exception. if (valueTask.IsCompleted) { - return valueTask; + await valueTask.ConfigureAwait(false); + return; } // For an empty buffer complete immediately, close the writing side of the stream if necessary. @@ -382,41 +382,27 @@ public ValueTask WriteAsync(ReadOnlyMemory buffer, bool completeWrites, Ca { CompleteWrites(); } - return valueTask; + await valueTask.ConfigureAwait(false); + return; } - lock (_sendBuffersLock) + unsafe { - ObjectDisposedException.ThrowIf(_disposed == 1, this); // TODO: valueTask is left unobserved - unsafe + _sendBuffers.Initialize(buffer); + int status = MsQuicApi.Api.StreamSend( + _handle, + _sendBuffers.Buffers, + (uint)_sendBuffers.Count, + completeWrites ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE, + null); + if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? exception)) { - if (_sendBuffers.Count > 0 && _sendBuffers.Buffers[0].Buffer != null) - { - // _sendBuffers are not reset, meaning SendComplete for the previous WriteAsync call didn't arrive yet. - // In case of cancellation, the task from _sendTcs is finished before the aborting. It is technically possible for subsequent - // WriteAsync to grab the next task from _sendTcs and start executing before SendComplete event occurs for the previous (canceled) write. - // This is not an "invalid nested call", because the previous task has finished. Best guess is to mimic OperationAborted as it will be from Abort - // that would execute soon enough, if not already. Not final, because Abort should be the one to set final exception. - _sendTcs.TrySetException(ThrowHelper.GetOperationAbortedException(SR.net_quic_writing_aborted), final: false); - return valueTask; - } - - _sendBuffers.Initialize(buffer); - int status = MsQuicApi.Api.StreamSend( - _handle, - _sendBuffers.Buffers, - (uint)_sendBuffers.Count, - completeWrites ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE, - null); - if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? exception)) - { - _sendBuffers.Reset(); - _sendTcs.TrySetException(exception, final: true); - } + _sendBuffers.Reset(); + _sendTcs.TrySetException(exception, final: true); } } - return valueTask; + await valueTask.ConfigureAwait(false); } /// @@ -538,12 +524,7 @@ private unsafe int HandleEventReceive(ref RECEIVE_DATA data) } private unsafe int HandleEventSendComplete(ref SEND_COMPLETE_DATA data) { - // In case of cancellation, the task from _sendTcs is finished before the aborting. It is technically possible for subsequent WriteAsync to grab the next task - // from _sendTcs and start executing before SendComplete event occurs for the previous (canceled) write - lock (_sendBuffersLock) - { - _sendBuffers.Reset(); - } + _sendBuffers.Reset(); if (data.Canceled == 0) { _sendTcs.TrySetResult(); @@ -706,11 +687,8 @@ public override async ValueTask DisposeAsync() Debug.Assert(_sendTcs.KeepAliveReleased); _handle.Dispose(); - lock (_sendBuffersLock) - { - // TODO: memory leak if not disposed - _sendBuffers.Dispose(); - } + // TODO: memory leak if not disposed + _sendBuffers.Dispose(); unsafe void StreamShutdown(QUIC_STREAM_SHUTDOWN_FLAGS flags, long errorCode) { From 08f1bae0f70e05e2cfc25bdf1c0b4f460312b5ca Mon Sep 17 00:00:00 2001 From: ManickaP Date: Mon, 24 Jul 2023 15:59:55 +0200 Subject: [PATCH 04/21] Debug tests --- .../System/Net/Quic/Internal/MsQuicApi.NativeMethods.cs | 4 ++++ .../tests/FunctionalTests/QuicStreamTests.cs | 8 ++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicApi.NativeMethods.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicApi.NativeMethods.cs index 206eac76ac7878..d6c07ce81c0d4c 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicApi.NativeMethods.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicApi.NativeMethods.cs @@ -314,6 +314,10 @@ public int StreamShutdown(MsQuicSafeHandle stream, QUIC_STREAM_SHUTDOWN_FLAGS fl try { stream.DangerousAddRef(ref success); + if (NetEventSource.Log.IsEnabled()) + { + NetEventSource.Info(stream, $"{stream} StreamShutdown({flags})."); + } return ApiTable->StreamShutdown(stream.QuicHandle, flags, code); } finally diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs index cc0070cb3322a7..4ec83cbb32acb3 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs @@ -1210,18 +1210,18 @@ async ValueTask ReleaseOnReadsClosedAsync() } [Theory] - [InlineData(1, true, true)] + /*[InlineData(1, true, true)] [InlineData(1024, true, true)] - [InlineData(1024*1024*1024, true, true)] + [InlineData(1024*1024*1024, true, true)]*/ [InlineData(1, true, false)] - [InlineData(1024, true, false)] + /*[InlineData(1024, true, false)] [InlineData(1024*1024*1024, true, false)] [InlineData(1, false, true)] [InlineData(1024, false, true)] [InlineData(1024*1024*1024, false, true)] [InlineData(1, false, false)] [InlineData(1024, false, false)] - [InlineData(1024*1024*1024, false, false)] + [InlineData(1024*1024*1024, false, false)]*/ public async Task ReadsClosedFinishes_ConnectionClose(int payloadSize, bool closeServer, bool closeConnection) { using var logger = new TestUtilities.TestEventListener(_output, "Private.InternalDiagnostics.System.Net.Quic"); From a3ae2b75ee3f72e525bf1327e4606a3d73905a17 Mon Sep 17 00:00:00 2001 From: ManickaP Date: Fri, 28 Jul 2023 18:46:12 +0200 Subject: [PATCH 05/21] ReadsClosed test fixed, and some better logging --- .../Net/Quic/Internal/ReceiveBuffers.cs | 9 +- .../Internal/ResettableValueTaskSource.cs | 4 + .../src/System/Net/Quic/QuicConnection.cs | 2 +- .../src/System/Net/Quic/QuicListener.cs | 2 +- .../src/System/Net/Quic/QuicStream.cs | 34 ++++- .../tests/FunctionalTests/QuicStreamTests.cs | 140 +++++++++++++++--- 6 files changed, 159 insertions(+), 32 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ReceiveBuffers.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ReceiveBuffers.cs index 531ac0171ca070..57c7aec082c81c 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ReceiveBuffers.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ReceiveBuffers.cs @@ -12,6 +12,7 @@ internal struct ReceiveBuffers private readonly object _syncRoot; private MultiArrayBuffer _buffer; private bool _final; + private int _lastReceiveSize; public ReceiveBuffers() { @@ -47,6 +48,7 @@ public int CopyFrom(ReadOnlySpan quicBuffers, int totalLength, bool } _final = final; + _lastReceiveSize = totalLength; _buffer.EnsureAvailableSpace(totalLength); int totalCopied = 0; @@ -66,7 +68,7 @@ public int CopyFrom(ReadOnlySpan quicBuffers, int totalLength, bool } } - public int CopyTo(Memory buffer, out bool isCompleted, out bool isEmpty) + public int CopyTo(Memory buffer, out bool completed, out bool empty, out int lastReceiveSize) { lock (_syncRoot) { @@ -79,8 +81,9 @@ public int CopyTo(Memory buffer, out bool isCompleted, out bool isEmpty) _buffer.Discard(copied); } - isCompleted = _buffer.IsEmpty && _final; - isEmpty = _buffer.IsEmpty; + completed = _buffer.IsEmpty && _final; + empty = _buffer.IsEmpty; + lastReceiveSize = _lastReceiveSize; return copied; } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs index e7c0cf87bfd5d8..c359b853ab33ed 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs @@ -130,6 +130,10 @@ public bool TryGetValueTask(out ValueTask valueTask, object? keepAlive = null, C } } + /// + /// Gets a that will transition to a completed state with the last transition of this source, i.e. into . + /// + /// The that will transition to a completed state with the last transition of this source. public Task GetFinalTask() => _finalTaskSource.Task; private bool TryComplete(Exception? exception, bool final) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index 9602e3742dbd4d..c58db1f986890f 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -575,7 +575,7 @@ private static unsafe int NativeCallback(QUIC_HANDLE* connection, void* context, { if (NetEventSource.Log.IsEnabled()) { - NetEventSource.Error(null, $"Received event {connectionEvent->Type} while connection is already disposed"); + NetEventSource.Error(null, $"Received event {connectionEvent->Type} for [conn][{(nint)connection:X11}] while connection is already disposed"); } return QUIC_STATUS_INVALID_STATE; } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs index ee7bc3acf194e0..30dd019274d183 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs @@ -329,7 +329,7 @@ private static unsafe int NativeCallback(QUIC_HANDLE* listener, void* context, Q { if (NetEventSource.Log.IsEnabled()) { - NetEventSource.Error(null, $"Received event {listenerEvent->Type} while listener is already disposed"); + NetEventSource.Error(null, $"Received event {listenerEvent->Type} for [list][{(nint)listener:X11}] while listener is already disposed"); } return QUIC_STATUS_INVALID_STATE; } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs index 4a65964a4cc6cc..dde2930ea8ae1f 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs @@ -134,7 +134,19 @@ public sealed partial class QuicStream /// or when for is called, /// or when the peer called for . /// - public Task ReadsClosed => _receiveTcs.GetFinalTask(); + public Task ReadsClosed + { + get + { + GCHandle keepAlive = GCHandle.Alloc(this); + Task finalTask = _receiveTcs.GetFinalTask(); + finalTask.ContinueWith(static (_, state) => + { + ((GCHandle)state!).Free(); + }, keepAlive, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); + return finalTask; + } + } /// /// A that will get completed once writing side has been closed. @@ -143,7 +155,19 @@ public sealed partial class QuicStream /// or when for is called, /// or when the peer called for . /// - public Task WritesClosed => _sendTcs.GetFinalTask(); + public Task WritesClosed + { + get + { + GCHandle keepAlive = GCHandle.Alloc(this); + Task finalTask = _sendTcs.GetFinalTask(); + finalTask.ContinueWith(static (_, state) => + { + ((GCHandle)state!).Free(); + }, keepAlive, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); + return finalTask; + } + } /// public override string ToString() => _handle.ToString(); @@ -284,7 +308,7 @@ public override async ValueTask ReadAsync(Memory buffer, Cancellation } // Copy data from the buffer, reduce target and increment total. - int copied = _receiveBuffers.CopyTo(buffer, out bool complete, out bool empty); + int copied = _receiveBuffers.CopyTo(buffer, out bool complete, out bool empty, out int lastReceiveSize); buffer = buffer.Slice(copied); totalCopied += copied; @@ -607,7 +631,7 @@ private unsafe int HandleStreamEvent(ref QUIC_STREAM_EVENT streamEvent) #pragma warning disable CS3016 [UnmanagedCallersOnly(CallConvs = new Type[] { typeof(CallConvCdecl) })] #pragma warning restore CS3016 - private static unsafe int NativeCallback(QUIC_HANDLE* connection, void* context, QUIC_STREAM_EVENT* streamEvent) + private static unsafe int NativeCallback(QUIC_HANDLE* stream, void* context, QUIC_STREAM_EVENT* streamEvent) { GCHandle stateHandle = GCHandle.FromIntPtr((IntPtr)context); @@ -616,7 +640,7 @@ private static unsafe int NativeCallback(QUIC_HANDLE* connection, void* context, { if (NetEventSource.Log.IsEnabled()) { - NetEventSource.Error(null, $"Received event {streamEvent->Type} while connection is already disposed"); + NetEventSource.Error(null, $"Received event {streamEvent->Type} for [strm][{(nint)stream:X11}] while stream is already disposed"); } return QUIC_STATUS_INVALID_STATE; } diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs index 4ec83cbb32acb3..d0e0c522f42f46 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs @@ -1210,44 +1210,47 @@ async ValueTask ReleaseOnReadsClosedAsync() } [Theory] - /*[InlineData(1, true, true)] + [InlineData(1, true, true)] [InlineData(1024, true, true)] - [InlineData(1024*1024*1024, true, true)]*/ - [InlineData(1, true, false)] - /*[InlineData(1024, true, false)] - [InlineData(1024*1024*1024, true, false)] + [InlineData(1024*1024*1024, true, true)] [InlineData(1, false, true)] [InlineData(1024, false, true)] [InlineData(1024*1024*1024, false, true)] + [InlineData(1, true, false)] + [InlineData(1024, true, false)] + [InlineData(1024*1024*1024, true, false)] [InlineData(1, false, false)] [InlineData(1024, false, false)] - [InlineData(1024*1024*1024, false, false)]*/ - public async Task ReadsClosedFinishes_ConnectionClose(int payloadSize, bool closeServer, bool closeConnection) + [InlineData(1024*1024*1024, false, false)] + public async Task ReadsClosedFinishes_ConnectionClose(int payloadSize, bool closeServer, bool useDispose) { - using var logger = new TestUtilities.TestEventListener(_output, "Private.InternalDiagnostics.System.Net.Quic"); + using SemaphoreSlim sem = new SemaphoreSlim(0); await RunClientServer( serverFunction: async connection => { - long expectedErrorCode = closeConnection ? DefaultCloseErrorCodeClient : DefaultStreamErrorCodeClient; - QuicError expectedError = closeConnection ? QuicError.ConnectionAborted : QuicError.StreamAborted; + QuicError expectedError = QuicError.ConnectionAborted; + long expectedErrorCode = DefaultCloseErrorCodeClient; await using QuicStream stream = await connection.AcceptInboundStreamAsync(); await stream.WriteAsync(new byte[payloadSize], completeWrites: true); if (closeServer) { + await sem.WaitAsync(); expectedError = QuicError.OperationAborted; - if (closeConnection) + expectedErrorCode = DefaultCloseErrorCodeServer; + if (useDispose) { - expectedErrorCode = DefaultCloseErrorCodeServer; await connection.DisposeAsync(); } else { - expectedErrorCode = DefaultStreamErrorCodeServer; - await stream.DisposeAsync(); + await connection.CloseAsync(DefaultCloseErrorCodeServer); } } + else + { + sem.Release(); + } - _output.WriteLine($"Server {stream} ReadsClosed={stream.ReadsClosed.Status}"); var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.ReadsClosed); if (expectedError == QuicError.OperationAborted) { @@ -1260,26 +1263,29 @@ await RunClientServer( }, clientFunction: async connection => { - long expectedErrorCode = closeConnection ? DefaultCloseErrorCodeServer : DefaultStreamErrorCodeServer; - QuicError expectedError = closeConnection ? QuicError.ConnectionAborted : QuicError.StreamAborted; + QuicError expectedError = QuicError.ConnectionAborted; + long expectedErrorCode = DefaultCloseErrorCodeServer; await using QuicStream stream = await connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional); await stream.WriteAsync(new byte[payloadSize], completeWrites: true); if (!closeServer) { + await sem.WaitAsync(); expectedError = QuicError.OperationAborted; - if (closeConnection) + expectedErrorCode = DefaultCloseErrorCodeClient; + if (useDispose) { - expectedErrorCode = DefaultCloseErrorCodeClient; await connection.DisposeAsync(); } else { - expectedErrorCode = DefaultStreamErrorCodeClient; - await stream.DisposeAsync(); + await connection.CloseAsync(DefaultCloseErrorCodeClient); } } + else + { + sem.Release(); + } - _output.WriteLine($"Client {stream} ReadsClosed={stream.ReadsClosed.Status}"); var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.ReadsClosed); if (expectedError == QuicError.OperationAborted) { @@ -1292,5 +1298,95 @@ await RunClientServer( } ); } + + [Theory] + [InlineData(1, true, true)] + [InlineData(1024, true, true)] + [InlineData(1024*1024*1024, true, true)] + [InlineData(1, false, true)] + [InlineData(1024, false, true)] + [InlineData(1024*1024*1024, false, true)] + [InlineData(1, true, false)] + [InlineData(1024, true, false)] + [InlineData(1024*1024*1024, true, false)] + [InlineData(1, false, false)] + [InlineData(1024, false, false)] + [InlineData(1024*1024*1024, false, false)] + public async Task WritesClosedFinishes_ConnectionClose(int payloadSize, bool closeServer, bool useDispose) + { + using SemaphoreSlim sem = new SemaphoreSlim(0); + await RunClientServer( + serverFunction: async connection => + { + QuicError expectedError = QuicError.ConnectionAborted; + long expectedErrorCode = DefaultCloseErrorCodeClient; + await using QuicStream stream = await connection.AcceptInboundStreamAsync(); + await stream.WriteAsync(new byte[payloadSize]); + if (closeServer) + { + await sem.WaitAsync(); + expectedError = QuicError.OperationAborted; + expectedErrorCode = DefaultCloseErrorCodeServer; + if (useDispose) + { + await connection.DisposeAsync(); + } + else + { + await connection.CloseAsync(DefaultCloseErrorCodeServer); + } + } + else + { + sem.Release(); + } + + var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.WritesClosed); + if (expectedError == QuicError.OperationAborted) + { + Assert.Null(ex.ApplicationErrorCode); + } + else + { + Assert.Equal(expectedErrorCode, ex.ApplicationErrorCode); + } + }, + clientFunction: async connection => + { + QuicError expectedError = QuicError.ConnectionAborted; + long expectedErrorCode = DefaultCloseErrorCodeServer; + await using QuicStream stream = await connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional); + await stream.WriteAsync(new byte[payloadSize]); + if (!closeServer) + { + await sem.WaitAsync(); + expectedError = QuicError.OperationAborted; + expectedErrorCode = DefaultCloseErrorCodeClient; + if (useDispose) + { + await connection.DisposeAsync(); + } + else + { + await connection.CloseAsync(DefaultCloseErrorCodeClient); + } + } + else + { + sem.Release(); + } + + var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.WritesClosed); + if (expectedError == QuicError.OperationAborted) + { + Assert.Null(ex.ApplicationErrorCode); + } + else + { + Assert.Equal(expectedErrorCode, ex.ApplicationErrorCode); + } + } + ); + } } } From aeb74958e1113b4ec7048b334cb9a731b3e529f5 Mon Sep 17 00:00:00 2001 From: ManickaP Date: Tue, 8 Aug 2023 10:34:38 +0200 Subject: [PATCH 06/21] Final task keep alive, abort order, timeout for graceful write-side shutdown in dispose, named constants --- .../Net/Quic/Internal/MsQuicExtensions.cs | 2 + .../Internal/ResettableValueTaskSource.cs | 26 +- .../src/System/Net/Quic/QuicDefaults.cs | 7 +- .../src/System/Net/Quic/QuicListener.cs | 4 +- .../src/System/Net/Quic/QuicStream.cs | 107 ++++----- .../FunctionalTests/QuicListenerTests.cs | 6 +- .../tests/FunctionalTests/QuicStreamTests.cs | 223 ++++++++++++++++-- 7 files changed, 285 insertions(+), 90 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicExtensions.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicExtensions.cs index fbade293cfda3a..4ca4ad6e7c6c58 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicExtensions.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicExtensions.cs @@ -66,6 +66,8 @@ public override string ToString() => $"{{ {nameof(SEND_SHUTDOWN_COMPLETE.Graceful)} = {SEND_SHUTDOWN_COMPLETE.Graceful} }}", QUIC_STREAM_EVENT_TYPE.SHUTDOWN_COMPLETE => $"{{ {nameof(SHUTDOWN_COMPLETE.ConnectionShutdown)} = {SHUTDOWN_COMPLETE.ConnectionShutdown}, {nameof(SHUTDOWN_COMPLETE.ConnectionShutdownByApp)} = {SHUTDOWN_COMPLETE.ConnectionShutdownByApp}, {nameof(SHUTDOWN_COMPLETE.ConnectionClosedRemotely)} = {SHUTDOWN_COMPLETE.ConnectionClosedRemotely}, {nameof(SHUTDOWN_COMPLETE.ConnectionErrorCode)} = {SHUTDOWN_COMPLETE.ConnectionErrorCode}, {nameof(SHUTDOWN_COMPLETE.ConnectionCloseStatus)} = {SHUTDOWN_COMPLETE.ConnectionCloseStatus} }}", + QUIC_STREAM_EVENT_TYPE.IDEAL_SEND_BUFFER_SIZE + => $"{{ {nameof(IDEAL_SEND_BUFFER_SIZE.ByteCount)} = {IDEAL_SEND_BUFFER_SIZE.ByteCount} }}", _ => string.Empty }; } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs index c359b853ab33ed..5100d47e3e86ba 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs @@ -30,6 +30,7 @@ private enum State private CancellationTokenRegistration _cancellationRegistration; private Action? _cancellationAction; private GCHandle _keepAlive; + private bool _finalContinuationRegistered; private readonly TaskCompletionSource _finalTaskSource; @@ -39,8 +40,7 @@ public ResettableValueTaskSource(bool runContinuationsAsynchronously = true) _valueTaskSource = new ManualResetValueTaskSourceCore() { RunContinuationsAsynchronously = runContinuationsAsynchronously }; _cancellationRegistration = default; _keepAlive = default; - - // TODO: defer instantiation only after Task is retrieved + _finalContinuationRegistered = false; _finalTaskSource = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None); } @@ -134,7 +134,27 @@ public bool TryGetValueTask(out ValueTask valueTask, object? keepAlive = null, C /// Gets a that will transition to a completed state with the last transition of this source, i.e. into . /// /// The that will transition to a completed state with the last transition of this source. - public Task GetFinalTask() => _finalTaskSource.Task; + public Task GetFinalTask(object? keepAlive) + { + if (_finalContinuationRegistered || keepAlive is null) + { + return _finalTaskSource.Task; + } + + lock (this) + { + if (!_finalContinuationRegistered) + { + GCHandle handle = GCHandle.Alloc(keepAlive); + _finalTaskSource.Task.ContinueWith(static (_, state) => + { + ((GCHandle)state!).Free(); + }, handle, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); + _finalContinuationRegistered = true; + } + return _finalTaskSource.Task; + } + } private bool TryComplete(Exception? exception, bool final) { diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicDefaults.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicDefaults.cs index 4715effc5dbaf2..8ed45e15c5ac51 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicDefaults.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicDefaults.cs @@ -36,5 +36,10 @@ internal static partial class QuicDefaults /// /// Our own imposed timeout in the handshake process, since in certain cases MsQuic will not impose theirs, see . /// - public static readonly TimeSpan HandshakeTimeout = TimeSpan.FromSeconds(10); + public static readonly TimeSpan ConnectionHandshakeTimeout = TimeSpan.FromSeconds(10); + + /// + /// Our own imposed timeout on the stream disposal, since the peer's behavior - lack of consuming data - might block the graceful shutdown of the writing side. + /// + public static readonly TimeSpan StreamDisposeTimeout = TimeSpan.FromSeconds(2.5); } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs index 30dd019274d183..134fd3f819d4c5 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs @@ -214,7 +214,7 @@ private async void StartConnectionHandshake(QuicConnection connection, SslClient try { using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_disposeCts.Token); - linkedCts.CancelAfter(QuicDefaults.HandshakeTimeout); + linkedCts.CancelAfter(QuicDefaults.ConnectionHandshakeTimeout); cancellationToken = linkedCts.Token; wrapException = true; QuicServerConnectionOptions options = await _connectionOptionsCallback(connection, clientHello, cancellationToken).ConfigureAwait(false); @@ -251,7 +251,7 @@ private async void StartConnectionHandshake(QuicConnection connection, SslClient NetEventSource.Error(connection, $"{connection} Connection handshake timed out: {oce}"); } - Exception ex = ExceptionDispatchInfo.SetCurrentStackTrace(new QuicException(QuicError.ConnectionTimeout, null, SR.Format(SR.net_quic_handshake_timeout, QuicDefaults.HandshakeTimeout), oce)); + Exception ex = ExceptionDispatchInfo.SetCurrentStackTrace(new QuicException(QuicError.ConnectionTimeout, null, SR.Format(SR.net_quic_handshake_timeout, QuicDefaults.ConnectionHandshakeTimeout), oce)); await connection.DisposeAsync().ConfigureAwait(false); if (!_acceptQueue.Writer.TryWrite(ex)) { diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs index dde2930ea8ae1f..182ce7aeb6ffda 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs @@ -134,19 +134,7 @@ public sealed partial class QuicStream /// or when for is called, /// or when the peer called for . /// - public Task ReadsClosed - { - get - { - GCHandle keepAlive = GCHandle.Alloc(this); - Task finalTask = _receiveTcs.GetFinalTask(); - finalTask.ContinueWith(static (_, state) => - { - ((GCHandle)state!).Free(); - }, keepAlive, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); - return finalTask; - } - } + public Task ReadsClosed => _receiveTcs.GetFinalTask(this); /// /// A that will get completed once writing side has been closed. @@ -155,19 +143,7 @@ public Task ReadsClosed /// or when for is called, /// or when the peer called for . /// - public Task WritesClosed - { - get - { - GCHandle keepAlive = GCHandle.Alloc(this); - Task finalTask = _sendTcs.GetFinalTask(); - finalTask.ContinueWith(static (_, state) => - { - ((GCHandle)state!).Free(); - }, keepAlive, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); - return finalTask; - } - } + public Task WritesClosed => _sendTcs.GetFinalTask(this); /// public override string ToString() => _handle.ToString(); @@ -446,19 +422,13 @@ public void Abort(QuicAbortDirection abortDirection, long errorCode) } QUIC_STREAM_SHUTDOWN_FLAGS flags = QUIC_STREAM_SHUTDOWN_FLAGS.NONE; - if (abortDirection.HasFlag(QuicAbortDirection.Read)) + if (abortDirection.HasFlag(QuicAbortDirection.Read) && !_receiveTcs.IsCompleted) { - if (_receiveTcs.TrySetException(ThrowHelper.GetOperationAbortedException(SR.net_quic_reading_aborted), final: true)) - { - flags |= QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_RECEIVE; - } + flags |= QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_RECEIVE; } - if (abortDirection.HasFlag(QuicAbortDirection.Write)) + if (abortDirection.HasFlag(QuicAbortDirection.Write) && !_sendTcs.IsCompleted) { - if (_sendTcs.TrySetException(ThrowHelper.GetOperationAbortedException(SR.net_quic_writing_aborted), final: true)) - { - flags |= QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_SEND; - } + flags |= QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_SEND; } // Nothing to abort, the requested sides to abort are already closed. if (flags == QUIC_STREAM_SHUTDOWN_FLAGS.NONE) @@ -470,7 +440,6 @@ public void Abort(QuicAbortDirection abortDirection, long errorCode) { NetEventSource.Info(this, $"{this} Aborting {abortDirection} with {errorCode}"); } - unsafe { ThrowHelper.ThrowIfMsQuicError(MsQuicApi.Api.StreamShutdown( @@ -479,6 +448,15 @@ public void Abort(QuicAbortDirection abortDirection, long errorCode) (ulong)errorCode), "StreamShutdown failed"); } + + if (abortDirection.HasFlag(QuicAbortDirection.Read)) + { + _receiveTcs.TrySetException(ThrowHelper.GetOperationAbortedException(SR.net_quic_reading_aborted), final: true); + } + if (abortDirection.HasFlag(QuicAbortDirection.Write)) + { + _sendTcs.TrySetException(ThrowHelper.GetOperationAbortedException(SR.net_quic_writing_aborted), final: true); + } } /// @@ -492,16 +470,23 @@ public void CompleteWrites() { ObjectDisposedException.ThrowIf(_disposed == 1, this); - if (_shutdownTcs.TryInitialize(out _, this)) + // Nothing to complete, the writing side is already closed. + if (_sendTcs.IsCompleted) { - unsafe - { - ThrowHelper.ThrowIfMsQuicError(MsQuicApi.Api.StreamShutdown( - _handle, - QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, - default), - "StreamShutdown failed"); - } + return; + } + + if (NetEventSource.Log.IsEnabled()) + { + NetEventSource.Info(this, $"{this} Completing writes."); + } + unsafe + { + ThrowHelper.ThrowIfMsQuicError(MsQuicApi.Api.StreamShutdown( + _handle, + QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, + default), + "StreamShutdown failed"); } } @@ -678,33 +663,30 @@ public override async ValueTask DisposeAsync() return; } - ValueTask valueTask; - // If the stream wasn't started successfully, gracelessly abort it. if (!_startedTcs.IsCompletedSuccessfully) { // Check if the stream has been shut down and if not, shut it down. - if (_shutdownTcs.TryInitialize(out valueTask, this)) - { - StreamShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.ABORT | QUIC_STREAM_SHUTDOWN_FLAGS.IMMEDIATE, _defaultErrorCode); - } + StreamShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.ABORT | QUIC_STREAM_SHUTDOWN_FLAGS.IMMEDIATE, _defaultErrorCode); } else { - // Abort the read side of the stream if it hasn't been fully consumed. - if (_receiveTcs.TrySetException(ThrowHelper.GetOperationAbortedException(), final: true)) + // Abort the read side and complete the write side if that side hasn't been completed yet. + if (!_receiveTcs.IsCompleted) { StreamShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_RECEIVE, _defaultErrorCode); } - // Check if the stream has been shut down and if not, shut it down. - if (_shutdownTcs.TryInitialize(out valueTask, this)) + if (!_sendTcs.IsCompleted) { StreamShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, default); } } // Wait for SHUTDOWN_COMPLETE, the last event, so that all resources can be safely released. - await valueTask.ConfigureAwait(false); + if (_shutdownTcs.TryInitialize(out ValueTask valueTask, this)) + { + await valueTask.ConfigureAwait(false); + } Debug.Assert(_startedTcs.IsCompleted); // TODO: Revisit this with https://github.com/dotnet/runtime/issues/79818 and https://github.com/dotnet/runtime/issues/79911 Debug.Assert(_receiveTcs.KeepAliveReleased); @@ -727,6 +709,17 @@ unsafe void StreamShutdown(QUIC_STREAM_SHUTDOWN_FLAGS flags, long errorCode) NetEventSource.Error(this, $"{this} StreamShutdown({flags}) failed: {ThrowHelper.GetErrorMessageForStatus(status)}."); } } + else + { + if (flags.HasFlag(QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_RECEIVE) && !_receiveTcs.IsCompleted) + { + _receiveTcs.TrySetException(ThrowHelper.GetOperationAbortedException(SR.net_quic_reading_aborted), final: true); + } + if (flags.HasFlag(QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_SEND) && !_sendTcs.IsCompleted) + { + _sendTcs.TrySetException(ThrowHelper.GetOperationAbortedException(SR.net_quic_writing_aborted), final: true); + } + } } } } diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicListenerTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicListenerTests.cs index 68509bf6b5571f..450d17c882385c 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicListenerTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicListenerTests.cs @@ -146,19 +146,19 @@ public async Task AcceptConnectionAsync_SlowOptionsCallback_TimesOut(bool useCan { if (useCancellationToken) { - var oce = await Assert.ThrowsAnyAsync(() => Task.Delay(QuicDefaults.HandshakeTimeout + TimeSpan.FromSeconds(1), cancellationToken)); + var oce = await Assert.ThrowsAnyAsync(() => Task.Delay(QuicDefaults.ConnectionHandshakeTimeout + TimeSpan.FromSeconds(1), cancellationToken)); Assert.True(cancellationToken.IsCancellationRequested); Assert.Equal(cancellationToken, oce.CancellationToken); ExceptionDispatchInfo.Throw(oce); } - await Task.Delay(QuicDefaults.HandshakeTimeout + TimeSpan.FromSeconds(1)); + await Task.Delay(QuicDefaults.ConnectionHandshakeTimeout + TimeSpan.FromSeconds(1)); return CreateQuicServerOptions(); }; await using QuicListener listener = await CreateQuicListener(listenerOptions); ValueTask connectTask = CreateQuicConnection(listener.LocalEndPoint); Exception exception = await AssertThrowsQuicExceptionAsync(QuicError.ConnectionTimeout, async () => await listener.AcceptConnectionAsync()); - Assert.Equal(SR.Format(SR.net_quic_handshake_timeout, QuicDefaults.HandshakeTimeout), exception.Message); + Assert.Equal(SR.Format(SR.net_quic_handshake_timeout, QuicDefaults.ConnectionHandshakeTimeout), exception.Message); // Connect attempt should be stopped with "UserCanceled". var connectException = await Assert.ThrowsAsync(async () => await connectTask); diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs index d0e0c522f42f46..05c53dff50a9ed 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs @@ -1209,19 +1209,23 @@ async ValueTask ReleaseOnReadsClosedAsync() ); } + private const int SmallestPayload = 1; + private const int SmallPayload = 1024; + private const int BigPayload = 1024*1024*1024; + [Theory] - [InlineData(1, true, true)] - [InlineData(1024, true, true)] - [InlineData(1024*1024*1024, true, true)] - [InlineData(1, false, true)] - [InlineData(1024, false, true)] - [InlineData(1024*1024*1024, false, true)] - [InlineData(1, true, false)] - [InlineData(1024, true, false)] - [InlineData(1024*1024*1024, true, false)] - [InlineData(1, false, false)] - [InlineData(1024, false, false)] - [InlineData(1024*1024*1024, false, false)] + [InlineData(SmallestPayload, true, true)] + [InlineData(SmallPayload, true, true)] + [InlineData(BigPayload, true, true)] + [InlineData(SmallestPayload, false, true)] + [InlineData(SmallPayload, false, true)] + [InlineData(BigPayload, false, true)] + [InlineData(SmallestPayload, true, false)] + [InlineData(SmallPayload, true, false)] + [InlineData(BigPayload, true, false)] + [InlineData(SmallestPayload, false, false)] + [InlineData(SmallPayload, false, false)] + [InlineData(BigPayload, false, false)] public async Task ReadsClosedFinishes_ConnectionClose(int payloadSize, bool closeServer, bool useDispose) { using SemaphoreSlim sem = new SemaphoreSlim(0); @@ -1300,18 +1304,18 @@ await RunClientServer( } [Theory] - [InlineData(1, true, true)] - [InlineData(1024, true, true)] - [InlineData(1024*1024*1024, true, true)] - [InlineData(1, false, true)] - [InlineData(1024, false, true)] - [InlineData(1024*1024*1024, false, true)] - [InlineData(1, true, false)] - [InlineData(1024, true, false)] - [InlineData(1024*1024*1024, true, false)] - [InlineData(1, false, false)] - [InlineData(1024, false, false)] - [InlineData(1024*1024*1024, false, false)] + [InlineData(SmallestPayload, true, true)] + [InlineData(SmallPayload, true, true)] + [InlineData(BigPayload, true, true)] + [InlineData(SmallestPayload, false, true)] + [InlineData(SmallPayload, false, true)] + [InlineData(BigPayload, false, true)] + [InlineData(SmallestPayload, true, false)] + [InlineData(SmallPayload, true, false)] + [InlineData(BigPayload, true, false)] + [InlineData(SmallestPayload, false, false)] + [InlineData(SmallPayload, false, false)] + [InlineData(BigPayload, false, false)] public async Task WritesClosedFinishes_ConnectionClose(int payloadSize, bool closeServer, bool useDispose) { using SemaphoreSlim sem = new SemaphoreSlim(0); @@ -1388,5 +1392,176 @@ await RunClientServer( } ); } + + [Theory] + [InlineData(SmallestPayload, true, true)] + [InlineData(SmallPayload, true, true)] + [InlineData(SmallestPayload, false, true)] + [InlineData(SmallPayload, false, true)] + [InlineData(SmallestPayload, true, false)] + [InlineData(SmallPayload, true, false)] + [InlineData(SmallestPayload, false, false)] + [InlineData(SmallPayload, false, false)] + public async Task ReadsWritesClosedFinish_StreamDisposed(int payloadSize, bool disposeServer, bool completeWrites) + { + using SemaphoreSlim serverSem = new SemaphoreSlim(0); + using SemaphoreSlim clientSem = new SemaphoreSlim(0); + + await RunClientServer( + serverFunction: async connection => + { + // Establish stream, send the payload based on the input and synchronize with the peer. + await using QuicStream stream = await connection.AcceptInboundStreamAsync(); + await stream.WriteAsync(new byte[payloadSize], completeWrites); + serverSem.Release(); + await clientSem.WaitAsync(); + + if (disposeServer) + { + await DisposeSide(stream, serverSem); + } + else + { + await WaitingSide(stream, clientSem, DefaultStreamErrorCodeClient); + } + }, + clientFunction: async connection => + { + // Establish stream, send the payload based on the input and synchronize with the peer. + await using QuicStream stream = await connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional); + await stream.WriteAsync(new byte[payloadSize], completeWrites); + clientSem.Release(); + await serverSem.WaitAsync(); + + if (disposeServer) + { + await WaitingSide(stream, serverSem, DefaultStreamErrorCodeServer); + } + else + { + await DisposeSide(stream, clientSem); + } + }); + + async ValueTask DisposeSide(QuicStream stream, SemaphoreSlim sem) + { + await stream.DisposeAsync(); + + // Reads should be aborted as we didn't consume the data. + var readEx = await AssertThrowsQuicExceptionAsync(QuicError.OperationAborted, () => stream.ReadsClosed); + Assert.Null(readEx.ApplicationErrorCode); + + // Writes should be completed successfully as they should all fit in the peers buffers. + Assert.True(stream.WritesClosed.IsCompletedSuccessfully); + + sem.Release(); + } + async ValueTask WaitingSide(QuicStream stream, SemaphoreSlim sem, long errorCode) + { + await sem.WaitAsync(); + + // Reads should be still open as the peer closed gracefully and we are keeping the data in buffer. + Assert.False(stream.ReadsClosed.IsCompleted); + + if (!completeWrites) + { + // Writes must be aborted by the peer as we didn't complete them. + var writeEx = await AssertThrowsQuicExceptionAsync(QuicError.StreamAborted, () => stream.WritesClosed); + Assert.Equal(errorCode, writeEx.ApplicationErrorCode); + } + else + { + // Writes must be closed, but whether successfully or not depends on the timing. + // Peer might have aborted reading side before receiving all the data. + Assert.True(stream.WritesClosed.IsCompleted); + } + } + } + + [Theory] + [InlineData(true, true)] + [InlineData(false, true)] + [InlineData(true, false)] + [InlineData(false, false)] + public async Task ReadsWritesClosedFinish_BigData_StreamDisposed(bool disposeServer, bool completeWrites) + { + using SemaphoreSlim serverSem = new SemaphoreSlim(0); + using SemaphoreSlim clientSem = new SemaphoreSlim(0); + + await RunClientServer( + serverFunction: async connection => + { + // Establish stream, send the payload based on the input and synchronize with the peer. + await using QuicStream stream = await connection.AcceptInboundStreamAsync(); + await stream.WriteAsync(new byte[BigPayload], completeWrites); + serverSem.Release(); + await clientSem.WaitAsync(); + + if (disposeServer) + { + await DisposeSide(stream, serverSem, DefaultStreamErrorCodeServer); + } + else + { + await WaitingSide(stream, clientSem, DefaultStreamErrorCodeClient); + } + }, + clientFunction: async connection => + { + // Establish stream, send the payload based on the input and synchronize with the peer. + await using QuicStream stream = await connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional); + await stream.WriteAsync(new byte[BigPayload], completeWrites); + clientSem.Release(); + await serverSem.WaitAsync(); + + if (disposeServer) + { + await WaitingSide(stream, serverSem, DefaultStreamErrorCodeServer); + } + else + { + await DisposeSide(stream, clientSem, DefaultStreamErrorCodeClient); + } + }); + + async ValueTask DisposeSide(QuicStream stream, SemaphoreSlim sem, long errorCode) + { + // With big payloads, graceful closure of the writing side won't shutdown the stream until the peer either consumes the data or aborts the reading side. + stream.Abort(QuicAbortDirection.Write, errorCode); + + await stream.DisposeAsync(); + + // Reads should be aborted as we didn't consume the data. + var readEx = await AssertThrowsQuicExceptionAsync(QuicError.OperationAborted, () => stream.ReadsClosed); + Assert.Null(readEx.ApplicationErrorCode); + + // Writes should be aborted as we aborted them. + var writeEx = await AssertThrowsQuicExceptionAsync(QuicError.OperationAborted, () => stream.WritesClosed); + Assert.Null(writeEx.ApplicationErrorCode); + + sem.Release(); + } + async ValueTask WaitingSide(QuicStream stream, SemaphoreSlim sem, long errorCode) + { + await sem.WaitAsync(); + + // Reads will be aborted by the peer as we didn't consume them all. + var readEx = await AssertThrowsQuicExceptionAsync(QuicError.StreamAborted, () => stream.ReadsClosed); + Assert.Equal(errorCode, readEx.ApplicationErrorCode); + + if (!completeWrites) + { + // Writes must be aborted by the peer as we didn't complete them. + var writeEx = await AssertThrowsQuicExceptionAsync(QuicError.StreamAborted, () => stream.WritesClosed); + Assert.Equal(errorCode, writeEx.ApplicationErrorCode); + } + else + { + // Writes must be closed, but whether successfully or not depends on the timing. + // Peer might have aborted reading side before receiving all the data. + Assert.True(stream.WritesClosed.IsCompleted); + } + } + } } } From 9d9334f4553f7f15829883cd6cee962539fd21de Mon Sep 17 00:00:00 2001 From: ManickaP Date: Wed, 9 Aug 2023 15:57:38 +0200 Subject: [PATCH 07/21] Tests --- .../tests/FunctionalTests/QuicStreamTests.cs | 221 ++++++------------ 1 file changed, 77 insertions(+), 144 deletions(-) diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs index 05c53dff50a9ed..f597416d702e78 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs @@ -1211,34 +1211,41 @@ async ValueTask ReleaseOnReadsClosedAsync() private const int SmallestPayload = 1; private const int SmallPayload = 1024; + private const int BufferPayload = 64*1024; + private const int BufferPlusPayload = 64*1024+1; private const int BigPayload = 1024*1024*1024; + public static IEnumerable PayloadSizeAndTwoBools() + { + var boolValues = new [] { true, false }; + var payloadValues = new [] { SmallestPayload, SmallPayload, BufferPayload, BufferPlusPayload, BigPayload }; + return + from payload in payloadValues + from bool1 in boolValues + from bool2 in boolValues + select new object[] { payload, bool1, bool2 }; + } + [Theory] - [InlineData(SmallestPayload, true, true)] - [InlineData(SmallPayload, true, true)] - [InlineData(BigPayload, true, true)] - [InlineData(SmallestPayload, false, true)] - [InlineData(SmallPayload, false, true)] - [InlineData(BigPayload, false, true)] - [InlineData(SmallestPayload, true, false)] - [InlineData(SmallPayload, true, false)] - [InlineData(BigPayload, true, false)] - [InlineData(SmallestPayload, false, false)] - [InlineData(SmallPayload, false, false)] - [InlineData(BigPayload, false, false)] + [MemberData(nameof(PayloadSizeAndTwoBools))] public async Task ReadsClosedFinishes_ConnectionClose(int payloadSize, bool closeServer, bool useDispose) { - using SemaphoreSlim sem = new SemaphoreSlim(0); + using SemaphoreSlim serverSem = new SemaphoreSlim(0); + using SemaphoreSlim clientSem = new SemaphoreSlim(0); + await RunClientServer( serverFunction: async connection => { QuicError expectedError = QuicError.ConnectionAborted; long expectedErrorCode = DefaultCloseErrorCodeClient; + await using QuicStream stream = await connection.AcceptInboundStreamAsync(); await stream.WriteAsync(new byte[payloadSize], completeWrites: true); + serverSem.Release(); + await clientSem.WaitAsync(); + if (closeServer) { - await sem.WaitAsync(); expectedError = QuicError.OperationAborted; expectedErrorCode = DefaultCloseErrorCodeServer; if (useDispose) @@ -1250,10 +1257,6 @@ await RunClientServer( await connection.CloseAsync(DefaultCloseErrorCodeServer); } } - else - { - sem.Release(); - } var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.ReadsClosed); if (expectedError == QuicError.OperationAborted) @@ -1269,11 +1272,14 @@ await RunClientServer( { QuicError expectedError = QuicError.ConnectionAborted; long expectedErrorCode = DefaultCloseErrorCodeServer; + await using QuicStream stream = await connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional); await stream.WriteAsync(new byte[payloadSize], completeWrites: true); + clientSem.Release(); + await serverSem.WaitAsync(); + if (!closeServer) { - await sem.WaitAsync(); expectedError = QuicError.OperationAborted; expectedErrorCode = DefaultCloseErrorCodeClient; if (useDispose) @@ -1285,10 +1291,6 @@ await RunClientServer( await connection.CloseAsync(DefaultCloseErrorCodeClient); } } - else - { - sem.Release(); - } var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.ReadsClosed); if (expectedError == QuicError.OperationAborted) @@ -1304,31 +1306,25 @@ await RunClientServer( } [Theory] - [InlineData(SmallestPayload, true, true)] - [InlineData(SmallPayload, true, true)] - [InlineData(BigPayload, true, true)] - [InlineData(SmallestPayload, false, true)] - [InlineData(SmallPayload, false, true)] - [InlineData(BigPayload, false, true)] - [InlineData(SmallestPayload, true, false)] - [InlineData(SmallPayload, true, false)] - [InlineData(BigPayload, true, false)] - [InlineData(SmallestPayload, false, false)] - [InlineData(SmallPayload, false, false)] - [InlineData(BigPayload, false, false)] + [MemberData(nameof(PayloadSizeAndTwoBools))] public async Task WritesClosedFinishes_ConnectionClose(int payloadSize, bool closeServer, bool useDispose) { - using SemaphoreSlim sem = new SemaphoreSlim(0); + using SemaphoreSlim serverSem = new SemaphoreSlim(0); + using SemaphoreSlim clientSem = new SemaphoreSlim(0); + await RunClientServer( serverFunction: async connection => { QuicError expectedError = QuicError.ConnectionAborted; long expectedErrorCode = DefaultCloseErrorCodeClient; + await using QuicStream stream = await connection.AcceptInboundStreamAsync(); await stream.WriteAsync(new byte[payloadSize]); + serverSem.Release(); + await clientSem.WaitAsync(); + if (closeServer) { - await sem.WaitAsync(); expectedError = QuicError.OperationAborted; expectedErrorCode = DefaultCloseErrorCodeServer; if (useDispose) @@ -1340,10 +1336,6 @@ await RunClientServer( await connection.CloseAsync(DefaultCloseErrorCodeServer); } } - else - { - sem.Release(); - } var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.WritesClosed); if (expectedError == QuicError.OperationAborted) @@ -1359,11 +1351,14 @@ await RunClientServer( { QuicError expectedError = QuicError.ConnectionAborted; long expectedErrorCode = DefaultCloseErrorCodeServer; + await using QuicStream stream = await connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional); await stream.WriteAsync(new byte[payloadSize]); + clientSem.Release(); + await serverSem.WaitAsync(); + if (!closeServer) { - await sem.WaitAsync(); expectedError = QuicError.OperationAborted; expectedErrorCode = DefaultCloseErrorCodeClient; if (useDispose) @@ -1375,10 +1370,6 @@ await RunClientServer( await connection.CloseAsync(DefaultCloseErrorCodeClient); } } - else - { - sem.Release(); - } var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.WritesClosed); if (expectedError == QuicError.OperationAborted) @@ -1394,18 +1385,12 @@ await RunClientServer( } [Theory] - [InlineData(SmallestPayload, true, true)] - [InlineData(SmallPayload, true, true)] - [InlineData(SmallestPayload, false, true)] - [InlineData(SmallPayload, false, true)] - [InlineData(SmallestPayload, true, false)] - [InlineData(SmallPayload, true, false)] - [InlineData(SmallestPayload, false, false)] - [InlineData(SmallPayload, false, false)] + [MemberData(nameof(PayloadSizeAndTwoBools))] public async Task ReadsWritesClosedFinish_StreamDisposed(int payloadSize, bool disposeServer, bool completeWrites) { using SemaphoreSlim serverSem = new SemaphoreSlim(0); using SemaphoreSlim clientSem = new SemaphoreSlim(0); + TaskCompletionSource tcs = new TaskCompletionSource(); await RunClientServer( serverFunction: async connection => @@ -1418,11 +1403,11 @@ await RunClientServer( if (disposeServer) { - await DisposeSide(stream, serverSem); + await DisposeSide(stream, tcs); } else { - await WaitingSide(stream, clientSem, DefaultStreamErrorCodeClient); + await WaitingSide(stream, tcs.Task, DefaultStreamErrorCodeClient); } }, clientFunction: async connection => @@ -1435,99 +1420,31 @@ await RunClientServer( if (disposeServer) { - await WaitingSide(stream, serverSem, DefaultStreamErrorCodeServer); + await WaitingSide(stream, tcs.Task, DefaultStreamErrorCodeServer); } else { - await DisposeSide(stream, clientSem); + await DisposeSide(stream, tcs); } }); - async ValueTask DisposeSide(QuicStream stream, SemaphoreSlim sem) + async ValueTask DisposeSide(QuicStream stream, TaskCompletionSource tcs) { - await stream.DisposeAsync(); - - // Reads should be aborted as we didn't consume the data. - var readEx = await AssertThrowsQuicExceptionAsync(QuicError.OperationAborted, () => stream.ReadsClosed); - Assert.Null(readEx.ApplicationErrorCode); - - // Writes should be completed successfully as they should all fit in the peers buffers. - Assert.True(stream.WritesClosed.IsCompletedSuccessfully); - - sem.Release(); - } - async ValueTask WaitingSide(QuicStream stream, SemaphoreSlim sem, long errorCode) - { - await sem.WaitAsync(); - - // Reads should be still open as the peer closed gracefully and we are keeping the data in buffer. - Assert.False(stream.ReadsClosed.IsCompleted); - - if (!completeWrites) - { - // Writes must be aborted by the peer as we didn't complete them. - var writeEx = await AssertThrowsQuicExceptionAsync(QuicError.StreamAborted, () => stream.WritesClosed); - Assert.Equal(errorCode, writeEx.ApplicationErrorCode); - } - else - { - // Writes must be closed, but whether successfully or not depends on the timing. - // Peer might have aborted reading side before receiving all the data. - Assert.True(stream.WritesClosed.IsCompleted); - } - } - } - - [Theory] - [InlineData(true, true)] - [InlineData(false, true)] - [InlineData(true, false)] - [InlineData(false, false)] - public async Task ReadsWritesClosedFinish_BigData_StreamDisposed(bool disposeServer, bool completeWrites) - { - using SemaphoreSlim serverSem = new SemaphoreSlim(0); - using SemaphoreSlim clientSem = new SemaphoreSlim(0); - - await RunClientServer( - serverFunction: async connection => + // Abort writing side if it's getting blocked by peer not consuming the data. + long? abortCode = null; + if (completeWrites || payloadSize >= BigPayload) { - // Establish stream, send the payload based on the input and synchronize with the peer. - await using QuicStream stream = await connection.AcceptInboundStreamAsync(); - await stream.WriteAsync(new byte[BigPayload], completeWrites); - serverSem.Release(); - await clientSem.WaitAsync(); - - if (disposeServer) + try { - await DisposeSide(stream, serverSem, DefaultStreamErrorCodeServer); + await stream.WritesClosed.WaitAsync(TimeSpan.FromSeconds(2.5)); } - else + catch (TimeoutException) { - await WaitingSide(stream, clientSem, DefaultStreamErrorCodeClient); + Assert.True(payloadSize >= BigPayload); + abortCode = 0xABC; + stream.Abort(QuicAbortDirection.Write, abortCode.Value); } - }, - clientFunction: async connection => - { - // Establish stream, send the payload based on the input and synchronize with the peer. - await using QuicStream stream = await connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional); - await stream.WriteAsync(new byte[BigPayload], completeWrites); - clientSem.Release(); - await serverSem.WaitAsync(); - - if (disposeServer) - { - await WaitingSide(stream, serverSem, DefaultStreamErrorCodeServer); - } - else - { - await DisposeSide(stream, clientSem, DefaultStreamErrorCodeClient); - } - }); - - async ValueTask DisposeSide(QuicStream stream, SemaphoreSlim sem, long errorCode) - { - // With big payloads, graceful closure of the writing side won't shutdown the stream until the peer either consumes the data or aborts the reading side. - stream.Abort(QuicAbortDirection.Write, errorCode); + } await stream.DisposeAsync(); @@ -1536,18 +1453,34 @@ async ValueTask DisposeSide(QuicStream stream, SemaphoreSlim sem, long errorCode Assert.Null(readEx.ApplicationErrorCode); // Writes should be aborted as we aborted them. - var writeEx = await AssertThrowsQuicExceptionAsync(QuicError.OperationAborted, () => stream.WritesClosed); - Assert.Null(writeEx.ApplicationErrorCode); + if (abortCode.HasValue) + { + var writeEx = await AssertThrowsQuicExceptionAsync(QuicError.OperationAborted, () => stream.WritesClosed); + Assert.Null(writeEx.ApplicationErrorCode); + } + else + { + // Writes should be completed successfully as they should all fit in the peers buffers. + Assert.True(stream.WritesClosed.IsCompletedSuccessfully); + } - sem.Release(); + tcs.SetResult(abortCode); } - async ValueTask WaitingSide(QuicStream stream, SemaphoreSlim sem, long errorCode) + async ValueTask WaitingSide(QuicStream stream, Task task, long errorCode) { - await sem.WaitAsync(); + long? abortCode = await task; // Reads will be aborted by the peer as we didn't consume them all. - var readEx = await AssertThrowsQuicExceptionAsync(QuicError.StreamAborted, () => stream.ReadsClosed); - Assert.Equal(errorCode, readEx.ApplicationErrorCode); + if (abortCode.HasValue) + { + var readEx = await AssertThrowsQuicExceptionAsync(QuicError.StreamAborted, () => stream.ReadsClosed); + Assert.Equal(abortCode.Value, readEx.ApplicationErrorCode); + } + // Reads should be still open as the peer closed gracefully and we are keeping the data in buffer. + else + { + Assert.False(stream.ReadsClosed.IsCompleted); + } if (!completeWrites) { From 80185f0248c003db74d197f40f616f85fd859979 Mon Sep 17 00:00:00 2001 From: ManickaP Date: Wed, 9 Aug 2023 17:51:57 +0200 Subject: [PATCH 08/21] Always wait for SEND_COMPLETE --- .../src/System/Net/Quic/QuicStream.cs | 87 +++++++++++++------ 1 file changed, 60 insertions(+), 27 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs index 182ce7aeb6ffda..f8bc4891f7789b 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs @@ -362,47 +362,80 @@ public async ValueTask WriteAsync(ReadOnlyMemory buffer, bool completeWrit } // Concurrent call, this one lost the race. - if (!_sendTcs.TryGetValueTask(out ValueTask valueTask, this, cancellationToken)) + if (!_sendTcs.TryGetValueTask(out ValueTask valueTask, this, default)) { throw new InvalidOperationException(SR.Format(SR.net_io_invalidnestedcall, "write")); } - // No need to call anything since we already have a result, most likely an exception. - if (valueTask.IsCompleted) + // Register cancellation if the token can be cancelled and the task is not completed yet. + CancellationTokenRegistration cancellationRegistration = default; + bool aborted = false; + try { - await valueTask.ConfigureAwait(false); - return; - } + if (cancellationToken.CanBeCanceled) + { + cancellationRegistration = cancellationToken.UnsafeRegister((obj, cancellationToken) => + { + try + { + QuicStream stream = (QuicStream)obj!; + stream.Abort(QuicAbortDirection.Write, stream._defaultErrorCode); + aborted = true; + } + catch (ObjectDisposedException) + { + // We collided with a Dispose in another thread. This can happen + // when using CancellationTokenSource.CancelAfter. + // Ignore the exception + } + }, this); + } - // For an empty buffer complete immediately, close the writing side of the stream if necessary. - if (buffer.IsEmpty) - { - _sendTcs.TrySetResult(); - if (completeWrites) + // No need to call anything since we already have a result, most likely an exception. + if (valueTask.IsCompleted) + { + await valueTask.ConfigureAwait(false); + return; + } + + // For an empty buffer complete immediately, close the writing side of the stream if necessary. + if (buffer.IsEmpty) + { + _sendTcs.TrySetResult(); + if (completeWrites) + { + CompleteWrites(); + } + await valueTask.ConfigureAwait(false); + return; + } + + unsafe { - CompleteWrites(); + _sendBuffers.Initialize(buffer); + int status = MsQuicApi.Api.StreamSend( + _handle, + _sendBuffers.Buffers, + (uint)_sendBuffers.Count, + completeWrites ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE, + null); + if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? exception)) + { + _sendBuffers.Reset(); + _sendTcs.TrySetException(exception, final: true); + } } + await valueTask.ConfigureAwait(false); - return; } - - unsafe + finally { - _sendBuffers.Initialize(buffer); - int status = MsQuicApi.Api.StreamSend( - _handle, - _sendBuffers.Buffers, - (uint)_sendBuffers.Count, - completeWrites ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE, - null); - if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? exception)) + cancellationRegistration.Dispose(); + if (aborted) { - _sendBuffers.Reset(); - _sendTcs.TrySetException(exception, final: true); + cancellationToken.ThrowIfCancellationRequested(); } } - - await valueTask.ConfigureAwait(false); } /// From 33e18ed58567d771308c891fb79214efe5f6bd96 Mon Sep 17 00:00:00 2001 From: ManickaP Date: Thu, 10 Aug 2023 09:57:42 +0200 Subject: [PATCH 09/21] Exclude BigPayload on platforms where it can OOM --- .../Common/tests/TestUtilities/System/PlatformDetection.cs | 1 + .../System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/libraries/Common/tests/TestUtilities/System/PlatformDetection.cs b/src/libraries/Common/tests/TestUtilities/System/PlatformDetection.cs index d86f78702e5f3d..b2eedae28a5ce9 100644 --- a/src/libraries/Common/tests/TestUtilities/System/PlatformDetection.cs +++ b/src/libraries/Common/tests/TestUtilities/System/PlatformDetection.cs @@ -195,6 +195,7 @@ private static bool GetLinqExpressionsBuiltWithIsInterpretingOnly() public static bool IsRareEnumsSupported => !IsNativeAot; + public static bool IsIntMaxValueArrayIndexSupported => !s_largeArrayIsNotSupported.Value; public static bool IsNotIntMaxValueArrayIndexSupported => s_largeArrayIsNotSupported.Value; public static bool IsAssemblyLoadingSupported => !IsNativeAot; diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs index f597416d702e78..adc6b6e0f4b0ed 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs @@ -1218,7 +1218,9 @@ async ValueTask ReleaseOnReadsClosedAsync() public static IEnumerable PayloadSizeAndTwoBools() { var boolValues = new [] { true, false }; - var payloadValues = new [] { SmallestPayload, SmallPayload, BufferPayload, BufferPlusPayload, BigPayload }; + var payloadValues = PlatformDetection.IsIntMaxValueArrayIndexSupported ? + new [] { SmallestPayload, SmallPayload, BufferPayload, BufferPlusPayload, BigPayload } : + new [] { SmallestPayload, SmallPayload, BufferPayload, BufferPlusPayload }; return from payload in payloadValues from bool1 in boolValues From f6da31ed3b1b35dffc5dc806bee1a2edfc5746b8 Mon Sep 17 00:00:00 2001 From: ManickaP Date: Fri, 11 Aug 2023 10:07:34 +0200 Subject: [PATCH 10/21] Removed unintended code changes --- .../System/Net/Quic/Internal/MsQuicApi.NativeMethods.cs | 4 ---- .../System.Net.Quic/src/System/Net/Quic/QuicDefaults.cs | 7 +------ .../System.Net.Quic/src/System/Net/Quic/QuicListener.cs | 4 ++-- .../tests/FunctionalTests/QuicListenerTests.cs | 6 +++--- 4 files changed, 6 insertions(+), 15 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicApi.NativeMethods.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicApi.NativeMethods.cs index d6c07ce81c0d4c..206eac76ac7878 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicApi.NativeMethods.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicApi.NativeMethods.cs @@ -314,10 +314,6 @@ public int StreamShutdown(MsQuicSafeHandle stream, QUIC_STREAM_SHUTDOWN_FLAGS fl try { stream.DangerousAddRef(ref success); - if (NetEventSource.Log.IsEnabled()) - { - NetEventSource.Info(stream, $"{stream} StreamShutdown({flags})."); - } return ApiTable->StreamShutdown(stream.QuicHandle, flags, code); } finally diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicDefaults.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicDefaults.cs index 8ed45e15c5ac51..4715effc5dbaf2 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicDefaults.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicDefaults.cs @@ -36,10 +36,5 @@ internal static partial class QuicDefaults /// /// Our own imposed timeout in the handshake process, since in certain cases MsQuic will not impose theirs, see . /// - public static readonly TimeSpan ConnectionHandshakeTimeout = TimeSpan.FromSeconds(10); - - /// - /// Our own imposed timeout on the stream disposal, since the peer's behavior - lack of consuming data - might block the graceful shutdown of the writing side. - /// - public static readonly TimeSpan StreamDisposeTimeout = TimeSpan.FromSeconds(2.5); + public static readonly TimeSpan HandshakeTimeout = TimeSpan.FromSeconds(10); } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs index 134fd3f819d4c5..30dd019274d183 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs @@ -214,7 +214,7 @@ private async void StartConnectionHandshake(QuicConnection connection, SslClient try { using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_disposeCts.Token); - linkedCts.CancelAfter(QuicDefaults.ConnectionHandshakeTimeout); + linkedCts.CancelAfter(QuicDefaults.HandshakeTimeout); cancellationToken = linkedCts.Token; wrapException = true; QuicServerConnectionOptions options = await _connectionOptionsCallback(connection, clientHello, cancellationToken).ConfigureAwait(false); @@ -251,7 +251,7 @@ private async void StartConnectionHandshake(QuicConnection connection, SslClient NetEventSource.Error(connection, $"{connection} Connection handshake timed out: {oce}"); } - Exception ex = ExceptionDispatchInfo.SetCurrentStackTrace(new QuicException(QuicError.ConnectionTimeout, null, SR.Format(SR.net_quic_handshake_timeout, QuicDefaults.ConnectionHandshakeTimeout), oce)); + Exception ex = ExceptionDispatchInfo.SetCurrentStackTrace(new QuicException(QuicError.ConnectionTimeout, null, SR.Format(SR.net_quic_handshake_timeout, QuicDefaults.HandshakeTimeout), oce)); await connection.DisposeAsync().ConfigureAwait(false); if (!_acceptQueue.Writer.TryWrite(ex)) { diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicListenerTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicListenerTests.cs index 450d17c882385c..68509bf6b5571f 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicListenerTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicListenerTests.cs @@ -146,19 +146,19 @@ public async Task AcceptConnectionAsync_SlowOptionsCallback_TimesOut(bool useCan { if (useCancellationToken) { - var oce = await Assert.ThrowsAnyAsync(() => Task.Delay(QuicDefaults.ConnectionHandshakeTimeout + TimeSpan.FromSeconds(1), cancellationToken)); + var oce = await Assert.ThrowsAnyAsync(() => Task.Delay(QuicDefaults.HandshakeTimeout + TimeSpan.FromSeconds(1), cancellationToken)); Assert.True(cancellationToken.IsCancellationRequested); Assert.Equal(cancellationToken, oce.CancellationToken); ExceptionDispatchInfo.Throw(oce); } - await Task.Delay(QuicDefaults.ConnectionHandshakeTimeout + TimeSpan.FromSeconds(1)); + await Task.Delay(QuicDefaults.HandshakeTimeout + TimeSpan.FromSeconds(1)); return CreateQuicServerOptions(); }; await using QuicListener listener = await CreateQuicListener(listenerOptions); ValueTask connectTask = CreateQuicConnection(listener.LocalEndPoint); Exception exception = await AssertThrowsQuicExceptionAsync(QuicError.ConnectionTimeout, async () => await listener.AcceptConnectionAsync()); - Assert.Equal(SR.Format(SR.net_quic_handshake_timeout, QuicDefaults.ConnectionHandshakeTimeout), exception.Message); + Assert.Equal(SR.Format(SR.net_quic_handshake_timeout, QuicDefaults.HandshakeTimeout), exception.Message); // Connect attempt should be stopped with "UserCanceled". var connectException = await Assert.ThrowsAsync(async () => await connectTask); From 79023fe633837d2a213274f91f93c7f18cc2201f Mon Sep 17 00:00:00 2001 From: ManickaP Date: Fri, 11 Aug 2023 10:52:38 +0200 Subject: [PATCH 11/21] Reverted postponing reading FIN, if data have chance to get buffered with FIN, we will do that. --- .../TestUtilities/System/PlatformDetection.cs | 1 - .../Net/Quic/Internal/ReceiveBuffers.cs | 5 +- .../src/System/Net/Quic/QuicStream.cs | 14 +--- .../tests/FunctionalTests/QuicStreamTests.cs | 74 +++++++++++-------- 4 files changed, 48 insertions(+), 46 deletions(-) diff --git a/src/libraries/Common/tests/TestUtilities/System/PlatformDetection.cs b/src/libraries/Common/tests/TestUtilities/System/PlatformDetection.cs index b2eedae28a5ce9..d86f78702e5f3d 100644 --- a/src/libraries/Common/tests/TestUtilities/System/PlatformDetection.cs +++ b/src/libraries/Common/tests/TestUtilities/System/PlatformDetection.cs @@ -195,7 +195,6 @@ private static bool GetLinqExpressionsBuiltWithIsInterpretingOnly() public static bool IsRareEnumsSupported => !IsNativeAot; - public static bool IsIntMaxValueArrayIndexSupported => !s_largeArrayIsNotSupported.Value; public static bool IsNotIntMaxValueArrayIndexSupported => s_largeArrayIsNotSupported.Value; public static bool IsAssemblyLoadingSupported => !IsNativeAot; diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ReceiveBuffers.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ReceiveBuffers.cs index 57c7aec082c81c..93f78acc87f028 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ReceiveBuffers.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ReceiveBuffers.cs @@ -12,7 +12,6 @@ internal struct ReceiveBuffers private readonly object _syncRoot; private MultiArrayBuffer _buffer; private bool _final; - private int _lastReceiveSize; public ReceiveBuffers() { @@ -48,7 +47,6 @@ public int CopyFrom(ReadOnlySpan quicBuffers, int totalLength, bool } _final = final; - _lastReceiveSize = totalLength; _buffer.EnsureAvailableSpace(totalLength); int totalCopied = 0; @@ -68,7 +66,7 @@ public int CopyFrom(ReadOnlySpan quicBuffers, int totalLength, bool } } - public int CopyTo(Memory buffer, out bool completed, out bool empty, out int lastReceiveSize) + public int CopyTo(Memory buffer, out bool completed, out bool empty) { lock (_syncRoot) { @@ -83,7 +81,6 @@ public int CopyTo(Memory buffer, out bool completed, out bool empty, out i completed = _buffer.IsEmpty && _final; empty = _buffer.IsEmpty; - lastReceiveSize = _lastReceiveSize; return copied; } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs index f8bc4891f7789b..2fa6ff26e9a0d9 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs @@ -284,7 +284,7 @@ public override async ValueTask ReadAsync(Memory buffer, Cancellation } // Copy data from the buffer, reduce target and increment total. - int copied = _receiveBuffers.CopyTo(buffer, out bool complete, out bool empty, out int lastReceiveSize); + int copied = _receiveBuffers.CopyTo(buffer, out bool complete, out bool empty); buffer = buffer.Slice(copied); totalCopied += copied; @@ -292,13 +292,6 @@ public override async ValueTask ReadAsync(Memory buffer, Cancellation if (complete) { _receiveTcs.TrySetResult(final: true); - unsafe - { - // Confirm the last data which came with the FIN flag. - MsQuicApi.Api.StreamReceiveComplete( - _handle, - (ulong)lastReceiveSize); - } } // Unblock the next await to end immediately, i.e. there were/are any data in the buffer. @@ -551,9 +544,6 @@ private unsafe int HandleEventReceive(ref RECEIVE_DATA data) (int)data.TotalBufferLength, data.Flags.HasFlag(QUIC_RECEIVE_FLAGS.FIN)); - // If we copied all the data and also received FIN flag, postpone the confirmation of the data until they are consumed. - bool lastReceive = (totalCopied == data.TotalBufferLength) && data.Flags.HasFlag(QUIC_RECEIVE_FLAGS.FIN); - if (totalCopied < data.TotalBufferLength) { Volatile.Write(ref _receivedNeedsEnable, 1); @@ -562,7 +552,7 @@ private unsafe int HandleEventReceive(ref RECEIVE_DATA data) _receiveTcs.TrySetResult(); data.TotalBufferLength = totalCopied; - return (_receiveBuffers.HasCapacity() && Interlocked.CompareExchange(ref _receivedNeedsEnable, 0, 1) == 1) ? QUIC_STATUS_CONTINUE : lastReceive ? QUIC_STATUS_PENDING : QUIC_STATUS_SUCCESS; + return (_receiveBuffers.HasCapacity() && Interlocked.CompareExchange(ref _receivedNeedsEnable, 0, 1) == 1) ? QUIC_STATUS_CONTINUE : QUIC_STATUS_SUCCESS; } private unsafe int HandleEventSendComplete(ref SEND_COMPLETE_DATA data) { diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs index adc6b6e0f4b0ed..dd449cf6cc98f0 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs @@ -1218,7 +1218,7 @@ async ValueTask ReleaseOnReadsClosedAsync() public static IEnumerable PayloadSizeAndTwoBools() { var boolValues = new [] { true, false }; - var payloadValues = PlatformDetection.IsIntMaxValueArrayIndexSupported ? + var payloadValues = !PlatformDetection.IsInHelix ? new [] { SmallestPayload, SmallPayload, BufferPayload, BufferPlusPayload, BigPayload } : new [] { SmallestPayload, SmallPayload, BufferPayload, BufferPlusPayload }; return @@ -1232,6 +1232,7 @@ from bool2 in boolValues [MemberData(nameof(PayloadSizeAndTwoBools))] public async Task ReadsClosedFinishes_ConnectionClose(int payloadSize, bool closeServer, bool useDispose) { + //using var logger = new TestUtilities.TestEventListener(Console.Out, "Private.InternalDiagnostics.System.Net.Quic"); using SemaphoreSlim serverSem = new SemaphoreSlim(0); using SemaphoreSlim clientSem = new SemaphoreSlim(0); @@ -1243,6 +1244,11 @@ await RunClientServer( await using QuicStream stream = await connection.AcceptInboundStreamAsync(); await stream.WriteAsync(new byte[payloadSize], completeWrites: true); + // Make sure the data gets received by the peer if we expect the reading side to get buffered including FIN. + if (payloadSize <= BufferPayload) + { + await stream.WritesClosed; + } serverSem.Release(); await clientSem.WaitAsync(); @@ -1260,15 +1266,7 @@ await RunClientServer( } } - var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.ReadsClosed); - if (expectedError == QuicError.OperationAborted) - { - Assert.Null(ex.ApplicationErrorCode); - } - else - { - Assert.Equal(expectedErrorCode, ex.ApplicationErrorCode); - } + await CheckReadsClosed(stream, expectedError, expectedErrorCode); }, clientFunction: async connection => { @@ -1277,6 +1275,10 @@ await RunClientServer( await using QuicStream stream = await connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional); await stream.WriteAsync(new byte[payloadSize], completeWrites: true); + if (payloadSize <= BufferPayload) + { + await stream.WritesClosed; + } clientSem.Release(); await serverSem.WaitAsync(); @@ -1294,6 +1296,23 @@ await RunClientServer( } } + await CheckReadsClosed(stream, expectedError, expectedErrorCode); + } + ); + + async ValueTask CheckReadsClosed(QuicStream stream, QuicError expectedError, long expectedErrorCode) + { + // All data should be buffered if they fit in the internal buffer, reading should still pass. + if (payloadSize <= BufferPayload) + { + Assert.False(stream.ReadsClosed.IsCompleted); + var buffer = new byte[BufferPayload]; + var length = await ReadAll(stream, buffer); + Assert.True(stream.ReadsClosed.IsCompletedSuccessfully); + Assert.Equal(payloadSize, length); + } + else + { var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.ReadsClosed); if (expectedError == QuicError.OperationAborted) { @@ -1304,7 +1323,7 @@ await RunClientServer( Assert.Equal(expectedErrorCode, ex.ApplicationErrorCode); } } - ); + } } [Theory] @@ -1339,15 +1358,7 @@ await RunClientServer( } } - var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.WritesClosed); - if (expectedError == QuicError.OperationAborted) - { - Assert.Null(ex.ApplicationErrorCode); - } - else - { - Assert.Equal(expectedErrorCode, ex.ApplicationErrorCode); - } + await CheckWritesClosed(stream, expectedError, expectedErrorCode); }, clientFunction: async connection => { @@ -1373,17 +1384,22 @@ await RunClientServer( } } - var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.WritesClosed); - if (expectedError == QuicError.OperationAborted) - { - Assert.Null(ex.ApplicationErrorCode); - } - else - { - Assert.Equal(expectedErrorCode, ex.ApplicationErrorCode); - } + await CheckWritesClosed(stream, expectedError, expectedErrorCode); } ); + + async ValueTask CheckWritesClosed(QuicStream stream, QuicError expectedError, long expectedErrorCode) + { + var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.WritesClosed); + if (expectedError == QuicError.OperationAborted) + { + Assert.Null(ex.ApplicationErrorCode); + } + else + { + Assert.Equal(expectedErrorCode, ex.ApplicationErrorCode); + } + } } [Theory] From 3b24271a5fbf00fb2118d6138af7fe254528f0f3 Mon Sep 17 00:00:00 2001 From: ManickaP Date: Fri, 11 Aug 2023 16:43:34 +0200 Subject: [PATCH 12/21] Clean ups --- .../src/System/Net/Quic/Internal/MsQuicConfiguration.cs | 1 - .../src/System/Net/Quic/Internal/MsQuicTlsSecret.cs | 2 -- .../src/System/Net/Quic/Internal/ThrowHelper.cs | 6 +++--- .../System.Net.Quic/src/System/Net/Quic/QuicListener.cs | 1 - 4 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicConfiguration.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicConfiguration.cs index 337884c61a5d3b..1c3b4872df1636 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicConfiguration.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicConfiguration.cs @@ -7,7 +7,6 @@ using System.Security.Cryptography.X509Certificates; using System.Threading; using Microsoft.Quic; -using static Microsoft.Quic.MsQuic; namespace System.Net.Quic; diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicTlsSecret.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicTlsSecret.cs index 3a19656f63acb7..65f923bc836eb6 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicTlsSecret.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/MsQuicTlsSecret.cs @@ -2,11 +2,9 @@ // The .NET Foundation licenses this file to you under the MIT license. #if DEBUG -using System.Collections.Generic; using System.IO; using System.Runtime.InteropServices; using System.Text; -using System.Threading; using Microsoft.Quic; using static Microsoft.Quic.MsQuic; diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ThrowHelper.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ThrowHelper.cs index 618985e4dc3a6c..b2f2a7886401cc 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ThrowHelper.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ThrowHelper.cs @@ -1,12 +1,12 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using System.Security.Authentication; +using System.Diagnostics.CodeAnalysis; using System.Net.Security; using System.Net.Sockets; -using static Microsoft.Quic.MsQuic; -using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; +using System.Security.Authentication; +using static Microsoft.Quic.MsQuic; namespace System.Net.Quic; diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs index 30dd019274d183..d217e1f68bff1f 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs @@ -1,7 +1,6 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using System.Diagnostics; using System.Net.Security; using System.Runtime.CompilerServices; using System.Runtime.ExceptionServices; From e9655b7a1298b773cb056c84aea26f7d748681b5 Mon Sep 17 00:00:00 2001 From: ManickaP Date: Fri, 11 Aug 2023 16:43:55 +0200 Subject: [PATCH 13/21] Fixed waiting for SEND_COMPLETE --- .../src/System/Net/Quic/QuicStream.cs | 62 ++++++++++++++----- 1 file changed, 47 insertions(+), 15 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs index 2fa6ff26e9a0d9..c96d9647d77f0f 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs @@ -109,6 +109,8 @@ public sealed partial class QuicStream } }; private MsQuicBuffers _sendBuffers = new MsQuicBuffers(); + private int _sendLocked; + private Exception? _sendException; private readonly long _defaultErrorCode; @@ -362,7 +364,7 @@ public async ValueTask WriteAsync(ReadOnlyMemory buffer, bool completeWrit // Register cancellation if the token can be cancelled and the task is not completed yet. CancellationTokenRegistration cancellationRegistration = default; - bool aborted = false; + bool canceled = false; try { if (cancellationToken.CanBeCanceled) @@ -372,8 +374,8 @@ public async ValueTask WriteAsync(ReadOnlyMemory buffer, bool completeWrit try { QuicStream stream = (QuicStream)obj!; + Volatile.Write(ref canceled, true); stream.Abort(QuicAbortDirection.Write, stream._defaultErrorCode); - aborted = true; } catch (ObjectDisposedException) { @@ -403,19 +405,36 @@ public async ValueTask WriteAsync(ReadOnlyMemory buffer, bool completeWrit return; } - unsafe + // We own the lock, abort might happen, but exception will get stored instead. + if (Interlocked.CompareExchange(ref _sendLocked, 1, 0) == 0 && !_sendTcs.IsCompleted) { - _sendBuffers.Initialize(buffer); - int status = MsQuicApi.Api.StreamSend( - _handle, - _sendBuffers.Buffers, - (uint)_sendBuffers.Count, - completeWrites ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE, - null); - if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? exception)) + unsafe { - _sendBuffers.Reset(); - _sendTcs.TrySetException(exception, final: true); + _sendBuffers.Initialize(buffer); + int status = MsQuicApi.Api.StreamSend( + _handle, + _sendBuffers.Buffers, + (uint)_sendBuffers.Count, + completeWrites ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE, + null); + // No SEND_COMPLETE expected + if (StatusFailed(status)) + { + // Release buffer and unlock + _sendBuffers.Reset(); + Volatile.Write(ref _sendLocked, 0); + + // There might be stored exception from when we held the lock + if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? exception)) + { + Interlocked.CompareExchange(ref _sendException, exception, null); + } + exception = Volatile.Read(ref _sendException); + if (exception is not null) + { + _sendTcs.TrySetException(exception, final: true); + } + } } } @@ -424,7 +443,7 @@ public async ValueTask WriteAsync(ReadOnlyMemory buffer, bool completeWrit finally { cancellationRegistration.Dispose(); - if (aborted) + if (Volatile.Read(ref canceled)) { cancellationToken.ThrowIfCancellationRequested(); } @@ -481,7 +500,13 @@ public void Abort(QuicAbortDirection abortDirection, long errorCode) } if (abortDirection.HasFlag(QuicAbortDirection.Write)) { - _sendTcs.TrySetException(ThrowHelper.GetOperationAbortedException(SR.net_quic_writing_aborted), final: true); + var exception = ThrowHelper.GetOperationAbortedException(SR.net_quic_writing_aborted); + Interlocked.CompareExchange(ref _sendException, exception, null); + if (Interlocked.CompareExchange(ref _sendLocked, 1, 0) == 0) + { + _sendTcs.TrySetException(_sendException, final: true); + Volatile.Write(ref _sendLocked, 0); + } } } @@ -557,6 +582,13 @@ private unsafe int HandleEventReceive(ref RECEIVE_DATA data) private unsafe int HandleEventSendComplete(ref SEND_COMPLETE_DATA data) { _sendBuffers.Reset(); + Volatile.Write(ref _sendLocked, 0); + + Exception? exception = Volatile.Read(ref _sendException); + if (exception is not null) + { + _sendTcs.TrySetException(exception, final: true); + } if (data.Canceled == 0) { _sendTcs.TrySetResult(); From 2800015af26236ddbfb78c07f803a7fde7ce337d Mon Sep 17 00:00:00 2001 From: ManickaP Date: Sat, 12 Aug 2023 12:27:30 +0200 Subject: [PATCH 14/21] Hold back setting FinalTaskSource and overwrite result if no waiter is there --- .../Internal/ResettableValueTaskSource.cs | 146 +++++++++++++----- .../Net/Quic/Internal/ValueTaskSource.cs | 4 +- 2 files changed, 109 insertions(+), 41 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs index 5100d47e3e86ba..21ceb534348dcc 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs @@ -26,22 +26,21 @@ private enum State } private State _state; + private bool _hasWaiter; private ManualResetValueTaskSourceCore _valueTaskSource; private CancellationTokenRegistration _cancellationRegistration; private Action? _cancellationAction; private GCHandle _keepAlive; - private bool _finalContinuationRegistered; + private FinalTaskSource _finalTaskSource; - private readonly TaskCompletionSource _finalTaskSource; - - public ResettableValueTaskSource(bool runContinuationsAsynchronously = true) + public ResettableValueTaskSource() { _state = State.None; - _valueTaskSource = new ManualResetValueTaskSourceCore() { RunContinuationsAsynchronously = runContinuationsAsynchronously }; + _hasWaiter = false; + _valueTaskSource = new ManualResetValueTaskSourceCore() { RunContinuationsAsynchronously = true }; _cancellationRegistration = default; _keepAlive = default; - _finalContinuationRegistered = false; - _finalTaskSource = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None); + _finalTaskSource = new FinalTaskSource(); } /// @@ -115,11 +114,12 @@ public bool TryGetValueTask(out ValueTask valueTask, object? keepAlive = null, C _state = State.Awaiting; } - // None, Completed, Final: return the current task. + // None, Ready, Completed: return the current task. if (state == State.None || state == State.Ready || state == State.Completed) { + _hasWaiter = true; valueTask = new ValueTask(this, _valueTaskSource.Version); return true; } @@ -136,23 +136,9 @@ public bool TryGetValueTask(out ValueTask valueTask, object? keepAlive = null, C /// The that will transition to a completed state with the last transition of this source. public Task GetFinalTask(object? keepAlive) { - if (_finalContinuationRegistered || keepAlive is null) - { - return _finalTaskSource.Task; - } - lock (this) { - if (!_finalContinuationRegistered) - { - GCHandle handle = GCHandle.Alloc(keepAlive); - _finalTaskSource.Task.ContinueWith(static (_, state) => - { - ((GCHandle)state!).Free(); - }, handle, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); - _finalContinuationRegistered = true; - } - return _finalTaskSource.Task; + return _finalTaskSource.GetTask(keepAlive); } } @@ -173,6 +159,12 @@ private bool TryComplete(Exception? exception, bool final) return false; } + if (state == State.Ready && !_hasWaiter && final) + { + _valueTaskSource.Reset(); + state = State.None; + } + // If the _valueTaskSource has already been set, we don't want to lose the result by overwriting it. // So keep it as is and store the result in _finalTaskSource. if (state == State.None || @@ -198,7 +190,15 @@ private bool TryComplete(Exception? exception, bool final) } if (final) { - return _finalTaskSource.TrySetException(exception); + if (_finalTaskSource.TryComplete(exception)) + { + if (state != State.Ready) + { + _finalTaskSource.TrySignal(out _); + } + return true; + } + return false; } return state != State.Ready; } @@ -211,7 +211,15 @@ private bool TryComplete(Exception? exception, bool final) } if (final) { - return _finalTaskSource.TrySetResult(); + if (_finalTaskSource.TryComplete(exception)) + { + if (state != State.Ready) + { + _finalTaskSource.TrySignal(out _); + } + return true; + } + return false; } return state != State.Ready; } @@ -265,11 +273,9 @@ void IValueTaskSource.OnCompleted(Action continuation, object? state, s void IValueTaskSource.GetResult(short token) { - bool successful = false; try { _valueTaskSource.GetResult(token); - successful = true; } finally { @@ -280,31 +286,93 @@ void IValueTaskSource.GetResult(short token) if (state == State.Ready) { _valueTaskSource.Reset(); + _hasWaiter = false; _state = State.None; // Propagate the _finalTaskSource result into _valueTaskSource if completed. - if (_finalTaskSource.Task.IsCompleted) + if (_finalTaskSource.TrySignal(out Exception? exception)) { _state = State.Completed; - if (_finalTaskSource.Task.IsCompletedSuccessfully) + + if (exception is not null) { - _valueTaskSource.SetResult(true); + _valueTaskSource.SetException(exception); } else { - // We know it's always going to be a single exception since we're the ones setting it. - _valueTaskSource.SetException(_finalTaskSource.Task.Exception?.InnerException!); - } - - // In case the _valueTaskSource was successful, we want the potential error from _finalTaskSource to surface immediately. - // In other words, if _valueTaskSource was set with success while final exception arrived, this will throw that exception right away. - if (successful) - { - _valueTaskSource.GetResult(_valueTaskSource.Version); + _valueTaskSource.SetResult(true); } } + else + { + _state = State.None; + } + } + } + } + } + + private struct FinalTaskSource + { + private TaskCompletionSource? _finalTaskSource; + private bool _isCompleted; + private Exception? _exception; + + public FinalTaskSource() + { + _finalTaskSource = null; + _isCompleted = false; + _exception = null; + } + + public Task GetTask(object? keepAlive) + { + if (_finalTaskSource is null) + { + _finalTaskSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + if (!TrySignal(out _)) + { + GCHandle handle = GCHandle.Alloc(keepAlive); + _finalTaskSource.Task.ContinueWith(static (_, state) => + { + ((GCHandle)state!).Free(); + }, handle, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } } + return _finalTaskSource.Task; + } + + public bool TryComplete(Exception? exception = null) + { + if (_isCompleted) + { + return false; + } + + _exception = exception; + _isCompleted = true; + return true; + } + + public bool TrySignal(out Exception? exception) + { + if (!_isCompleted) + { + exception = default; + return false; + } + + if (_exception is not null) + { + _finalTaskSource?.SetException(_exception); + } + else + { + _finalTaskSource?.SetResult(); + } + + exception = _exception; + return true; } } } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs index a6e40dbf7ea8ae..2acd2138a12374 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs @@ -27,10 +27,10 @@ private enum State : byte private CancellationTokenRegistration _cancellationRegistration; private GCHandle _keepAlive; - public ValueTaskSource(bool runContinuationsAsynchronously = true) + public ValueTaskSource() { _state = State.None; - _valueTaskSource = new ManualResetValueTaskSourceCore() { RunContinuationsAsynchronously = runContinuationsAsynchronously }; + _valueTaskSource = new ManualResetValueTaskSourceCore() { RunContinuationsAsynchronously = true }; _cancellationRegistration = default; _keepAlive = default; } From 0c35a2bd5f92f523488588a5e6bfaf662926d64f Mon Sep 17 00:00:00 2001 From: ManickaP Date: Sat, 12 Aug 2023 16:09:02 +0200 Subject: [PATCH 15/21] Cancellation and completion --- .../Internal/ResettableValueTaskSource.cs | 146 +++++++++--------- .../src/System/Net/Quic/QuicStream.cs | 121 ++++++--------- 2 files changed, 118 insertions(+), 149 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs index 21ceb534348dcc..aa5ebd1f759f3b 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs @@ -29,6 +29,7 @@ private enum State private bool _hasWaiter; private ManualResetValueTaskSourceCore _valueTaskSource; private CancellationTokenRegistration _cancellationRegistration; + private CancellationToken _cancelledToken; private Action? _cancellationAction; private GCHandle _keepAlive; private FinalTaskSource _finalTaskSource; @@ -39,6 +40,7 @@ public ResettableValueTaskSource() _hasWaiter = false; _valueTaskSource = new ManualResetValueTaskSourceCore() { RunContinuationsAsynchronously = true }; _cancellationRegistration = default; + _cancelledToken = default; _keepAlive = default; _finalTaskSource = new FinalTaskSource(); } @@ -90,11 +92,11 @@ public bool TryGetValueTask(out ValueTask valueTask, object? keepAlive = null, C _cancellationRegistration = cancellationToken.UnsafeRegister(static (obj, cancellationToken) => { (ResettableValueTaskSource thisRef, object? target) = ((ResettableValueTaskSource, object?))obj!; - // This will transition the state to Ready. - if (thisRef.TrySetException(new OperationCanceledException(cancellationToken))) + lock (thisRef) { - thisRef._cancellationAction?.Invoke(target); + thisRef._cancelledToken = cancellationToken; } + thisRef._cancellationAction?.Invoke(target); }, (this, keepAlive)); } } @@ -145,101 +147,98 @@ public Task GetFinalTask(object? keepAlive) private bool TryComplete(Exception? exception, bool final) { CancellationTokenRegistration cancellationRegistration = default; - try + lock (this) { - lock (this) + // Swap the cancellation registration so the one that's been registered gets disposed outside of the lock. + // If the callbacks kicks in, it tries to take the lock held by this thread leading to deadlock. + cancellationRegistration = _cancellationRegistration; + _cancellationRegistration = default; + } + // Dispose the cancellation if registered. + // Must be done outside of lock since Dispose will wait on pending cancellation callbacks which require taking the lock. + cancellationRegistration.Dispose(); + + lock (this) + { + try { - try + State state = _state; + + // Completed: nothing to do. + if (state == State.Completed) { - State state = _state; + return false; + } - // Completed: nothing to do. - if (state == State.Completed) - { - return false; - } + if (state == State.Ready && !_hasWaiter && final) + { + _valueTaskSource.Reset(); + state = State.None; + } - if (state == State.Ready && !_hasWaiter && final) - { - _valueTaskSource.Reset(); - state = State.None; - } + // If the _valueTaskSource has already been set, we don't want to lose the result by overwriting it. + // So keep it as is and store the result in _finalTaskSource. + if (state == State.None || + state == State.Awaiting) + { + _state = final ? State.Completed : State.Ready; + } - // If the _valueTaskSource has already been set, we don't want to lose the result by overwriting it. - // So keep it as is and store the result in _finalTaskSource. + // Unblock the current task source and in case of a final also the final task source. + if (exception is not null) + { + // Set up the exception stack trace for the caller. + exception = exception.StackTrace is null ? ExceptionDispatchInfo.SetCurrentStackTrace(exception) : exception; if (state == State.None || state == State.Awaiting) { - _state = final ? State.Completed : State.Ready; + _valueTaskSource.SetException(exception); } - - // Swap the cancellation registration so the one that's been registered gets eventually Disposed. - // Ideally, we would dispose it here, but if the callbacks kicks in, it tries to take the lock held by this thread leading to deadlock. - cancellationRegistration = _cancellationRegistration; - _cancellationRegistration = default; - - // Unblock the current task source and in case of a final also the final task source. - if (exception is not null) + if (final) { - // Set up the exception stack trace for the caller. - exception = exception.StackTrace is null ? ExceptionDispatchInfo.SetCurrentStackTrace(exception) : exception; - if (state == State.None || - state == State.Awaiting) + if (_finalTaskSource.TryComplete(exception)) { - _valueTaskSource.SetException(exception); - } - if (final) - { - if (_finalTaskSource.TryComplete(exception)) + if (state != State.Ready) { - if (state != State.Ready) - { - _finalTaskSource.TrySignal(out _); - } - return true; + _finalTaskSource.TrySignal(out _); } - return false; + return true; } - return state != State.Ready; + return false; } - else + return state != State.Ready; + } + else + { + if (state == State.None || + state == State.Awaiting) { - if (state == State.None || - state == State.Awaiting) - { - _valueTaskSource.SetResult(final); - } - if (final) + _valueTaskSource.SetResult(final); + } + if (final) + { + if (_finalTaskSource.TryComplete(exception)) { - if (_finalTaskSource.TryComplete(exception)) + if (state != State.Ready) { - if (state != State.Ready) - { - _finalTaskSource.TrySignal(out _); - } - return true; + _finalTaskSource.TrySignal(out _); } - return false; + return true; } - return state != State.Ready; + return false; } + return state != State.Ready; } - finally + } + finally + { + // Un-root the the kept alive object in all cases. + if (_keepAlive.IsAllocated) { - // Un-root the the kept alive object in all cases. - if (_keepAlive.IsAllocated) - { - _keepAlive.Free(); - } + _keepAlive.Free(); } } } - finally - { - // Dispose the cancellation if registered. - // Must be done outside of lock since Dispose will wait on pending cancellation callbacks which require taking the lock. - cancellationRegistration.Dispose(); - } } /// @@ -275,6 +274,7 @@ void IValueTaskSource.GetResult(short token) { try { + _cancelledToken.ThrowIfCancellationRequested(); _valueTaskSource.GetResult(token); } finally @@ -283,10 +283,12 @@ void IValueTaskSource.GetResult(short token) { State state = _state; + _hasWaiter = false; + _cancelledToken = default; + if (state == State.Ready) { _valueTaskSource.Reset(); - _hasWaiter = false; _state = State.None; // Propagate the _finalTaskSource result into _valueTaskSource if completed. diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs index c96d9647d77f0f..f5526b3dd7faf3 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs @@ -76,6 +76,7 @@ public sealed partial class QuicStream if (target is QuicStream stream) { stream.Abort(QuicAbortDirection.Read, stream._defaultErrorCode); + stream._receiveTcs.TrySetResult(); } } catch (ObjectDisposedException) @@ -357,97 +358,64 @@ public async ValueTask WriteAsync(ReadOnlyMemory buffer, bool completeWrit } // Concurrent call, this one lost the race. - if (!_sendTcs.TryGetValueTask(out ValueTask valueTask, this, default)) + if (!_sendTcs.TryGetValueTask(out ValueTask valueTask, this, cancellationToken)) { throw new InvalidOperationException(SR.Format(SR.net_io_invalidnestedcall, "write")); } - // Register cancellation if the token can be cancelled and the task is not completed yet. - CancellationTokenRegistration cancellationRegistration = default; - bool canceled = false; - try + // No need to call anything since we already have a result, most likely an exception. + if (valueTask.IsCompleted) { - if (cancellationToken.CanBeCanceled) - { - cancellationRegistration = cancellationToken.UnsafeRegister((obj, cancellationToken) => - { - try - { - QuicStream stream = (QuicStream)obj!; - Volatile.Write(ref canceled, true); - stream.Abort(QuicAbortDirection.Write, stream._defaultErrorCode); - } - catch (ObjectDisposedException) - { - // We collided with a Dispose in another thread. This can happen - // when using CancellationTokenSource.CancelAfter. - // Ignore the exception - } - }, this); - } + await valueTask.ConfigureAwait(false); + return; + } - // No need to call anything since we already have a result, most likely an exception. - if (valueTask.IsCompleted) + // For an empty buffer complete immediately, close the writing side of the stream if necessary. + if (buffer.IsEmpty) + { + _sendTcs.TrySetResult(); + if (completeWrites) { - await valueTask.ConfigureAwait(false); - return; + CompleteWrites(); } + await valueTask.ConfigureAwait(false); + return; + } - // For an empty buffer complete immediately, close the writing side of the stream if necessary. - if (buffer.IsEmpty) + // We own the lock, abort might happen, but exception will get stored instead. + if (Interlocked.CompareExchange(ref _sendLocked, 1, 0) == 0 && !_sendTcs.IsCompleted) + { + unsafe { - _sendTcs.TrySetResult(); - if (completeWrites) + _sendBuffers.Initialize(buffer); + int status = MsQuicApi.Api.StreamSend( + _handle, + _sendBuffers.Buffers, + (uint)_sendBuffers.Count, + completeWrites ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE, + null); + // No SEND_COMPLETE expected, release buffer and unlock. + if (StatusFailed(status)) { - CompleteWrites(); - } - await valueTask.ConfigureAwait(false); - return; - } + _sendBuffers.Reset(); + Volatile.Write(ref _sendLocked, 0); - // We own the lock, abort might happen, but exception will get stored instead. - if (Interlocked.CompareExchange(ref _sendLocked, 1, 0) == 0 && !_sendTcs.IsCompleted) - { - unsafe - { - _sendBuffers.Initialize(buffer); - int status = MsQuicApi.Api.StreamSend( - _handle, - _sendBuffers.Buffers, - (uint)_sendBuffers.Count, - completeWrites ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE, - null); - // No SEND_COMPLETE expected - if (StatusFailed(status)) + // There might be stored exception from when we held the lock + if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? exception)) + { + Interlocked.CompareExchange(ref _sendException, exception, null); + } + exception = Volatile.Read(ref _sendException); + if (exception is not null) { - // Release buffer and unlock - _sendBuffers.Reset(); - Volatile.Write(ref _sendLocked, 0); - - // There might be stored exception from when we held the lock - if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? exception)) - { - Interlocked.CompareExchange(ref _sendException, exception, null); - } - exception = Volatile.Read(ref _sendException); - if (exception is not null) - { - _sendTcs.TrySetException(exception, final: true); - } + _sendTcs.TrySetException(exception, final: true); } } - } - - await valueTask.ConfigureAwait(false); - } - finally - { - cancellationRegistration.Dispose(); - if (Volatile.Read(ref canceled)) - { - cancellationToken.ThrowIfCancellationRequested(); + // SEND_COMPLETE expected, buffer and lock will be released then. } } + + await valueTask.ConfigureAwait(false); } /// @@ -743,9 +711,8 @@ public override async ValueTask DisposeAsync() await valueTask.ConfigureAwait(false); } Debug.Assert(_startedTcs.IsCompleted); - // TODO: Revisit this with https://github.com/dotnet/runtime/issues/79818 and https://github.com/dotnet/runtime/issues/79911 - Debug.Assert(_receiveTcs.KeepAliveReleased); - Debug.Assert(_sendTcs.KeepAliveReleased); + Debug.Assert(_receiveTcs.IsCompleted); + Debug.Assert(_sendTcs.IsCompleted); _handle.Dispose(); // TODO: memory leak if not disposed From d3ba302de2a5ad0c8b6952c1d24cc47a37cf1afd Mon Sep 17 00:00:00 2001 From: ManickaP Date: Sat, 12 Aug 2023 17:50:49 +0200 Subject: [PATCH 16/21] Comments, fixed FinalTaskSource --- .../Internal/ResettableValueTaskSource.cs | 55 ++++++++++--------- .../src/System/Net/Quic/QuicStream.cs | 6 +- .../tests/FunctionalTests/QuicStreamTests.cs | 1 - 3 files changed, 32 insertions(+), 30 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs index aa5ebd1f759f3b..92723ca39bdc5a 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs @@ -121,6 +121,7 @@ public bool TryGetValueTask(out ValueTask valueTask, object? keepAlive = null, C state == State.Ready || state == State.Completed) { + // Remember that the value task with the current version is being given out. _hasWaiter = true; valueTask = new ValueTask(this, _valueTaskSource.Version); return true; @@ -146,16 +147,14 @@ public Task GetFinalTask(object? keepAlive) private bool TryComplete(Exception? exception, bool final) { + // Dispose the cancellation registration before completing the task, so that it cannot run after the awaiting method returned. + // Dispose must be done outside of lock since it will wait on pending cancellation callbacks that can hold the lock from another thread. CancellationTokenRegistration cancellationRegistration = default; lock (this) { - // Swap the cancellation registration so the one that's been registered gets disposed outside of the lock. - // If the callbacks kicks in, it tries to take the lock held by this thread leading to deadlock. cancellationRegistration = _cancellationRegistration; _cancellationRegistration = default; } - // Dispose the cancellation if registered. - // Must be done outside of lock since Dispose will wait on pending cancellation callbacks which require taking the lock. cancellationRegistration.Dispose(); lock (this) @@ -170,6 +169,8 @@ private bool TryComplete(Exception? exception, bool final) return false; } + // The task was non-finally completed without having anyone awaiting on it. + // In such case, discard the temporary result and replace it with this final completion. if (state == State.Ready && !_hasWaiter && final) { _valueTaskSource.Reset(); @@ -194,19 +195,6 @@ private bool TryComplete(Exception? exception, bool final) { _valueTaskSource.SetException(exception); } - if (final) - { - if (_finalTaskSource.TryComplete(exception)) - { - if (state != State.Ready) - { - _finalTaskSource.TrySignal(out _); - } - return true; - } - return false; - } - return state != State.Ready; } else { @@ -215,20 +203,22 @@ private bool TryComplete(Exception? exception, bool final) { _valueTaskSource.SetResult(final); } - if (final) + } + if (final) + { + if (_finalTaskSource.TryComplete(exception)) { - if (_finalTaskSource.TryComplete(exception)) + // Signal the final task only if we don't have another result in the value task source. + // In that case, the final task will be signalled after the value task result is retrieved. + if (state != State.Ready) { - if (state != State.Ready) - { - _finalTaskSource.TrySignal(out _); - } - return true; + _finalTaskSource.TrySignal(out _); } - return false; + return true; } - return state != State.Ready; + return false; } + return state != State.Ready; } finally { @@ -314,16 +304,22 @@ void IValueTaskSource.GetResult(short token) } } + /// + /// It remembers the result from and propagates it to only after is called. + /// Effectively allowing to separate setting of the result from task completion, which is necessary when the resettable portion of the value task source needs to consumed first. + /// private struct FinalTaskSource { private TaskCompletionSource? _finalTaskSource; private bool _isCompleted; + private bool _isSignaled; private Exception? _exception; public FinalTaskSource() { _finalTaskSource = null; _isCompleted = false; + _isSignaled = false; _exception = null; } @@ -332,7 +328,7 @@ public Task GetTask(object? keepAlive) if (_finalTaskSource is null) { _finalTaskSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - if (!TrySignal(out _)) + if (!_isCompleted) { GCHandle handle = GCHandle.Alloc(keepAlive); _finalTaskSource.Task.ContinueWith(static (_, state) => @@ -340,6 +336,10 @@ public Task GetTask(object? keepAlive) ((GCHandle)state!).Free(); }, handle, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } + if (_isSignaled) + { + TrySignal(out _); + } } return _finalTaskSource.Task; } @@ -374,6 +374,7 @@ public bool TrySignal(out Exception? exception) } exception = _exception; + _isSignaled = true; return true; } } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs index f5526b3dd7faf3..bc3783d5253d99 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs @@ -383,7 +383,7 @@ public async ValueTask WriteAsync(ReadOnlyMemory buffer, bool completeWrit } // We own the lock, abort might happen, but exception will get stored instead. - if (Interlocked.CompareExchange(ref _sendLocked, 1, 0) == 0 && !_sendTcs.IsCompleted) + if (Interlocked.CompareExchange(ref _sendLocked, 1, 0) == 0) { unsafe { @@ -400,7 +400,7 @@ public async ValueTask WriteAsync(ReadOnlyMemory buffer, bool completeWrit _sendBuffers.Reset(); Volatile.Write(ref _sendLocked, 0); - // There might be stored exception from when we held the lock + // There might be stored exception from when we held the lock. if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? exception)) { Interlocked.CompareExchange(ref _sendException, exception, null); @@ -549,9 +549,11 @@ private unsafe int HandleEventReceive(ref RECEIVE_DATA data) } private unsafe int HandleEventSendComplete(ref SEND_COMPLETE_DATA data) { + // Release buffer and unlock. _sendBuffers.Reset(); Volatile.Write(ref _sendLocked, 0); + // There might be stored exception from when we held the lock. Exception? exception = Volatile.Read(ref _sendException); if (exception is not null) { diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs index dd449cf6cc98f0..cd3c1a2394f384 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs @@ -1232,7 +1232,6 @@ from bool2 in boolValues [MemberData(nameof(PayloadSizeAndTwoBools))] public async Task ReadsClosedFinishes_ConnectionClose(int payloadSize, bool closeServer, bool useDispose) { - //using var logger = new TestUtilities.TestEventListener(Console.Out, "Private.InternalDiagnostics.System.Net.Quic"); using SemaphoreSlim serverSem = new SemaphoreSlim(0); using SemaphoreSlim clientSem = new SemaphoreSlim(0); From 5c33e2e0ea07110c79d7ee072ea2162ad07de44d Mon Sep 17 00:00:00 2001 From: ManickaP Date: Mon, 14 Aug 2023 10:39:27 +0200 Subject: [PATCH 17/21] Fix assert --- .../Net/Http/SocketsHttpHandler/Http3Connection.cs | 4 +++- .../Net/Quic/Internal/ResettableValueTaskSource.cs | 12 ------------ 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs index 9744d02fc94f85..1cecc3180ab0d9 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs @@ -33,6 +33,7 @@ internal sealed class Http3Connection : HttpConnectionBase // Our control stream. private QuicStream? _clientControl; + private Task _sendSettingsTask; // Server-advertised SETTINGS_MAX_FIELD_SECTION_SIZE // https://www.rfc-editor.org/rfc/rfc9114.html#section-7.2.4.1-2.2.1 @@ -88,7 +89,7 @@ public Http3Connection(HttpConnectionPool pool, HttpAuthority authority, QuicCon } // Errors are observed via Abort(). - _ = SendSettingsAsync(); + _sendSettingsTask = SendSettingsAsync(); // This process is cleaned up when _connection is disposed, and errors are observed via Abort(). _ = AcceptStreamsAsync(); @@ -150,6 +151,7 @@ private void CheckForShutdown() if (_clientControl != null) { + await _sendSettingsTask.ConfigureAwait(false); await _clientControl.DisposeAsync().ConfigureAwait(false); _clientControl = null; } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs index 92723ca39bdc5a..c3135042b032b7 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs @@ -57,18 +57,6 @@ public ResettableValueTaskSource() /// public bool IsCompleted => (State)Volatile.Read(ref Unsafe.As(ref _state)) == State.Completed; - // TODO: Revisit this with https://github.com/dotnet/runtime/issues/79818 and https://github.com/dotnet/runtime/issues/79911 - public bool KeepAliveReleased - { - get - { - lock (this) - { - return !_keepAlive.IsAllocated; - } - } - } - /// /// Tries to get a value task representing this task source. If this task source is , it'll also transition it into state. /// It prevents concurrent operations from being invoked since it'll return false if the task source was already in state. From 77b76366963ace3fc5b8134887dbe5b3b51b065a Mon Sep 17 00:00:00 2001 From: ManickaP Date: Mon, 14 Aug 2023 16:00:04 +0200 Subject: [PATCH 18/21] Test reseting control stream made more resilient --- .../SocketsHttpHandler/Http3Connection.cs | 2 +- .../HttpClientHandlerTest.Http3.cs | 37 ++++++++++++++++--- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs index 1cecc3180ab0d9..f16bb67669351e 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs @@ -488,7 +488,7 @@ private async Task ProcessServerStreamAsync(QuicStream stream) if (bytesRead == 0) { - // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#name-unidirectional-streams + // https://www.rfc-editor.org/rfc/rfc9114.html#name-unidirectional-streams // A sender can close or reset a unidirectional stream unless otherwise specified. A receiver MUST // tolerate unidirectional streams being closed or reset prior to the reception of the unidirectional // stream header. diff --git a/src/libraries/System.Net.Http/tests/FunctionalTests/HttpClientHandlerTest.Http3.cs b/src/libraries/System.Net.Http/tests/FunctionalTests/HttpClientHandlerTest.Http3.cs index 92c4ab0d6097f6..373c25f3d258ed 100644 --- a/src/libraries/System.Net.Http/tests/FunctionalTests/HttpClientHandlerTest.Http3.cs +++ b/src/libraries/System.Net.Http/tests/FunctionalTests/HttpClientHandlerTest.Http3.cs @@ -1661,10 +1661,17 @@ public async Task ServerSendsTrailingHeaders_Success() } + public enum CloseOutboundControlStream + { + BogusData, + Dispose, + Abort, + } [Theory] - [InlineData(true)] - [InlineData(false)] - public async Task ServerClosesOutboundControlStream_ClientClosesConnection(bool graceful) + [InlineData(CloseOutboundControlStream.BogusData)] + [InlineData(CloseOutboundControlStream.Dispose)] + [InlineData(CloseOutboundControlStream.Abort)] + public async Task ServerClosesOutboundControlStream_ClientClosesConnection(CloseOutboundControlStream closeType) { using Http3LoopbackServer server = CreateHttp3LoopbackServer(); @@ -1677,13 +1684,31 @@ public async Task ServerClosesOutboundControlStream_ClientClosesConnection(bool await using Http3LoopbackStream requestStream = await connection.AcceptRequestStreamAsync(); // abort the control stream - if (graceful) + if (closeType == CloseOutboundControlStream.BogusData) { await connection.OutboundControlStream.SendResponseBodyAsync(Array.Empty(), isFinal: true); } - else + else if (closeType == CloseOutboundControlStream.Dispose) { - connection.OutboundControlStream.Abort(Http3LoopbackConnection.H3_INTERNAL_ERROR); + await connection.OutboundControlStream.DisposeAsync(); + } + else if (closeType == CloseOutboundControlStream.Abort) + { + int iterations = 5; + while (iterations-- > 0) + { + connection.OutboundControlStream.Abort(Http3LoopbackConnection.H3_INTERNAL_ERROR); + // This sends RESET_FRAME which might cause complete discard of any data including stream type, leading to client ignoring the stream. + // Attempt to establish the control stream again then. + if (await semaphore.WaitAsync(100)) + { + // Client finished with the expected error. + return; + } + await connection.OutboundControlStream.DisposeAsync(); + await connection.EstablishControlStreamAsync(Array.Empty()); + await Task.Delay(100); + } } // wait for client task before tearing down the requestStream and connection From d6f50614018da68494bd97672713b59be89c8512 Mon Sep 17 00:00:00 2001 From: ManickaP Date: Tue, 5 Sep 2023 17:13:28 +0200 Subject: [PATCH 19/21] Attempt to fix still running write while disposing the stream in case of a cancellation --- .../SocketsHttpHandler/Http3RequestStream.cs | 43 +++++++++++++------ 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3RequestStream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3RequestStream.cs index f5e037cf1d4067..d274bf2f9a5f07 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3RequestStream.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3RequestStream.cs @@ -31,6 +31,8 @@ internal sealed class Http3RequestStream : IHttpStreamHeadersHandler, IAsyncDisp private TaskCompletionSource? _expect100ContinueCompletionSource; // True indicates we should send content (e.g. received 100 Continue). private bool _disposed; private readonly CancellationTokenSource _requestBodyCancellationSource; + private Task? _sendRequestTask; // Set with SendContentAsync, must be awaited before QuicStream.DisposeAsync(); + private Task? _readResponseTask; // Set with ReadResponseAsync, must be awaited before QuicStream.DisposeAsync(); // Allocated when we receive a :status header. private HttpResponseMessage? _response; @@ -107,9 +109,25 @@ public async ValueTask DisposeAsync() { _disposed = true; AbortStream(); + // We aborted both sides, thus both task should unblock and should be finished before disposing the QuicStream. + await AwaitUnfinished(_sendRequestTask).ConfigureAwait(false); + await AwaitUnfinished(_readResponseTask).ConfigureAwait(false); await _stream.DisposeAsync().ConfigureAwait(false); DisposeSyncHelper(); } + + static async ValueTask AwaitUnfinished(Task? task) + { + if (task is not null && !task.IsCompleted) + { + try + { + await task.ConfigureAwait(false); + } + catch // Exceptions from both tasks are logged via _connection.LogException() in case they're not awaited in SendAsync, so the exception can be ignored here. + { } + } + } } private void DisposeSyncHelper() @@ -158,40 +176,39 @@ public async Task SendAsync(CancellationToken cancellationT await FlushSendBufferAsync(endStream: _request.Content == null, _requestBodyCancellationSource.Token).ConfigureAwait(false); } - Task sendContentTask; if (_request.Content != null) { - sendContentTask = SendContentAsync(_request.Content!, _requestBodyCancellationSource.Token); + _sendRequestTask = SendContentAsync(_request.Content!, _requestBodyCancellationSource.Token); } else { - sendContentTask = Task.CompletedTask; + _sendRequestTask = Task.CompletedTask; } // In parallel, send content and read response. // Depending on Expect 100 Continue usage, one will depend on the other making progress. - Task readResponseTask = ReadResponseAsync(_requestBodyCancellationSource.Token); + _readResponseTask = ReadResponseAsync(_requestBodyCancellationSource.Token); bool sendContentObserved = false; // If we're not doing duplex, wait for content to finish sending here. // If we are doing duplex and have the unlikely event that it completes here, observe the result. // See Http2Connection.SendAsync for a full comment on this logic -- it is identical behavior. - if (sendContentTask.IsCompleted || + if (_sendRequestTask.IsCompleted || _request.Content?.AllowDuplex != true || - await Task.WhenAny(sendContentTask, readResponseTask).ConfigureAwait(false) == sendContentTask || - sendContentTask.IsCompleted) + await Task.WhenAny(_sendRequestTask, _readResponseTask).ConfigureAwait(false) == _sendRequestTask || + _sendRequestTask.IsCompleted) { try { - await sendContentTask.ConfigureAwait(false); + await _sendRequestTask.ConfigureAwait(false); sendContentObserved = true; } catch { - // Exceptions will be bubbled up from sendContentTask here, - // which means the result of readResponseTask won't be observed directly: + // Exceptions will be bubbled up from _sendRequestTask here, + // which means the result of _readResponseTask won't be observed directly: // Do a background await to log any exceptions. - _connection.LogExceptions(readResponseTask); + _connection.LogExceptions(_readResponseTask); throw; } } @@ -199,11 +216,11 @@ await Task.WhenAny(sendContentTask, readResponseTask).ConfigureAwait(false) == s { // Duplex is being used, so we can't wait for content to finish sending. // Do a background await to log any exceptions. - _connection.LogExceptions(sendContentTask); + _connection.LogExceptions(_sendRequestTask); } // Wait for the response headers to be read. - await readResponseTask.ConfigureAwait(false); + await _readResponseTask.ConfigureAwait(false); Debug.Assert(_response != null && _response.Content != null); // Set our content stream. From 8d22715de73bbd047be3ee8652a9b94eb90707e0 Mon Sep 17 00:00:00 2001 From: ManickaP Date: Wed, 6 Sep 2023 11:15:01 +0200 Subject: [PATCH 20/21] Attempt to fix stress build --- .../tests/StressTests/HttpStress/Directory.Build.targets | 2 +- .../tests/StressTests/HttpStress/build-local.ps1 | 2 +- .../System.Net.Http/tests/StressTests/HttpStress/build-local.sh | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/libraries/System.Net.Http/tests/StressTests/HttpStress/Directory.Build.targets b/src/libraries/System.Net.Http/tests/StressTests/HttpStress/Directory.Build.targets index e3ebd0de328758..db6e799e071df0 100644 --- a/src/libraries/System.Net.Http/tests/StressTests/HttpStress/Directory.Build.targets +++ b/src/libraries/System.Net.Http/tests/StressTests/HttpStress/Directory.Build.targets @@ -6,6 +6,6 @@ Define this here because the SDK resets it unconditionally in Microsoft.NETCoreSdk.BundledVersions.props. --> - 8.0 + 9.0 \ No newline at end of file diff --git a/src/libraries/System.Net.Http/tests/StressTests/HttpStress/build-local.ps1 b/src/libraries/System.Net.Http/tests/StressTests/HttpStress/build-local.ps1 index dbdd2e696c6344..19085c5af766cc 100644 --- a/src/libraries/System.Net.Http/tests/StressTests/HttpStress/build-local.ps1 +++ b/src/libraries/System.Net.Http/tests/StressTests/HttpStress/build-local.ps1 @@ -3,7 +3,7 @@ ## Usage: ## ./build-local.ps1 [StressConfiguration] [LibrariesConfiguration] -$Version="8.0" +$Version="9.0" $RepoRoot="$(git rev-parse --show-toplevel)" $DailyDotnetRoot= "./.dotnet-daily" diff --git a/src/libraries/System.Net.Http/tests/StressTests/HttpStress/build-local.sh b/src/libraries/System.Net.Http/tests/StressTests/HttpStress/build-local.sh index f5a0e2b784575c..44b5dbf21139fa 100755 --- a/src/libraries/System.Net.Http/tests/StressTests/HttpStress/build-local.sh +++ b/src/libraries/System.Net.Http/tests/StressTests/HttpStress/build-local.sh @@ -5,7 +5,7 @@ ## Usage: ## ./build-local.sh [StressConfiguration] [LibrariesConfiguration] -version=8.0 +version=9.0 repo_root=$(git rev-parse --show-toplevel) daily_dotnet_root=./.dotnet-daily From f6261e94aaea6518ad2d1d5d35d6758877183c51 Mon Sep 17 00:00:00 2001 From: ManickaP Date: Tue, 12 Sep 2023 14:21:06 +0200 Subject: [PATCH 21/21] Sync Dispose in H3Stream waits for read and write as well --- .../SocketsHttpHandler/Http3RequestStream.cs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3RequestStream.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3RequestStream.cs index d274bf2f9a5f07..c18a43402fd930 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3RequestStream.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3RequestStream.cs @@ -90,9 +90,25 @@ public void Dispose() { _disposed = true; AbortStream(); + // We aborted both sides, thus both task should unblock and should be finished before disposing the QuicStream. + WaitUnfinished(_sendRequestTask); + WaitUnfinished(_readResponseTask); _stream.Dispose(); DisposeSyncHelper(); } + + static void WaitUnfinished(Task? task) + { + if (task is not null && !task.IsCompleted) + { + try + { + task.GetAwaiter().GetResult(); + } + catch // Exceptions from both tasks are logged via _connection.LogException() in case they're not awaited in SendAsync, so the exception can be ignored here. + { } + } + } } private void RemoveFromConnectionIfDone()