Skip to content

Commit

Permalink
Removed connection state, updated mqsuic and fixed exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
ManickaP committed Jul 21, 2022
1 parent ba827a9 commit 33fa788
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 85 deletions.
2 changes: 1 addition & 1 deletion eng/pipelines/coreclr/templates/helix-queues-setup.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ jobs:
- (Debian.11.Amd64)Ubuntu.1804.amd64@mcr.microsoft.com/dotnet-buildtools/prereqs:debian-11-helix-amd64-20210304164428-5a7c380
- Ubuntu.1804.Amd64
- (Centos.8.Amd64)Ubuntu.1604.amd64@mcr.microsoft.com/dotnet-buildtools/prereqs:centos-8-helix-20201229003624-c1bf759
- (Fedora.34.Amd64)Ubuntu.1604.amd64@mcr.microsoft.com/dotnet-buildtools/prereqs:fedora-34-helix-20220708122731-4f64125
- (Fedora.34.Amd64)Ubuntu.1604.amd64@mcr.microsoft.com/dotnet-buildtools/prereqs:fedora-34-helix-20220716172051-4f64125
- RedHat.7.Amd64

# OSX arm64
Expand Down
4 changes: 2 additions & 2 deletions eng/pipelines/libraries/helix-queues-setup.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ jobs:
- (Centos.8.Amd64.Open)Ubuntu.2204.Amd64.Open@mcr.microsoft.com/dotnet-buildtools/prereqs:centos-8-helix-20201229003624-c1bf759
- RedHat.7.Amd64.Open
- SLES.15.Amd64.Open
- (Fedora.34.Amd64.Open)Ubuntu.1804.Amd64.Open@mcr.microsoft.com/dotnet-buildtools/prereqs:fedora-34-helix-20220708122731-4f64125
- (Fedora.34.Amd64.Open)Ubuntu.1804.Amd64.Open@mcr.microsoft.com/dotnet-buildtools/prereqs:fedora-34-helix-20220716172051-4f64125
- (Ubuntu.2204.Amd64.Open)Ubuntu.1804.Amd64.Open@mcr.microsoft.com/dotnet-buildtools/prereqs:ubuntu-22.04-helix-amd64-20220504035722-1b9461f
- (Debian.10.Amd64.Open)Ubuntu.2204.Amd64.Open@mcr.microsoft.com/dotnet-buildtools/prereqs:debian-10-helix-amd64-bfcd90a-20200121150006
- ${{ if or(ne(parameters.jobParameters.testScope, 'outerloop'), ne(parameters.jobParameters.runtimeFlavor, 'mono')) }}:
- ${{ if or(eq(parameters.jobParameters.isExtraPlatforms, true), eq(parameters.jobParameters.includeAllPlatforms, true)) }}:
- (Centos.8.Amd64.Open)Ubuntu.1604.Amd64.Open@mcr.microsoft.com/dotnet-buildtools/prereqs:centos-8-helix-20201229003624-c1bf759
- SLES.15.Amd64.Open
- (Fedora.34.Amd64.Open)ubuntu.1604.amd64.open@mcr.microsoft.com/dotnet-buildtools/prereqs:fedora-34-helix-20220708122731-4f64125
- (Fedora.34.Amd64.Open)ubuntu.1604.amd64.open@mcr.microsoft.com/dotnet-buildtools/prereqs:fedora-34-helix-20220716172051-4f64125
- Ubuntu.2204.Amd64.Open
- (Debian.11.Amd64.Open)Ubuntu.1804.Amd64.Open@mcr.microsoft.com/dotnet-buildtools/prereqs:debian-11-helix-amd64-20210304164428-5a7c380
- (Mariner.1.0.Amd64.Open)ubuntu.1604.amd64.open@mcr.microsoft.com/dotnet-buildtools/prereqs:cbl-mariner-1.0-helix-20210528192219-92bf620
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,50 @@
using System.Security.Authentication;
using System.Net.Security;
using static Microsoft.Quic.MsQuic;
using System.Diagnostics.CodeAnalysis;

namespace System.Net.Quic;

