Skip to content

Commit

Permalink
Do not call into MsQuic inside a lock
Browse files Browse the repository at this point in the history
  • Loading branch information
rzikm committed Mar 23, 2022
1 parent ac84ea6 commit 67724bb
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public MsQuicConnection(IPEndPoint localEndPoint, IPEndPoint remoteEndPoint, MsQ

try
{
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
MsQuicApi.Api.SetCallbackHandlerDelegate(
_state.Handle,
s_connectionDelegate,
Expand Down Expand Up @@ -187,7 +187,7 @@ public MsQuicConnection(QuicClientConnectionOptions options)
_state.StateGCHandle = GCHandle.Alloc(_state);
try
{
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.ConnectionOpenDelegate(
MsQuicApi.Api.Registration,
s_connectionDelegate,
Expand Down Expand Up @@ -389,7 +389,7 @@ private static uint HandleEventStreamsAvailable(State state, ref ConnectionEvent

private static uint HandleEventPeerCertificateReceived(State state, ref ConnectionEvent connectionEvent)
{
SslPolicyErrors sslPolicyErrors = SslPolicyErrors.None;
SslPolicyErrors sslPolicyErrors = SslPolicyErrors.None;
X509Chain? chain = null;
X509Certificate2? certificate = null;
X509Certificate2Collection? additionalCertificates = null;
Expand Down Expand Up @@ -606,13 +606,13 @@ internal override QuicStreamProvider OpenBidirectionalStream()

internal override int GetRemoteAvailableUnidirectionalStreamCount()
{
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
return MsQuicParameterHelpers.GetUShortParam(MsQuicApi.Api, _state.Handle, QUIC_PARAM_LEVEL.CONNECTION, (uint)QUIC_PARAM_CONN.LOCAL_UNIDI_STREAM_COUNT);
}

internal override int GetRemoteAvailableBidirectionalStreamCount()
{
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
return MsQuicParameterHelpers.GetUShortParam(MsQuicApi.Api, _state.Handle, QUIC_PARAM_LEVEL.CONNECTION, (uint)QUIC_PARAM_CONN.LOCAL_BIDI_STREAM_COUNT);
}

Expand Down Expand Up @@ -645,7 +645,7 @@ internal override ValueTask ConnectAsync(CancellationToken cancellationToken = d
SOCKADDR_INET address = MsQuicAddressHelpers.IPEndPointToINet((IPEndPoint)_remoteEndPoint);
unsafe
{
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
status = MsQuicApi.Api.SetParamDelegate(_state.Handle, QUIC_PARAM_LEVEL.CONNECTION, (uint)QUIC_PARAM_CONN.REMOTE_ADDRESS, (uint)sizeof(SOCKADDR_INET), (byte*)&address);
QuicExceptionHelpers.ThrowIfFailed(status, "Failed to connect to peer.");
}
Expand All @@ -668,7 +668,7 @@ internal override ValueTask ConnectAsync(CancellationToken cancellationToken = d
SOCKADDR_INET quicAddress = MsQuicAddressHelpers.IPEndPointToINet(new IPEndPoint(address, port));
unsafe
{
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
status = MsQuicApi.Api.SetParamDelegate(_state.Handle, QUIC_PARAM_LEVEL.CONNECTION, (uint)QUIC_PARAM_CONN.REMOTE_ADDRESS, (uint)sizeof(SOCKADDR_INET), (byte*)&quicAddress);
QuicExceptionHelpers.ThrowIfFailed(status, "Failed to connect to peer.");
}
Expand All @@ -689,7 +689,7 @@ internal override ValueTask ConnectAsync(CancellationToken cancellationToken = d

try
{
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
status = MsQuicApi.Api.ConnectionStartDelegate(
_state.Handle,
_configuration,
Expand Down Expand Up @@ -723,7 +723,7 @@ private ValueTask ShutdownAsync(

try
{
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
MsQuicApi.Api.ConnectionShutdownDelegate(
_state.Handle,
Flags,
Expand Down Expand Up @@ -851,7 +851,7 @@ private void Dispose(bool disposing)
if (_state.Handle != null && !_state.Handle.IsInvalid && !_state.Handle.IsClosed)
{
// Handle can be null if outbound constructor failed and we are called from finalizer.
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
MsQuicApi.Api.ConnectionShutdownDelegate(
_state.Handle,
QUIC_CONNECTION_SHUTDOWN_FLAGS.SILENT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ internal MsQuicListener(QuicListenerOptions options)
_stateHandle = GCHandle.Alloc(_state);
try
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.ListenerOpenDelegate(
MsQuicApi.Api.Registration,
s_listenerDelegate,
Expand Down Expand Up @@ -185,6 +186,7 @@ private unsafe IPEndPoint Start(QuicListenerOptions options)
QuicBuffer[]? buffers = null;
try
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
MsQuicAlpnHelper.Prepare(applicationProtocols, out handles, out buffers);
status = MsQuicApi.Api.ListenerStartDelegate(_state.Handle, (QuicBuffer*)Marshal.UnsafeAddrOfPinnedArrayElement(buffers, 0), (uint)applicationProtocols.Count, ref address);
}
Expand All @@ -200,6 +202,7 @@ private unsafe IPEndPoint Start(QuicListenerOptions options)

QuicExceptionHelpers.ThrowIfFailed(status, "ListenerStart failed.");

Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
SOCKADDR_INET inetAddress = MsQuicParameterHelpers.GetINetParam(MsQuicApi.Api, _state.Handle, QUIC_PARAM_LEVEL.LISTENER, (uint)QUIC_PARAM_LISTENER.LOCAL_ADDRESS);
return MsQuicAddressHelpers.INetToIPEndPoint(ref inetAddress);
}
Expand All @@ -216,6 +219,7 @@ private void Stop()

if (_state.Handle != null)
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
MsQuicApi.Api.ListenerStopDelegate(_state.Handle);
}
}
Expand Down Expand Up @@ -276,6 +280,7 @@ private static unsafe uint NativeCallbackHandler(

connectionHandle = new SafeMsQuicConnectionHandle(evt->Data.NewConnection.Connection);

Debug.Assert(!Monitor.IsEntered(state), "!Monitor.IsEntered(state)");
uint status = MsQuicApi.Api.ConnectionSetConfigurationDelegate(connectionHandle, connectionConfiguration);
if (MsQuicStatusHelper.SuccessfulStatusCode(status))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa
_state.StateGCHandle = GCHandle.Alloc(_state);
try
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
MsQuicApi.Api.SetCallbackHandlerDelegate(
_state.Handle,
s_streamDelegate,
Expand Down Expand Up @@ -164,6 +165,7 @@ internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_F

try
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.StreamOpenDelegate(
connectionState.Handle,
flags,
Expand All @@ -173,6 +175,7 @@ internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_F

QuicExceptionHelpers.ThrowIfFailed(status, "Failed to open stream to peer.");

Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, QUIC_STREAM_START_FLAGS.FAIL_BLOCKED);
QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream.");
}
Expand Down Expand Up @@ -227,7 +230,7 @@ internal override int WriteTimeout
get
{
ThrowIfDisposed();
return _writeTimeout;
return _writeTimeout;
}
set
{
Expand Down Expand Up @@ -420,6 +423,8 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
long abortError;
bool preCanceled = false;

int bytesRead = -1;
bool reenableReceive = false;
lock (_state)
{
initialReadState = _state.ReadState;
Expand Down Expand Up @@ -482,22 +487,32 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
{
_state.ReadState = ReadState.None;

int taken = CopyMsQuicBuffersToUserBuffer(_state.ReceiveQuicBuffers.AsSpan(0, _state.ReceiveQuicBuffersCount), destination.Span);
ReceiveComplete(taken);
bytesRead = CopyMsQuicBuffersToUserBuffer(_state.ReceiveQuicBuffers.AsSpan(0, _state.ReceiveQuicBuffersCount), destination.Span);

if (taken != _state.ReceiveQuicBuffersTotalBytes)
if (bytesRead != _state.ReceiveQuicBuffersTotalBytes)
{
// Need to re-enable receives because MsQuic will pause them when we don't consume the entire buffer.
EnableReceive();
reenableReceive = true;
}
else if (_state.ReceiveIsFinal)
{
// This was a final message and we've consumed everything. We can complete the state without waiting for PEER_SEND_SHUTDOWN
_state.ReadState = ReadState.ReadsCompleted;
}
}
}

// methods below need to be called outside of the lock
if (bytesRead > -1)
{
ReceiveComplete(bytesRead);

return new ValueTask<int>(taken);
if (reenableReceive)
{
EnableReceive();
}

return new ValueTask<int>(bytesRead);
}

// All success scenarios returned at this point. Failure scenarios below:
Expand All @@ -510,7 +525,7 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
ex = new InvalidOperationException("Only one read is supported at a time.");
break;
case ReadState.Aborted:
ex = preCanceled ? new OperationCanceledException(cancellationToken) :
ex = preCanceled ? new OperationCanceledException(cancellationToken) :
ThrowHelper.GetStreamAbortedException(abortError);
break;
case ReadState.ConnectionClosed:
Expand Down Expand Up @@ -609,6 +624,7 @@ internal override void AbortWrite(long errorCode)

private void StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS flags, long errorCode)
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.StreamShutdownDelegate(_state.Handle, flags, errorCode);
QuicExceptionHelpers.ThrowIfFailed(status, "StreamShutdown failed.");
}
Expand Down Expand Up @@ -818,15 +834,17 @@ private void Dispose(bool disposing)
{
// Handle race condition when stream can be closed handling SHUTDOWN_COMPLETE.
StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, errorCode: 0);
} catch (ObjectDisposedException) { };
}
catch (ObjectDisposedException) { };
}

if (abortRead)
{
try
{
StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_RECEIVE, 0xffffffff);
} catch (ObjectDisposedException) { };
}
catch (ObjectDisposedException) { };
}

if (completeRead)
Expand All @@ -845,6 +863,7 @@ private void Dispose(bool disposing)

private void EnableReceive()
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.StreamReceiveSetEnabledDelegate(_state.Handle, enabled: true);
QuicExceptionHelpers.ThrowIfFailed(status, "StreamReceiveSetEnabled failed.");
}
Expand Down Expand Up @@ -1289,6 +1308,7 @@ private unsafe ValueTask SendReadOnlyMemoryAsync(
_state.BufferArrays[0] = handle;
_state.SendBufferCount = 1;

Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.StreamSendDelegate(
_state.Handle,
quicBuffers,
Expand Down Expand Up @@ -1352,6 +1372,7 @@ private unsafe ValueTask SendReadOnlySequenceAsync(
++count;
}

Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.StreamSendDelegate(
_state.Handle,
quicBuffers,
Expand Down Expand Up @@ -1412,6 +1433,7 @@ private unsafe ValueTask SendReadOnlyMemoryListAsync(
_state.BufferArrays[i] = handle;
}

Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.StreamSendDelegate(
_state.Handle,
quicBuffers,
Expand All @@ -1434,6 +1456,7 @@ private unsafe ValueTask SendReadOnlyMemoryListAsync(

private void ReceiveComplete(int bufferLength)
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.StreamReceiveCompleteDelegate(_state.Handle, (ulong)bufferLength);
QuicExceptionHelpers.ThrowIfFailed(status, "Could not complete receive call.");
}
Expand Down

0 comments on commit 67724bb

Please sign in to comment.