@@ -108,6 +108,11 @@ static async ValueTask<QuicConnection> StartConnectAsync(QuicClientConnectionOpt
108
108
/// </summary>
109
109
private int _disposed ;
110
110
111
+ /// <summary>
112
+ /// Completed when connection shutdown is initiated.
113
+ /// </summary>
114
+ private TaskCompletionSource _connectionCloseTcs = new TaskCompletionSource ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
115
+
111
116
private readonly ValueTaskSource _connectedTcs = new ValueTaskSource ( ) ;
112
117
private readonly ResettableValueTaskSource _shutdownTcs = new ResettableValueTaskSource ( )
113
118
{
@@ -435,15 +440,14 @@ public async ValueTask<QuicStream> OpenOutboundStreamAsync(QuicStreamType type,
435
440
ObjectDisposedException . ThrowIf ( _disposed == 1 , this ) ;
436
441
437
442
// In case of an incoming race when the connection is closed by the peer just before we open the stream,
438
- // we receive QUIC_STATUS_ABORTED from MsQuic, but we don't know how the connection was closed. To
439
- // distinguish this case, we throw ConnectionAborted without ApplicationErrorCode. In such a case, we
440
- // can expect the connection close exception to be already reported on the connection level (or very soon).
441
- bool connectionAbortedByPeer = ex is QuicException qe && qe . QuicError == QuicError . ConnectionAborted && qe . ApplicationErrorCode is null ;
443
+ // we receive QUIC_STATUS_ABORTED from MsQuic, but we don't know how the connection was closed. We throw
444
+ // special exception and handle it here where we can determine the shutdown reason.
445
+ bool connectionAbortedByPeer = ThrowHelper . IsConnectionAbortedWhenStartingStreamException ( ex ) ;
442
446
443
447
// Propagate connection error if present.
444
- if ( _acceptQueue . Reader . Completion . IsFaulted || connectionAbortedByPeer )
448
+ if ( _connectionCloseTcs . Task . IsFaulted || connectionAbortedByPeer )
445
449
{
446
- await _acceptQueue . Reader . Completion . ConfigureAwait ( false ) ;
450
+ await _connectionCloseTcs . Task . ConfigureAwait ( false ) ;
447
451
}
448
452
throw ;
449
453
}
@@ -541,12 +545,15 @@ private unsafe int HandleEventShutdownInitiatedByTransport(ref SHUTDOWN_INITIATE
541
545
{
542
546
Exception exception = ExceptionDispatchInfo . SetCurrentStackTrace ( ThrowHelper . GetExceptionForMsQuicStatus ( data . Status , ( long ) data . ErrorCode ) ) ;
543
547
_connectedTcs . TrySetException ( exception ) ;
544
- CompleteAcceptQueue ( exception , false ) ;
548
+ _connectionCloseTcs . TrySetException ( exception ) ;
549
+ _acceptQueue . Writer . TryComplete ( exception ) ;
545
550
return QUIC_STATUS_SUCCESS ;
546
551
}
547
552
private unsafe int HandleEventShutdownInitiatedByPeer ( ref SHUTDOWN_INITIATED_BY_PEER_DATA data )
548
553
{
549
- CompleteAcceptQueue ( ExceptionDispatchInfo . SetCurrentStackTrace ( ThrowHelper . GetConnectionAbortedException ( ( long ) data . ErrorCode ) ) , false ) ;
554
+ Exception exception = ExceptionDispatchInfo . SetCurrentStackTrace ( ThrowHelper . GetConnectionAbortedException ( ( long ) data . ErrorCode ) ) ;
555
+ _connectionCloseTcs . TrySetException ( exception ) ;
556
+ _acceptQueue . Writer . TryComplete ( exception ) ;
550
557
return QUIC_STATUS_SUCCESS ;
551
558
}
552
559
private unsafe int HandleEventShutdownComplete ( )
@@ -555,7 +562,8 @@ private unsafe int HandleEventShutdownComplete()
555
562
_tlsSecret ? . WriteSecret ( ) ;
556
563
557
564
Exception exception = ExceptionDispatchInfo . SetCurrentStackTrace ( _disposed == 1 ? new ObjectDisposedException ( GetType ( ) . FullName ) : ThrowHelper . GetOperationAbortedException ( ) ) ;
558
- CompleteAcceptQueue ( exception , true ) ;
565
+ _connectionCloseTcs . TrySetException ( exception ) ;
566
+ _acceptQueue . Writer . TryComplete ( exception ) ;
559
567
_connectedTcs . TrySetException ( exception ) ;
560
568
_shutdownTokenSource . Cancel ( ) ;
561
569
_shutdownTcs . TrySetResult ( final : true ) ;
@@ -666,28 +674,6 @@ private static unsafe int NativeCallback(QUIC_HANDLE* connection, void* context,
666
674
}
667
675
}
668
676
669
- private void CompleteAcceptQueue ( Exception ? ex , bool drain )
670
- {
671
- _acceptQueue . Writer . TryComplete ( ex ) ;
672
-
673
- if ( drain )
674
- {
675
- // This should be only called after connection SHUTDOWN_COMPLETE has been indicated.
676
- // At that point, all streams have been already shut down internally and we need
677
- // only to close the handle via dispose, so DisposeAsync below should complete
678
- // synchronously (which is necessary for this method to be callable from MsQuic
679
- // event callback).
680
- while ( _acceptQueue . Reader . TryRead ( out QuicStream ? stream ) )
681
- {
682
- ValueTask task = stream . DisposeAsync ( ) ;
683
- Debug . Assert ( task . IsCompletedSuccessfully ) ;
684
- task . GetAwaiter ( ) . GetResult ( ) ;
685
- }
686
-
687
- Debug . Assert ( _acceptQueue . Reader . Completion . IsCompleted ) ;
688
- }
689
- }
690
-
691
677
/// <summary>
692
678
/// If not closed explicitly by <see cref="CloseAsync(long, CancellationToken)" />, closes the connection with the <see cref="QuicConnectionOptions.DefaultCloseErrorCode"/>.
693
679
/// And releases all resources associated with the connection.
@@ -741,6 +727,10 @@ public async ValueTask DisposeAsync()
741
727
}
742
728
743
729
// Flush the queue and dispose all remaining streams.
744
- CompleteAcceptQueue ( ExceptionDispatchInfo . SetCurrentStackTrace ( new ObjectDisposedException ( GetType ( ) . FullName ) ) , true ) ;
730
+ _acceptQueue . Writer . TryComplete ( ExceptionDispatchInfo . SetCurrentStackTrace ( new ObjectDisposedException ( GetType ( ) . FullName ) ) ) ;
731
+ while ( _acceptQueue . Reader . TryRead ( out QuicStream ? stream ) )
732
+ {
733
+ await stream . DisposeAsync ( ) . ConfigureAwait ( false ) ;
734
+ }
745
735
}
746
736
}
0 commit comments