internal static class ThrowHelper
{
internal static QuicException GetConnectionAbortedException(long errorCode)
{
return errorCode switch
{
-1 => GetOperationAbortedException(), // Shutdown initiated by us.
long err => new QuicException(QuicError.ConnectionAborted, err, SR.Format(SR.net_quic_connectionaborted, err)) // Shutdown initiated by peer.
};
return new QuicException(QuicError.ConnectionAborted, errorCode, SR.Format(SR.net_quic_connectionaborted, errorCode));
}

internal static QuicException GetStreamAbortedException(long errorCode)
{
return errorCode switch
{
-1 => GetOperationAbortedException(), // Shutdown initiated by us.
long err => new QuicException(QuicError.StreamAborted, err, SR.Format(SR.net_quic_streamaborted, err)) // Shutdown initiated by peer.
};
return new QuicException(QuicError.StreamAborted, errorCode, SR.Format(SR.net_quic_streamaborted, errorCode));
}

internal static QuicException GetOperationAbortedException(string? message = null)
{
return new QuicException(QuicError.OperationAborted, null, message ?? SR.net_quic_operationaborted);
}

internal static bool TryGetStreamExceptionForMsQuicStatus(int status, [NotNullWhen(true)] out Exception? exception)
{
if (status == QUIC_STATUS_ABORTED)
{
// If status == QUIC_STATUS_ABORTED, we will receive an event later, which will complete the task source.
exception = null;
return false;
}
else if (status == QUIC_STATUS_INVALID_STATE)
{
// If status == QUIC_STATUS_INVALID_STATE, we have closed the connection.
exception = ThrowHelper.GetOperationAbortedException();
return true;
}
else if (StatusFailed(status))
{
exception = ThrowHelper.GetExceptionForMsQuicStatus(status);
return true;
}
exception = null;
return false;
}

internal static Exception GetExceptionForMsQuicStatus(int status, string? message = null)
{
Exception ex = GetExceptionInternal(status, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1695,6 +1695,33 @@ internal byte ServerTrafficSecret0
}
}

internal partial struct QUIC_STREAM_STATISTICS
{
[NativeTypeName("uint64_t")]
internal ulong ConnBlockedBySchedulingUs;

[NativeTypeName("uint64_t")]
internal ulong ConnBlockedByPacingUs;

[NativeTypeName("uint64_t")]
internal ulong ConnBlockedByAmplificationProtUs;

[NativeTypeName("uint64_t")]
internal ulong ConnBlockedByCongestionControlUs;

[NativeTypeName("uint64_t")]
internal ulong ConnBlockedByFlowControlUs;

[NativeTypeName("uint64_t")]
internal ulong StreamBlockedByIdFlowControlUs;

[NativeTypeName("uint64_t")]
internal ulong StreamBlockedByFlowControlUs;

[NativeTypeName("uint64_t")]
internal ulong StreamBlockedByAppUs;
}

internal unsafe partial struct QUIC_SCHANNEL_CREDENTIAL_ATTRIBUTE_W
{
[NativeTypeName("unsigned long")]
Expand Down Expand Up @@ -2411,7 +2438,7 @@ internal byte AppCloseInProgress
}

[NativeTypeName("BOOLEAN : 1")]
internal byte ConnectionShutdownByPeer
internal byte ConnectionShutdownByApp
{
get
{
Expand All @@ -2424,17 +2451,31 @@ internal byte ConnectionShutdownByPeer
}
}

[NativeTypeName("BOOLEAN : 6")]
[NativeTypeName("BOOLEAN : 1")]
internal byte ConnectionClosedRemotely
{
get
{
return (byte)((_bitfield >> 2) & 0x1u);
}

set
{
_bitfield = (byte)((_bitfield & ~(0x1u << 2)) | ((value & 0x1u) << 2));
}
}

[NativeTypeName("BOOLEAN : 5")]
internal byte RESERVED
{
get
{
return (byte)((_bitfield >> 2) & 0x3Fu);
return (byte)((_bitfield >> 3) & 0x1Fu);
}

set
{
_bitfield = (byte)((_bitfield & ~(0x3Fu << 2)) | ((value & 0x3Fu) << 2));
_bitfield = (byte)((_bitfield & ~(0x1Fu << 3)) | ((value & 0x1Fu) << 3));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,6 @@ public static async ValueTask<QuicConnection> ConnectAsync(QuicClientConnectionO
/// </summary>
private long _defaultCloseErrorCode;

// TODO: remove once/if https://github.com/microsoft/msquic/pull/2883 is merged
internal sealed class State
{
public long AbortErrorCode = -1;
}
private State _state = new State();

/// <summary>
/// Set when CONNECTED is received or inside the constructor for an inbound connection from NEW_CONNECTION data.
/// </summary>
Expand Down Expand Up @@ -364,7 +357,7 @@ public async ValueTask<QuicStream> OpenOutboundStreamAsync(QuicStreamType type,
QuicStream? stream = null;
try
{
stream = new QuicStream(_state, _handle, type, _defaultStreamErrorCode);
stream = new QuicStream(_handle, type, _defaultStreamErrorCode);
await stream.StartAsync(cancellationToken).ConfigureAwait(false);
}
catch
Expand Down Expand Up @@ -460,15 +453,14 @@ private unsafe int HandleEventConnected(ref CONNECTED_DATA data)
}
private unsafe int HandleEventShutdownInitiatedByTransport(ref SHUTDOWN_INITIATED_BY_TRANSPORT_DATA data)
{
_state.AbortErrorCode = 0;
// TODO: we should propagate transport error code.
Exception exception = ExceptionDispatchInfo.SetCurrentStackTrace(ThrowHelper.GetExceptionForMsQuicStatus(data.Status));
_connectedTcs.TrySetException(exception);
_acceptQueue.Writer.TryComplete(exception);
return QUIC_STATUS_SUCCESS;
}
private unsafe int HandleEventShutdownInitiatedByPeer(ref SHUTDOWN_INITIATED_BY_PEER_DATA data)
{
_state.AbortErrorCode = (long)data.ErrorCode;
_acceptQueue.Writer.TryComplete(ExceptionDispatchInfo.SetCurrentStackTrace(ThrowHelper.GetConnectionAbortedException((long)data.ErrorCode)));
return QUIC_STATUS_SUCCESS;
}
Expand All @@ -490,7 +482,7 @@ private unsafe int HandleEventPeerAddressChanged(ref PEER_ADDRESS_CHANGED_DATA d
}
private unsafe int HandleEventPeerStreamStarted(ref PEER_STREAM_STARTED_DATA data)
{
QuicStream stream = new QuicStream(_state, _handle, data.Stream, data.Flags, _defaultStreamErrorCode);
QuicStream stream = new QuicStream(_handle, data.Stream, data.Flags, _defaultStreamErrorCode);
if (!_acceptQueue.Writer.TryWrite(stream))
{
if (NetEventSource.Log.IsEnabled())
Expand Down
61 changes: 24 additions & 37 deletions src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ public sealed partial class QuicStream
private bool _canRead;
private bool _canWrite;

// TODO: remove once/if https://github.com/microsoft/msquic/pull/2883 is merged
private readonly QuicConnection.State _connectionState;

private long _id = -1;
private QuicStreamType _type;

Expand Down Expand Up @@ -141,13 +138,11 @@ public sealed partial class QuicStream
/// <summary>
/// Initializes a new instance of an outbound <see cref="QuicStream" />.
/// </summary>
/// <param name="connectionState">Connection state</param>
/// <param name="connectionHandle"><see cref="QuicConnection"/> safe handle, used to increment/decrement reference count with each associated stream.</param>
/// <param name="type">The type of the stream to open.</param>
/// <param name="defaultErrorCode">Error code used when the stream needs to abort read or write side of the stream internally.</param>
internal unsafe QuicStream(QuicConnection.State connectionState, MsQuicContextSafeHandle connectionHandle, QuicStreamType type, long defaultErrorCode)
internal unsafe QuicStream(MsQuicContextSafeHandle connectionHandle, QuicStreamType type, long defaultErrorCode)
{
_connectionState = connectionState;
GCHandle context = GCHandle.Alloc(this, GCHandleType.Weak);
try
{
Expand Down Expand Up @@ -181,14 +176,12 @@ internal unsafe QuicStream(QuicConnection.State connectionState, MsQuicContextSa
/// <summary>
/// Initializes a new instance of an inbound <see cref="QuicStream" />.
/// </summary>
/// <param name="connectionState">Connection state</param>
/// <param name="connectionHandle"><see cref="QuicConnection"/> safe handle, used to increment/decrement reference count with each associated stream.</param>
/// <param name="handle">Native handle.</param>
/// <param name="flags">Related data from the PEER_STREAM_STARTED connection event.</param>
/// <param name="defaultErrorCode">Error code used when the stream needs to abort read or write side of the stream internally.</param>
internal unsafe QuicStream(QuicConnection.State connectionState, MsQuicContextSafeHandle connectionHandle, QUIC_HANDLE* handle, QUIC_STREAM_OPEN_FLAGS flags, long defaultErrorCode)
internal unsafe QuicStream(MsQuicContextSafeHandle connectionHandle, QUIC_HANDLE* handle, QUIC_STREAM_OPEN_FLAGS flags, long defaultErrorCode)
{
_connectionState = connectionState;
GCHandle context = GCHandle.Alloc(this, GCHandleType.Weak);
try
{
Expand Down Expand Up @@ -236,10 +229,9 @@ internal ValueTask StartAsync(CancellationToken cancellationToken = default)
int status = MsQuicApi.Api.ApiTable->StreamStart(
_handle.QuicHandle,
QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT);
if (StatusFailed(status))
if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? exception))
{
// TODO: aborted and the exception type
_startedTcs.TrySetException(ThrowHelper.GetExceptionForMsQuicStatus(status));
_startedTcs.TrySetException(exception);
}
}
}
Expand Down Expand Up @@ -374,32 +366,21 @@ public ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, bool completeWrites, Ca
return valueTask;
}

try
_sendBuffers.Initialize(buffer);
unsafe
{
_sendBuffers.Initialize(buffer);
unsafe
int status = MsQuicApi.Api.ApiTable->StreamSend(
_handle.QuicHandle,
_sendBuffers.Buffers,
(uint)_sendBuffers.Count,
completeWrites ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE,
null);
if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? exception))
{
int status = MsQuicApi.Api.ApiTable->StreamSend(
_handle.QuicHandle,
_sendBuffers.Buffers,
(uint)_sendBuffers.Count,
completeWrites ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE,
null);
if (status == QUIC_STATUS_ABORTED)
{
// If status == QUIC_STATUS_ABORTED, we either received PEER_RECEIVE_ABORTED or will receive SHUTDOWN_COMPLETE(ConnectionClose) later, all of which completes the _sendTcs.
_sendBuffers.Reset();
return valueTask;
}
ThrowHelper.ThrowIfMsQuicError(status, "StreamSend failed");
_sendBuffers.Reset();
_sendTcs.TrySetException(exception, final: true);
}
}
catch (Exception ex)
{
_sendTcs.TrySetException(ex, final: true);
_sendBuffers.Reset();
throw;
}

return valueTask;
}
Expand Down Expand Up @@ -489,8 +470,10 @@ private unsafe int HandleEventStartComplete(ref START_COMPLETE data)
}
else
{
_startedTcs.TrySetException(ThrowHelper.GetExceptionForMsQuicStatus(data.Status));
// TODO: aborted and exception type
if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(data.Status, out Exception? exception))
{
_startedTcs.TrySetException(exception);
}
}

return QUIC_STATUS_SUCCESS;
Expand Down Expand Up @@ -553,7 +536,11 @@ private unsafe int HandleEventShutdownComplete(ref SHUTDOWN_COMPLETE data)
{
if (data.ConnectionShutdown != 0)
{
Exception exception = ThrowHelper.GetConnectionAbortedException(_connectionState.AbortErrorCode);
// If it's local shutdown by App, i.e.: this side called QuicConnection.CloseAsync, throw OperationAbortedException.
Exception exception = data.ConnectionShutdownByApp != 0 && data.ConnectionClosedRemotely == 0 ?
ThrowHelper.GetOperationAbortedException() :
// TODO: this will contain 0 for transport shutdown, we should propagate transport error code.
ThrowHelper.GetConnectionAbortedException(data.ConnectionShutdownByApp != 0 ? (long)data.ConnectionErrorCode : 0);
_startedTcs.TrySetException(exception);
_receiveTcs.TrySetException(exception, final: true);
_sendTcs.TrySetException(exception, final: true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,8 @@ ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection, CancellationTok
[InlineData(true, true)] // the code path for uni/bidirectional streams differs only in a flag passed to MsQuic, so there is no need to test all possible combinations.
public async Task OpenStreamAsync_ConnectionAbort_Throws(bool unidirectional, bool localAbort)
{
const int expectedErrorCode = 789654;

ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection, CancellationToken token = default) => unidirectional
? connection.OpenOutboundStreamAsync(QuicStreamType.Unidirectional, token)
: connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional, token);
Expand All @@ -595,17 +597,14 @@ ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection, CancellationTok

if (localAbort)
{
await clientConnection.CloseAsync(0);
// TODO: This may not always throw QuicOperationAbortedException due to a data race with MsQuic worker threads
// (CloseAsync may be processed before OpenStreamAsync as it is scheduled to the front of the operation queue)
// To be revisited once we standartize on exceptions.
// [ActiveIssue("https://github.com/dotnet/runtime/issues/55619")]
await Assert.ThrowsAsync<QuicException>(() => waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(3)));
await clientConnection.CloseAsync(expectedErrorCode);
await AssertThrowsQuicExceptionAsync(QuicError.OperationAborted, () => waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(3)));
}
else
{
await serverConnection.CloseAsync(0);
await AssertThrowsQuicExceptionAsync(QuicError.ConnectionAborted, () => waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(3)));
await serverConnection.CloseAsync(expectedErrorCode);
QuicException ex = await AssertThrowsQuicExceptionAsync(QuicError.ConnectionAborted, () => waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(3)));
Assert.Equal(expectedErrorCode, ex.ApplicationErrorCode);
}

await clientConnection.DisposeAsync();
Expand All @@ -625,7 +624,8 @@ public async Task SetListenerTimeoutWorksWithSmallTimeout()
};

(QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(null, listenerOptions);
await AssertThrowsQuicExceptionAsync(QuicError.ConnectionAborted, async () => await serverConnection.AcceptInboundStreamAsync().AsTask().WaitAsync(TimeSpan.FromSeconds(100)));
await AssertThrowsQuicExceptionAsync(QuicError.ConnectionIdle, async () => await serverConnection.AcceptInboundStreamAsync().AsTask().WaitAsync(TimeSpan.FromSeconds(100)));

await serverConnection.DisposeAsync();
await clientConnection.DisposeAsync();
}
Expand Down
Loading

0 comments on commit 33fa788

Please sign in to comment.