Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix | MARS header errors when MakeReadAsyncBlocking = false #922

Merged
merged 3 commits into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4163,6 +4163,15 @@ internal void ResetSnapshotState()
_stateObj._cleanupMetaData = _snapshotCleanupMetaData;
_stateObj._cleanupAltMetaDataSetArray = _snapshotCleanupAltMetaDataSetArray;

// Make sure to go through the appropriate increment/decrement methods if changing the OpenResult flag
if (!_stateObj.HasOpenResult && _state.HasFlag(SnapshottedStateFlags.OpenResult))
{
_stateObj.IncrementAndObtainOpenResultCount(_stateObj._executedUnderTransaction);
}
else if (_stateObj.HasOpenResult && !_state.HasFlag(SnapshottedStateFlags.OpenResult))
{
_stateObj.DecrementOpenResultCount();
}
_stateObj._snapshottedState = _state;

// Reset partially read state (these only need to be maintained if doing async without snapshot)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4263,7 +4263,18 @@ internal void ResetSnapshotState()
_stateObj._nullBitmapInfo = _snapshotNullBitmapInfo;
_stateObj._cleanupMetaData = _snapshotCleanupMetaData;
_stateObj._cleanupAltMetaDataSetArray = _snapshotCleanupAltMetaDataSetArray;
_stateObj._hasOpenResult = _snapshotHasOpenResult;

// Make sure to go through the appropriate increment/decrement methods if changing HasOpenResult
if (!_stateObj._hasOpenResult && _snapshotHasOpenResult)
{
_stateObj.IncrementAndObtainOpenResultCount(_stateObj._executedUnderTransaction);
}
else if (_stateObj._hasOpenResult && !_snapshotHasOpenResult)
{
_stateObj.DecrementOpenResultCount();
}
//else _stateObj._hasOpenResult is already == _snapshotHasOpenResult

_stateObj._receivedColMetaData = _snapshotReceivedColumnMetadata;
_stateObj._attentionReceived = _snapshotAttentionReceived;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,20 @@ public AsyncCancelledConnectionsTest(ITestOutputHelper output)
[ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup), nameof(DataTestUtility.IsNotAzureServer))]
public void CancelAsyncConnections()
{
string connectionString = DataTestUtility.TCPConnectionString;
SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder(DataTestUtility.TCPConnectionString);
builder.MultipleActiveResultSets = false;
RunCancelAsyncConnections(builder, false);
RunCancelAsyncConnections(builder, true);
builder.MultipleActiveResultSets = true;
RunCancelAsyncConnections(builder, false);
RunCancelAsyncConnections(builder, true);
}

private void RunCancelAsyncConnections(SqlConnectionStringBuilder connectionStringBuilder, bool makeAsyncBlocking)
{
SqlConnection.ClearAllPools();
AppContext.SetSwitch("Switch.Microsoft.Data.SqlClient.MakeReadAsyncBlocking", makeAsyncBlocking);

_watch = Stopwatch.StartNew();
_random = new Random(4); // chosen via fair dice role.
ParallelLoopResult results = new ParallelLoopResult();
Expand All @@ -38,7 +51,7 @@ public void CancelAsyncConnections()
results = Parallel.For(
fromInclusive: 0,
toExclusive: NumberOfTasks,
(int i) => DoManyAsync(connectionString).GetAwaiter().GetResult());
(int i) => DoManyAsync(connectionStringBuilder).GetAwaiter().GetResult());
}
}
catch (Exception ex)
Expand Down Expand Up @@ -78,18 +91,26 @@ private void DisplaySummary()
}

