Skip to content

Commit

Permalink
Fix multiple retry issue on ExecuteScalarAsync + add missing Configur…
Browse files Browse the repository at this point in the history
…eAwait calls (#1220)
  • Loading branch information
cheenamalhotra committed Aug 20, 2021
1 parent e5863e9 commit 8412c78
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2575,9 +2575,9 @@ protected override Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBehavior b
throw result.Exception.InnerException;
}
return result.Result;
},
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.NotOnCanceled,
},
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.NotOnCanceled,
TaskScheduler.Default
);
}
Expand Down Expand Up @@ -2679,13 +2679,9 @@ private void SetCachedCommandExecuteReaderAsyncContext(ExecuteReaderAsyncCallCon
}

/// <include file='../../../../../../../doc/snippets/Microsoft.Data.SqlClient/SqlCommand.xml' path='docs/members[@name="SqlCommand"]/ExecuteScalarAsync[@name="CancellationToken"]/*'/>
public override Task<object> ExecuteScalarAsync(CancellationToken cancellationToken)
=> IsRetryEnabled ?
InternalExecuteScalarWithRetryAsync(cancellationToken) :
InternalExecuteScalarAsync(cancellationToken);

private Task<object> InternalExecuteScalarWithRetryAsync(CancellationToken cancellationToken)
=> RetryLogicProvider.ExecuteAsync(this, () => InternalExecuteScalarAsync(cancellationToken), cancellationToken);
public override Task<object> ExecuteScalarAsync(CancellationToken cancellationToken) =>
// Do not use retry logic here as internal call to ExecuteReaderAsync handles retry logic.
InternalExecuteScalarAsync(cancellationToken);

