Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not call into MsQuic inside a lock #67037

Merged
merged 1 commit into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public MsQuicConnection(IPEndPoint localEndPoint, IPEndPoint remoteEndPoint, MsQ

try
{
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What led you to even expand more the messages in asserts after Stephen's comment?
It'll become redundant.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it would be more useful to have a message handy when something crashes, I thought we generally tend to include the message with the assert in this repo

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's exactly the other way around, just grep for Debug.Assert in the repo. And if it contains the message, I mostly see some additional info and not just the copy of the condition.

MsQuicApi.Api.SetCallbackHandlerDelegate(
_state.Handle,
s_connectionDelegate,
Expand Down Expand Up @@ -187,7 +187,7 @@ public MsQuicConnection(QuicClientConnectionOptions options)
_state.StateGCHandle = GCHandle.Alloc(_state);
try
{
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.ConnectionOpenDelegate(
MsQuicApi.Api.Registration,
s_connectionDelegate,
Expand Down Expand Up @@ -389,7 +389,7 @@ private static uint HandleEventStreamsAvailable(State state, ref ConnectionEvent

private static uint HandleEventPeerCertificateReceived(State state, ref ConnectionEvent connectionEvent)
{
SslPolicyErrors sslPolicyErrors = SslPolicyErrors.None;
SslPolicyErrors sslPolicyErrors = SslPolicyErrors.None;
X509Chain? chain = null;
X509Certificate2? certificate = null;
X509Certificate2Collection? additionalCertificates = null;
Expand Down Expand Up @@ -606,13 +606,13 @@ internal override QuicStreamProvider OpenBidirectionalStream()

internal override int GetRemoteAvailableUnidirectionalStreamCount()
{
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
return MsQuicParameterHelpers.GetUShortParam(MsQuicApi.Api, _state.Handle, QUIC_PARAM_LEVEL.CONNECTION, (uint)QUIC_PARAM_CONN.LOCAL_UNIDI_STREAM_COUNT);
}

internal override int GetRemoteAvailableBidirectionalStreamCount()
{
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
return MsQuicParameterHelpers.GetUShortParam(MsQuicApi.Api, _state.Handle, QUIC_PARAM_LEVEL.CONNECTION, (uint)QUIC_PARAM_CONN.LOCAL_BIDI_STREAM_COUNT);
}

Expand Down Expand Up @@ -645,7 +645,7 @@ internal override ValueTask ConnectAsync(CancellationToken cancellationToken = d
SOCKADDR_INET address = MsQuicAddressHelpers.IPEndPointToINet((IPEndPoint)_remoteEndPoint);
unsafe
{
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
status = MsQuicApi.Api.SetParamDelegate(_state.Handle, QUIC_PARAM_LEVEL.CONNECTION, (uint)QUIC_PARAM_CONN.REMOTE_ADDRESS, (uint)sizeof(SOCKADDR_INET), (byte*)&address);
QuicExceptionHelpers.ThrowIfFailed(status, "Failed to connect to peer.");
}
Expand All @@ -668,7 +668,7 @@ internal override ValueTask ConnectAsync(CancellationToken cancellationToken = d
SOCKADDR_INET quicAddress = MsQuicAddressHelpers.IPEndPointToINet(new IPEndPoint(address, port));
unsafe
{
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
status = MsQuicApi.Api.SetParamDelegate(_state.Handle, QUIC_PARAM_LEVEL.CONNECTION, (uint)QUIC_PARAM_CONN.REMOTE_ADDRESS, (uint)sizeof(SOCKADDR_INET), (byte*)&quicAddress);
QuicExceptionHelpers.ThrowIfFailed(status, "Failed to connect to peer.");
}
Expand All @@ -689,7 +689,7 @@ internal override ValueTask ConnectAsync(CancellationToken cancellationToken = d

try
{
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
status = MsQuicApi.Api.ConnectionStartDelegate(
_state.Handle,
_configuration,
Expand Down Expand Up @@ -723,7 +723,7 @@ private ValueTask ShutdownAsync(

try
{
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
MsQuicApi.Api.ConnectionShutdownDelegate(
_state.Handle,
Flags,
Expand Down Expand Up @@ -851,7 +851,7 @@ private void Dispose(bool disposing)
if (_state.Handle != null && !_state.Handle.IsInvalid && !_state.Handle.IsClosed)
{
// Handle can be null if outbound constructor failed and we are called from finalizer.
Debug.Assert(!Monitor.IsEntered(_state));
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
MsQuicApi.Api.ConnectionShutdownDelegate(
_state.Handle,
QUIC_CONNECTION_SHUTDOWN_FLAGS.SILENT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ internal MsQuicListener(QuicListenerOptions options)
_stateHandle = GCHandle.Alloc(_state);
try
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.ListenerOpenDelegate(
MsQuicApi.Api.Registration,
s_listenerDelegate,
Expand Down Expand Up @@ -185,6 +186,7 @@ private unsafe IPEndPoint Start(QuicListenerOptions options)
QuicBuffer[]? buffers = null;
try
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
MsQuicAlpnHelper.Prepare(applicationProtocols, out handles, out buffers);
status = MsQuicApi.Api.ListenerStartDelegate(_state.Handle, (QuicBuffer*)Marshal.UnsafeAddrOfPinnedArrayElement(buffers, 0), (uint)applicationProtocols.Count, ref address);
}
Expand All @@ -200,6 +202,7 @@ private unsafe IPEndPoint Start(QuicListenerOptions options)

QuicExceptionHelpers.ThrowIfFailed(status, "ListenerStart failed.");

Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
SOCKADDR_INET inetAddress = MsQuicParameterHelpers.GetINetParam(MsQuicApi.Api, _state.Handle, QUIC_PARAM_LEVEL.LISTENER, (uint)QUIC_PARAM_LISTENER.LOCAL_ADDRESS);
return MsQuicAddressHelpers.INetToIPEndPoint(ref inetAddress);
}
Expand All @@ -216,6 +219,7 @@ private void Stop()

if (_state.Handle != null)
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
MsQuicApi.Api.ListenerStopDelegate(_state.Handle);
}
}
Expand Down Expand Up @@ -276,6 +280,7 @@ private static unsafe uint NativeCallbackHandler(

connectionHandle = new SafeMsQuicConnectionHandle(evt->Data.NewConnection.Connection);

Debug.Assert(!Monitor.IsEntered(state), "!Monitor.IsEntered(state)");
uint status = MsQuicApi.Api.ConnectionSetConfigurationDelegate(connectionHandle, connectionConfiguration);
if (MsQuicStatusHelper.SuccessfulStatusCode(status))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa
_state.StateGCHandle = GCHandle.Alloc(_state);
try
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
MsQuicApi.Api.SetCallbackHandlerDelegate(
_state.Handle,
s_streamDelegate,
Expand Down Expand Up @@ -164,6 +165,7 @@ internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_F

try
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.StreamOpenDelegate(
connectionState.Handle,
flags,
Expand All @@ -173,6 +175,7 @@ internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_F

QuicExceptionHelpers.ThrowIfFailed(status, "Failed to open stream to peer.");

Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, QUIC_STREAM_START_FLAGS.FAIL_BLOCKED);
QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream.");
}
Expand Down Expand Up @@ -227,7 +230,7 @@ internal override int WriteTimeout
get
{
ThrowIfDisposed();
return _writeTimeout;
return _writeTimeout;
}
set
{
Expand Down Expand Up @@ -420,6 +423,8 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
long abortError;
bool preCanceled = false;

int bytesRead = -1;
bool reenableReceive = false;
lock (_state)
{
initialReadState = _state.ReadState;
Expand Down Expand Up @@ -482,22 +487,32 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
{
_state.ReadState = ReadState.None;

int taken = CopyMsQuicBuffersToUserBuffer(_state.ReceiveQuicBuffers.AsSpan(0, _state.ReceiveQuicBuffersCount), destination.Span);
ReceiveComplete(taken);
bytesRead = CopyMsQuicBuffersToUserBuffer(_state.ReceiveQuicBuffers.AsSpan(0, _state.ReceiveQuicBuffersCount), destination.Span);

if (taken != _state.ReceiveQuicBuffersTotalBytes)
if (bytesRead != _state.ReceiveQuicBuffersTotalBytes)
{
// Need to re-enable receives because MsQuic will pause them when we don't consume the entire buffer.
EnableReceive();
reenableReceive = true;
}
else if (_state.ReceiveIsFinal)
{
// This was a final message and we've consumed everything. We can complete the state without waiting for PEER_SEND_SHUTDOWN
_state.ReadState = ReadState.ReadsCompleted;
}
}
}

// methods below need to be called outside of the lock
if (bytesRead > -1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this ever be <= -1? We didn't have check like this before if I understand it correctly.

Copy link
Member Author

@rzikm rzikm Mar 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will be -1 if initialReadState != ReadState.IndividualReadComplete, I added it in order for EnableReceive and ReceiveComplete to be called (outside of the lock) if and only if they would've been called in the previous version (inside of the lock)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see now, it's its initial value set before the lock.

{
ReceiveComplete(bytesRead);

return new ValueTask<int>(taken);
if (reenableReceive)
{
EnableReceive();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're calling this outside of the lock, is there something else that ensures we're not racing with another thread to disable receives?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stephen's right. Just a hint here (I haven't thought this fully through), we're using the ReadState as a guard, we always change it only within the lock and than we use it to determine other actions that need to take place outside of the lock.
Possible races: ReadAsync may race with with msquic callback HandleEventRecv. And I don't remember if we have any guards to prevent parallel reads on the stream, I think we have an open issue for this somewhere...

cc @CarnaViire

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TL;DR: RECV event is the only thing disabling receives. It seems to me that it should be ok due to additional guard from msquic side (no new RECV event will come until ReceiveComplete+EnableReceive is called). But I don't like how fragile it looks 😢

Note that the change is only in a branch for "IndividualReadComplete" state. It means it happens AFTER some data already arrived in RECV event.

We do have a guard for parallel reads which looks exactly at ReadState. So it will throw if ReadAsync is called while state is PendingRead (no data available and there's already a waiting read).

By moving ReceiveComplete+EnableReceive out of the lock, we allow a time where state is already changed from "IndividualReadComplete" to "None", but ongoing "first" ReceiveAsync function is not exited yet and ReceiveComplete+EnableReceive are not called yet (but all data is already copied). So a second ReceiveAsync might enter. It would see state "None" which would mean data is not available. If it grabs the lock before ReceiveComplete+EnableReceive are called (i.e. RECV event is still not possible), it would change the state to PendingRead (waiting for data), store the destination buffer reference, register cancellation and return a task to wait. All the things "None" branch touches are not touched in the remainder of the first ReceiveAsync (between exiting from the lock and returning new value task).

The bottom line is I don't think there's any problem in ReadAsync vs HandleEventRecv race. But we might want to rethink guard against parallel reads.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if I understand correctly, the only think that can go wrong with this change is parallel ReadAsync operations, which user code is not supposed to do anyway...

Should I add the guards against parallel operations in this PR, or should we do that separately? @ManickaP @stephentoub , thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say leave it as is for now. We have a separate issue for proper guards against parallel reads and writes #52627

}

return new ValueTask<int>(bytesRead);
}

// All success scenarios returned at this point. Failure scenarios below:
Expand All @@ -510,7 +525,7 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
ex = new InvalidOperationException("Only one read is supported at a time.");
break;
case ReadState.Aborted:
ex = preCanceled ? new OperationCanceledException(cancellationToken) :
ex = preCanceled ? new OperationCanceledException(cancellationToken) :
ThrowHelper.GetStreamAbortedException(abortError);
break;
case ReadState.ConnectionClosed:
Expand Down Expand Up @@ -609,6 +624,7 @@ internal override void AbortWrite(long errorCode)

private void StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS flags, long errorCode)
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.StreamShutdownDelegate(_state.Handle, flags, errorCode);
QuicExceptionHelpers.ThrowIfFailed(status, "StreamShutdown failed.");
}
Expand Down Expand Up @@ -818,15 +834,17 @@ private void Dispose(bool disposing)
{
// Handle race condition when stream can be closed handling SHUTDOWN_COMPLETE.
StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, errorCode: 0);
} catch (ObjectDisposedException) { };
}
catch (ObjectDisposedException) { };
}

if (abortRead)
{
try
{
StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_RECEIVE, 0xffffffff);
} catch (ObjectDisposedException) { };
}
catch (ObjectDisposedException) { };
}

if (completeRead)
Expand All @@ -845,6 +863,7 @@ private void Dispose(bool disposing)

private void EnableReceive()
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of repeating all this, it might be a little more readable to just call a method?

[Conditional("DEBUG")]
internal void AssertMonitorNotEntered(object obj)
{
     Debug.Assert(!Monitor.IsEntered(obj), "Monitor was unexpectedly held");
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're using asserts like this all over the place, including S.N.Http. So I'm fine with this as-is, it's still single line. What I do think is that the message is unnecessary here and we rarely include something like this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(If we do #65965, it'll also be redundant in this case, as with or without the message you'd get the same assert.)

uint status = MsQuicApi.Api.StreamReceiveSetEnabledDelegate(_state.Handle, enabled: true);
QuicExceptionHelpers.ThrowIfFailed(status, "StreamReceiveSetEnabled failed.");
}
Expand Down Expand Up @@ -1289,6 +1308,7 @@ private unsafe ValueTask SendReadOnlyMemoryAsync(
_state.BufferArrays[0] = handle;
_state.SendBufferCount = 1;

Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.StreamSendDelegate(
_state.Handle,
quicBuffers,
Expand Down Expand Up @@ -1352,6 +1372,7 @@ private unsafe ValueTask SendReadOnlySequenceAsync(
++count;
}

Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.StreamSendDelegate(
_state.Handle,
quicBuffers,
Expand Down Expand Up @@ -1412,6 +1433,7 @@ private unsafe ValueTask SendReadOnlyMemoryListAsync(
_state.BufferArrays[i] = handle;
}

Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.StreamSendDelegate(
_state.Handle,
quicBuffers,
Expand All @@ -1434,6 +1456,7 @@ private unsafe ValueTask SendReadOnlyMemoryListAsync(

private void ReceiveComplete(int bufferLength)
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.StreamReceiveCompleteDelegate(_state.Handle, (ulong)bufferLength);
QuicExceptionHelpers.ThrowIfFailed(status, "Could not complete receive call.");
}
Expand Down