// This is the the main body that our Tasks run
private async Task DoManyAsync(string connectionString)
private async Task DoManyAsync(SqlConnectionStringBuilder connectionStringBuilder)
{
Interlocked.Increment(ref _start);
Interlocked.Increment(ref _inFlight);

// First poison
await DoOneAsync(connectionString, poison: true);

for (int i = 0; i < NumberOfNonPoisoned && _continue; i++)
using (SqlConnection marsConnection = new SqlConnection(connectionStringBuilder.ToString()))
{
// now run some without poisoning
await DoOneAsync(connectionString);
if (connectionStringBuilder.MultipleActiveResultSets)
{
await marsConnection.OpenAsync();
}

// First poison
await DoOneAsync(marsConnection, connectionStringBuilder.ToString(), poison: true);

for (int i = 0; i < NumberOfNonPoisoned && _continue; i++)
{
// now run some without poisoning
await DoOneAsync(marsConnection, connectionStringBuilder.ToString());
}
}

Interlocked.Decrement(ref _inFlight);
Expand All @@ -100,95 +121,30 @@ private async Task DoManyAsync(string connectionString)
// if we are poisoning we will
// 1 - Interject some sleeps in the sql statement so that it will run long enough that we can cancel it
// 2 - Setup a time bomb task that will cancel the command a random amount of time later
private async Task DoOneAsync(string connectionString, bool poison = false)
private async Task DoOneAsync(SqlConnection marsConnection, string connectionString, bool poison = false)
{
try
{
using (var connection = new SqlConnection(connectionString))
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 4; i++)
{
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 4; i++)
builder.AppendLine("SELECT name FROM sys.tables");
if (poison && i < 3)
{
builder.AppendLine("SELECT name FROM sys.tables");
if (poison && i < 3)
{
builder.AppendLine("WAITFOR DELAY '00:00:01'");
}
builder.AppendLine("WAITFOR DELAY '00:00:01'");
}
}

int rowsRead = 0;
int resultRead = 0;

try
using (var connection = new SqlConnection(connectionString))
{
if (marsConnection != null && marsConnection.State == System.Data.ConnectionState.Open)
{
await connection.OpenAsync();
using (var command = connection.CreateCommand())
{
Task timeBombTask = default;
try
{
// Setup our time bomb
if (poison)
{
timeBombTask = TimeBombAsync(command);
}

command.CommandText = builder.ToString();

// Attempt to read all of the data
using (var reader = await command.ExecuteReaderAsync())
{
try
{
do
{
resultRead++;
while (await reader.ReadAsync() && _continue)
{
rowsRead++;
}
}
while (await reader.NextResultAsync() && _continue);
}
catch when (poison)
{
// This looks a little strange, we failed to read above so this should fail too
// But consider the case where this code is elsewhere (in the Dispose method of a class holding this logic)
try
{
while (await reader.NextResultAsync())
{
}
}
catch
{
Interlocked.Increment(ref _poisonCleanUpExceptions);
}

throw;
}
}
}
finally
{
// Make sure to clean up our time bomb
// It is unlikely, but the timebomb may get delayed in the Task Queue
// And we don't want it running after we dispose the command
if (timeBombTask != default)
{
await timeBombTask;
}
}
}
await RunCommand(marsConnection, builder.ToString(), poison);
}
finally
else
{
Interlocked.Add(ref _rowsRead, rowsRead);
Interlocked.Add(ref _resultRead, resultRead);
if (poison)
{
Interlocked.Increment(ref _poisonedEnded);
}
await connection.OpenAsync();
await RunCommand(connection, builder.ToString(), poison);
}
}
}
Expand Down Expand Up @@ -224,6 +180,83 @@ private async Task DoOneAsync(string connectionString, bool poison = false)
}
}

private async Task RunCommand(SqlConnection connection, string commandText, bool poison)
{
int rowsRead = 0;
int resultRead = 0;

try
{
using (var command = connection.CreateCommand())
{
Task timeBombTask = default;
try
{
// Setup our time bomb
if (poison)
{
timeBombTask = TimeBombAsync(command);
}

command.CommandText = commandText;

// Attempt to read all of the data
using (var reader = await command.ExecuteReaderAsync())
{
try
{
do
{
resultRead++;
while (await reader.ReadAsync() && _continue)
{
rowsRead++;
}
}
while (await reader.NextResultAsync() && _continue);
}
catch when (poison)
{
// This looks a little strange, we failed to read above so this should fail too
// But consider the case where this code is elsewhere (in the Dispose method of a class holding this logic)
try
{
while (await reader.NextResultAsync())
{
}
}
catch
{
Interlocked.Increment(ref _poisonCleanUpExceptions);
}

throw;
}
}
}
finally
{
// Make sure to clean up our time bomb
// It is unlikely, but the timebomb may get delayed in the Task Queue
// And we don't want it running after we dispose the command
if (timeBombTask != default)
{
await timeBombTask;
}
}
}
}
finally
{
Interlocked.Add(ref _rowsRead, rowsRead);
Interlocked.Add(ref _resultRead, resultRead);
if (poison)
{
Interlocked.Increment(ref _poisonedEnded);
}
}
}

private async Task TimeBombAsync(SqlCommand command)
{
await SleepAsync(100, 3000);
Expand Down