Skip to content

Commit

Permalink
Merging of TdsParserStateObject.ReadAsyncCallback (note: ports parts of
Browse files Browse the repository at this point in the history
dotnet#378 and dotnet#528 to netfx).
  • Loading branch information
panoskj committed Nov 22, 2022
1 parent 32b2b2f commit 81f549e
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 291 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ internal abstract partial class TdsParserStateObject
{
private static bool UseManagedSNI => TdsParserStateObjectFactory.UseManagedSNI;

private static readonly ContextCallback s_readAsyncCallbackComplete = ReadAsyncCallbackComplete;

// Timeout variables
private readonly WeakReference _cancellationOwner = new WeakReference(null);

Expand Down Expand Up @@ -394,157 +392,8 @@ private void SetBufferSecureStrings()
public void ReadAsyncCallback(PacketHandle packet, uint error) =>
ReadAsyncCallback(IntPtr.Zero, packet, error);

public void ReadAsyncCallback(IntPtr key, PacketHandle packet, uint error)
{
// Key never used.
// Note - it's possible that when native calls managed that an asynchronous exception
// could occur in the native->managed transition, which would
// have two impacts:
// 1) user event not called
// 2) DecrementPendingCallbacks not called, which would mean this object would be leaked due
// to the outstanding GCRoot until AppDomain.Unload.
// We live with the above for the time being due to the constraints of the current
// reliability infrastructure provided by the CLR.
SqlClientEventSource.Log.TryTraceEvent("TdsParserStateObject.ReadAsyncCallback | Info | State Object Id {0}, received error {1} on idle connection", _objectID, (int)error);

TaskCompletionSource<object> source = _networkPacketTaskSource;
#if DEBUG
if ((s_forcePendingReadsToWaitForUser) && (_realNetworkPacketTaskSource != null))
{
source = _realNetworkPacketTaskSource;
}
#endif

// The mars physical connection can get a callback
// with a packet but no result after the connection is closed.
if (source == null && _parser._pMarsPhysicalConObj == this)
{
return;
}

bool processFinallyBlock = true;
try
{
Debug.Assert(CheckPacket(packet, source) && source != null, "AsyncResult null on callback");

if (_parser.MARSOn)
{
// Only take reset lock on MARS and Async.
CheckSetResetConnectionState(error, CallbackType.Read);
}

ChangeNetworkPacketTimeout(Timeout.Infinite, Timeout.Infinite);

// The timer thread may be unreliable under high contention scenarios. It cannot be
// assumed that the timeout has happened on the timer thread callback. Check the timeout
// synchrnously and then call OnTimeoutSync to force an atomic change of state.
if (TimeoutHasExpired)
{
OnTimeoutSync(true);
}

// try to change to the stopped state but only do so if currently in the running state
// and use cmpexch so that all changes out of the running state are atomic
int previousState = Interlocked.CompareExchange(ref _timeoutState, TimeoutState.Stopped, TimeoutState.Running);

// if the state is anything other than running then this query has reached an end so
// set the correlation _timeoutIdentityValue to 0 to prevent late callbacks executing
if (_timeoutState != TimeoutState.Running)
{
_timeoutIdentityValue = 0;
}

ProcessSniPacket(packet, error);
}
catch (Exception e)
{
processFinallyBlock = ADP.IsCatchableExceptionType(e);
throw;
}
finally
{
// pendingCallbacks may be 2 after decrementing, this indicates that a fatal timeout is occurring, and therefore we shouldn't complete the task
int pendingCallbacks = DecrementPendingCallbacks(false); // may dispose of GC handle.
if ((processFinallyBlock) && (source != null) && (pendingCallbacks < 2))
{
if (error == 0)
{
if (_executionContext != null)
{
ExecutionContext.Run(_executionContext, s_readAsyncCallbackComplete, source);
}
else
{
source.TrySetResult(null);
}
}
else
{
if (_executionContext != null)
{
ExecutionContext.Run(_executionContext, state => ReadAsyncCallbackCaptureException((TaskCompletionSource<object>)state), source);
}
else
{
ReadAsyncCallbackCaptureException(source);
}
}
}

AssertValidState();
}
}

private static void ReadAsyncCallbackComplete(object state)
{
TaskCompletionSource<object> source = (TaskCompletionSource<object>)state;
source.TrySetResult(null);
}

protected abstract bool CheckPacket(PacketHandle packet, TaskCompletionSource<object> source);

private void ReadAsyncCallbackCaptureException(TaskCompletionSource<object> source)
{
bool captureSuccess = false;
try
{
if (_hasErrorOrWarning)
{
// Do the close on another thread, since we don't want to block the callback thread
ThrowExceptionAndWarning(asyncClose: true);
}
else if ((_parser.State == TdsParserState.Closed) || (_parser.State == TdsParserState.Broken))
{
// Connection was closed by another thread before we parsed the packet, so no error was added to the collection
throw ADP.ClosedConnectionError();
}
}
catch (Exception ex)
{
if (source.TrySetException(ex))
{
// There was an exception, and it was successfully stored in the task
captureSuccess = true;
}
}

if (!captureSuccess)
{
// Either there was no exception, or the task was already completed
// This is unusual, but possible if a fatal timeout occurred on another thread (which should mean that the connection is now broken)
Debug.Assert(_parser.State == TdsParserState.Broken || _parser.State == TdsParserState.Closed || _parser.Connection.IsConnectionDoomed, "Failed to capture exception while the connection was still healthy");

// The safest thing to do is to ensure that the connection is broken and attempt to cancel the task
// This must be done from another thread to not block the callback thread
Task.Factory.StartNew(() =>
{
_parser.State = TdsParserState.Broken;
_parser.Connection.BreakConnection();
source.TrySetCanceled();
});
}
}

public void WriteAsyncCallback(PacketHandle packet, uint sniError) =>
WriteAsyncCallback(IntPtr.Zero, packet, sniError);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,8 @@ private IntPtr ReadSyncOverAsync(int timeout, out uint error)

private bool IsValidPacket(IntPtr packet) => packet != IntPtr.Zero;

private bool CheckPacket(IntPtr packet, TaskCompletionSource<object> source) => IntPtr.Zero == packet || IntPtr.Zero != packet && source != null;

private void ReleasePacket(IntPtr packet) => SNINativeMethodWrapper.SNIPacketRelease(packet);

private IntPtr ReadAsync(SessionHandle handle, out uint error)
Expand All @@ -484,146 +486,6 @@ private uint SNIPacketGetData(IntPtr packet, byte[] _inBuff, ref uint dataSize)
return SNINativeMethodWrapper.SNIPacketGetData(packet, _inBuff, ref dataSize);
}

public void ReadAsyncCallback(IntPtr key, IntPtr packet, uint error)
{ // Key never used.
// Note - it's possible that when native calls managed that an asynchronous exception
// could occur in the native->managed transition, which would
// have two impacts:
// 1) user event not called
// 2) DecrementPendingCallbacks not called, which would mean this object would be leaked due
// to the outstanding GCRoot until AppDomain.Unload.
// We live with the above for the time being due to the constraints of the current
// reliability infrastructure provided by the CLR.

TaskCompletionSource<object> source = _networkPacketTaskSource;
#if DEBUG
if ((s_forcePendingReadsToWaitForUser) && (_realNetworkPacketTaskSource != null))
{
source = _realNetworkPacketTaskSource;
}
#endif

// The mars physical connection can get a callback
// with a packet but no result after the connection is closed.
if (source == null && _parser._pMarsPhysicalConObj == this)
{
return;
}

RuntimeHelpers.PrepareConstrainedRegions();
bool processFinallyBlock = true;
try
{
Debug.Assert(IntPtr.Zero == packet || IntPtr.Zero != packet && source != null, "AsyncResult null on callback");
if (_parser.MARSOn)
{ // Only take reset lock on MARS and Async.
CheckSetResetConnectionState(error, CallbackType.Read);
}

ChangeNetworkPacketTimeout(Timeout.Infinite, Timeout.Infinite);

// The timer thread may be unreliable under high contention scenarios. It cannot be
// assumed that the timeout has happened on the timer thread callback. Check the timeout
// synchrnously and then call OnTimeoutSync to force an atomic change of state.
if (TimeoutHasExpired)
{
OnTimeoutSync(asyncClose: true);
}

// try to change to the stopped state but only do so if currently in the running state
// and use cmpexch so that all changes out of the running state are atomic
int previousState = Interlocked.CompareExchange(ref _timeoutState, TimeoutState.Stopped, TimeoutState.Running);

// if the state is anything other than running then this query has reached an end so
// set the correlation _timeoutIdentityValue to 0 to prevent late callbacks executing
if (_timeoutState != TimeoutState.Running)
{
_timeoutIdentityValue = 0;
}

ProcessSniPacket(packet, error);
}
catch (Exception e)
{
processFinallyBlock = ADP.IsCatchableExceptionType(e);
throw;
}
finally
{
// pendingCallbacks may be 2 after decrementing, this indicates that a fatal timeout is occuring, and therefore we shouldn't complete the task
int pendingCallbacks = DecrementPendingCallbacks(false); // may dispose of GC handle.
if ((processFinallyBlock) && (source != null) && (pendingCallbacks < 2))
{
if (error == 0)
{
if (_executionContext != null)
{
ExecutionContext.Run(_executionContext, (state) => source.TrySetResult(null), null);
}
else
{
source.TrySetResult(null);
}
}
else
{
if (_executionContext != null)
{
ExecutionContext.Run(_executionContext, (state) => ReadAsyncCallbackCaptureException(source), null);
}
else
{
ReadAsyncCallbackCaptureException(source);
}
}
}

AssertValidState();
}
}

private void ReadAsyncCallbackCaptureException(TaskCompletionSource<object> source)
{
bool captureSuccess = false;
try
{
if (_hasErrorOrWarning)
{
// Do the close on another thread, since we don't want to block the callback thread
ThrowExceptionAndWarning(asyncClose: true);
}
else if ((_parser.State == TdsParserState.Closed) || (_parser.State == TdsParserState.Broken))
{
// Connection was closed by another thread before we parsed the packet, so no error was added to the collection
throw ADP.ClosedConnectionError();
}
}
catch (Exception ex)
{
if (source.TrySetException(ex))
{
// There was an exception, and it was successfully stored in the task
captureSuccess = true;
}
}

if (!captureSuccess)
{
// Either there was no exception, or the task was already completed
// This is unusual, but possible if a fatal timeout occurred on another thread (which should mean that the connection is now broken)
Debug.Assert(_parser.State == TdsParserState.Broken || _parser.State == TdsParserState.Closed || _parser.Connection.IsConnectionDoomed, "Failed to capture exception while the connection was still healthy");

// The safest thing to do is to ensure that the connection is broken and attempt to cancel the task
// This must be done from another thread to not block the callback thread
Task.Factory.StartNew(() =>
{
_parser.State = TdsParserState.Broken;
_parser.Connection.BreakConnection();
source.TrySetCanceled();
});
}
}

#pragma warning disable 420 // a reference to a volatile field will not be treated as volatile

public void WriteAsyncCallback(IntPtr key, IntPtr packet, uint sniError)
Expand Down
Loading

0 comments on commit 81f549e

Please sign in to comment.