private Task<object> InternalExecuteScalarAsync(CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -3733,13 +3729,13 @@ private SqlDataReader GetParameterEncryptionDataReader(out Task returnTask, Task
// Read the results of describe parameter encryption.
command.ReadDescribeEncryptionParameterResults(describeParameterEncryptionDataReader, describeParameterEncryptionRpcOriginalRpcMap);
#if DEBUG
#if DEBUG
// Failpoint to force the thread to halt to simulate cancellation of SqlCommand.
if (_sleepAfterReadDescribeEncryptionParameterResults)
{
Thread.Sleep(10000);
}
#endif
#endif
}
catch (Exception e)
{
Expand Down Expand Up @@ -4897,7 +4893,7 @@ private SqlDataReader RunExecuteReaderTds(CommandBehavior cmdBehavior, RunBehavi
private Task RunExecuteReaderTdsSetupContinuation(RunBehavior runBehavior, SqlDataReader ds, string optionSettings, Task writeTask)
{
Task task = AsyncHelper.CreateContinuationTaskWithState(
task: writeTask,
task: writeTask,
state: _activeConnection,
onSuccess: (object state) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3106,14 +3106,10 @@ private Task<SqlDataReader> InternalExecuteReaderAsync(CommandBehavior behavior,
return returnedTask;
}

private Task<object> InternalExecuteScalarWithRetryAsync(CancellationToken cancellationToken)
=> RetryLogicProvider.ExecuteAsync(this, () => InternalExecuteScalarAsync(cancellationToken), cancellationToken);

/// <include file='../../../../../../../doc/snippets/Microsoft.Data.SqlClient/SqlCommand.xml' path='docs/members[@name="SqlCommand"]/ExecuteScalarAsync[@name="CancellationToken"]/*'/>
public override Task<object> ExecuteScalarAsync(CancellationToken cancellationToken)
=> IsRetryEnabled ?
InternalExecuteScalarWithRetryAsync(cancellationToken) :
InternalExecuteScalarAsync(cancellationToken);
public override Task<object> ExecuteScalarAsync(CancellationToken cancellationToken) =>
// Do not use retry logic here as internal call to ExecuteReaderAsync handles retry logic.
InternalExecuteScalarAsync(cancellationToken);

private Task<object> InternalExecuteScalarAsync(CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal class SqlRetryLogicProvider : SqlRetryLogicBaseProvider
{
private const string TypeName = nameof(SqlRetryLogicProvider);
// keeps free RetryLogic objects
private readonly ConcurrentBag<SqlRetryLogicBase> _retryLogicPool = new ConcurrentBag<SqlRetryLogicBase>();
private readonly ConcurrentBag<SqlRetryLogicBase> _retryLogicPool = new();

/// <summary>Creates an instance of this type.</summary>
public SqlRetryLogicProvider(SqlRetryLogicBase retryLogic)
Expand All @@ -28,8 +28,7 @@ public SqlRetryLogicProvider(SqlRetryLogicBase retryLogic)

private SqlRetryLogicBase GetRetryLogic()
{
SqlRetryLogicBase retryLogic = null;
if (!_retryLogicPool.TryTake(out retryLogic))
if (!_retryLogicPool.TryTake(out SqlRetryLogicBase retryLogic))
{
retryLogic = RetryLogic.Clone() as SqlRetryLogicBase;
}
Expand Down Expand Up @@ -69,7 +68,7 @@ public override TResult Execute<TResult>(object sender, Func<TResult> function)
{
if (RetryLogic.RetryCondition(sender) && RetryLogic.TransientPredicate(e))
{
retryLogic = retryLogic ?? GetRetryLogic();
retryLogic ??= GetRetryLogic();
SqlClientEventSource.Log.TryTraceEvent("<sc.{0}.Execute<TResult>|INFO> Found an action eligible for the retry policy (retried attempts = {1}).",
TypeName, retryLogic.Current);
exceptions.Add(e);
Expand Down Expand Up @@ -107,15 +106,15 @@ public override async Task<TResult> ExecuteAsync<TResult>(object sender, Func<Ta
retry:
try
{
TResult result = await function.Invoke();
TResult result = await function.Invoke().ConfigureAwait(false);
RetryLogicPoolAdd(retryLogic);
return result;
}
catch (Exception e)
{
if (RetryLogic.RetryCondition(sender) && RetryLogic.TransientPredicate(e))
{
retryLogic = retryLogic ?? GetRetryLogic();
retryLogic ??= GetRetryLogic();
SqlClientEventSource.Log.TryTraceEvent("<sc.{0}.ExecuteAsync<TResult>|INFO> Found an action eligible for the retry policy (retried attempts = {1}).",
TypeName, retryLogic.Current);
exceptions.Add(e);
Expand All @@ -124,7 +123,7 @@ public override async Task<TResult> ExecuteAsync<TResult>(object sender, Func<Ta
// The retrying event raises on each retry.
ApplyRetryingEvent(sender, retryLogic, intervalTime, exceptions, e);

await Task.Delay(intervalTime, cancellationToken);
await Task.Delay(intervalTime, cancellationToken).ConfigureAwait(false);
goto retry;
}
else
Expand Down Expand Up @@ -153,14 +152,14 @@ public override async Task ExecuteAsync(object sender, Func<Task> function, Canc
retry:
try
{
await function.Invoke();
await function.Invoke().ConfigureAwait(false);
RetryLogicPoolAdd(retryLogic);
}
catch (Exception e)
{
if (RetryLogic.RetryCondition(sender) && RetryLogic.TransientPredicate(e))
{
retryLogic = retryLogic ?? GetRetryLogic();
retryLogic ??= GetRetryLogic();
SqlClientEventSource.Log.TryTraceEvent("<sc.{0}.ExecuteAsync|INFO> Found an action eligible for the retry policy (retried attempts = {1}).",
TypeName, retryLogic.Current);
exceptions.Add(e);
Expand All @@ -169,7 +168,7 @@ public override async Task ExecuteAsync(object sender, Func<Task> function, Canc
// The retrying event raises on each retry.
ApplyRetryingEvent(sender, retryLogic, intervalTime, exceptions, e);

await Task.Delay(intervalTime, cancellationToken);
await Task.Delay(intervalTime, cancellationToken).ConfigureAwait(false);
goto retry;
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
</Compile>
<Compile Include="DataCommon\AADUtility.cs" />
<Compile Include="SQL\ConfigurableIpPreferenceTest\ConfigurableIpPreferenceTest.cs" />
<Compile Include="SQL\RetryLogic\RetryLogicCounterTest.cs" />
<Compile Include="SQL\RetryLogic\RetryLogicConfigHelper.cs" />
<Compile Include="SQL\RetryLogic\RetryLogicTestHelper.cs" />
<Compile Include="SQL\RetryLogic\SqlCommandReliabilityTest.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace Microsoft.Data.SqlClient.ManualTesting.Tests
{
public class RetryLogicCounterTest
{
[ConditionalTheory(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup))]
[InlineData("ExecuteScalarAsync", 3)]
[InlineData("ExecuteReaderAsync", 3)]
[InlineData("ExecuteXmlReaderAsync", 3)]
[InlineData("ExecuteNonQueryAsync", 3)]
public async void ValidateRetryCount_SqlCommand_Async(string methodName, int numOfTries)
{
ErrorInfoRetryLogicProvider _errorInfoRetryProvider = new(
SqlConfigurableRetryFactory.CreateFixedRetryProvider(new SqlRetryLogicOption()
{ NumberOfTries = numOfTries, TransientErrors = new[] { 50000 } }));

try
{
RetryLogicTestHelper.SetRetrySwitch(true);

using var connection = new SqlConnection(DataTestUtility.TCPConnectionString);
connection.Open();

using SqlCommand cmd = connection.CreateCommand();
cmd.RetryLogicProvider = _errorInfoRetryProvider;
cmd.CommandText = "THROW 50000,'Error',0";

_errorInfoRetryProvider.CallCounter = 0;
switch (methodName)
{
case "ExecuteScalarAsync":
await cmd.ExecuteScalarAsync();
break;
case "ExecuteReaderAsync":
{
using SqlDataReader _ = await cmd.ExecuteReaderAsync();
break;
}
case "ExecuteXmlReaderAsync":
{
using System.Xml.XmlReader _ = await cmd.ExecuteXmlReaderAsync();
break;
}
case "ExecuteNonQueryAsync":
await cmd.ExecuteNonQueryAsync();
break;
default:
break;
}
Assert.False(true, "Exception did not occur.");
}
catch
{
Assert.Equal(numOfTries, _errorInfoRetryProvider.CallCounter);
}
finally
{
RetryLogicTestHelper.SetRetrySwitch(false);
}
}

[ConditionalTheory(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup))]
[InlineData("ExecuteScalar", 3)]
[InlineData("ExecuteReader", 3)]
[InlineData("ExecuteXmlReader", 3)]
[InlineData("ExecuteNonQuery", 3)]
public void ValidateRetryCount_SqlCommand_Sync(string methodName, int numOfTries)
{
ErrorInfoRetryLogicProvider _errorInfoRetryProvider = new(
SqlConfigurableRetryFactory.CreateFixedRetryProvider(new SqlRetryLogicOption()
{ NumberOfTries = numOfTries, TransientErrors = new[] { 50000 } }));

try
{
RetryLogicTestHelper.SetRetrySwitch(true);

using var connection = new SqlConnection(DataTestUtility.TCPConnectionString);
connection.Open();

using SqlCommand cmd = connection.CreateCommand();
cmd.RetryLogicProvider = _errorInfoRetryProvider;
cmd.CommandText = "THROW 50000,'Error',0";

_errorInfoRetryProvider.CallCounter = 0;
switch (methodName)
{
case "ExecuteScalar":
cmd.ExecuteScalar();
break;
case "ExecuteReader":
{
using SqlDataReader _ = cmd.ExecuteReader();
break;
}
case "ExecuteXmlReader":
{
using System.Xml.XmlReader _ = cmd.ExecuteXmlReader();
break;
}
case "ExecuteNonQuery":
cmd.ExecuteNonQuery();
break;
default:
break;
}
Assert.False(true, "Exception did not occur.");
}
catch
{
Assert.Equal(numOfTries, _errorInfoRetryProvider.CallCounter);
}
finally
{
RetryLogicTestHelper.SetRetrySwitch(false);
}
}

public class ErrorInfoRetryLogicProvider : SqlRetryLogicBaseProvider
{
public SqlRetryLogicBaseProvider InnerProvider { get; }

public ErrorInfoRetryLogicProvider(SqlRetryLogicBaseProvider innerProvider)
{
InnerProvider = innerProvider;
}

readonly AsyncLocal<int> _depth = new();
public int CallCounter = 0;

TResult LogCalls<TResult>(Func<TResult> function)
{
CallCounter++;
return function();
}

public override TResult Execute<TResult>(object sender, Func<TResult> function)
{
_depth.Value++;
try
{
return InnerProvider.Execute(sender, () => LogCalls(function));
}
finally
{
_depth.Value--;
}
}

public async Task<TResult> LogCallsAsync<TResult>(Func<Task<TResult>> function)
{
CallCounter++;
return await function();
}

public override async Task<TResult> ExecuteAsync<TResult>(object sender, Func<Task<TResult>> function,
CancellationToken cancellationToken = default)
{
_depth.Value++;
try
{
return await InnerProvider.ExecuteAsync(sender, () => LogCallsAsync(function), cancellationToken);
}
finally
{
_depth.Value--;
}
}

public async Task LogCallsAsync(Func<Task> function)
{
CallCounter++;
await function();
}

public override async Task ExecuteAsync(object sender, Func<Task> function,
CancellationToken cancellationToken = default)
{
_depth.Value++;
try
{
await InnerProvider.ExecuteAsync(sender, () => LogCallsAsync(function), cancellationToken);
}
finally
{
_depth.Value--;
}
}
}
}
}

0 comments on commit 8412c78

Please sign in to comment.