Skip to content

Commit

Permalink
Query: Reimplement concurrency detector
Browse files Browse the repository at this point in the history
Resolves #14534
  • Loading branch information
smitpatel committed Aug 7, 2019
1 parent dfd323b commit eaec12a
Showing 11 changed files with 289 additions and 166 deletions.
Original file line number Diff line number Diff line change
@@ -882,29 +882,32 @@ public bool MoveNext()
{
try
{
if (_enumerator == null)
using (_cosmosQueryContext.ConcurrencyDetector.EnterCriticalSection())
{
var selectExpression = (SelectExpression)new InExpressionValuesExpandingExpressionVisitor(
_sqlExpressionFactory, _cosmosQueryContext.ParameterValues).Visit(_selectExpression);
if (_enumerator == null)
{
var selectExpression = (SelectExpression)new InExpressionValuesExpandingExpressionVisitor(
_sqlExpressionFactory, _cosmosQueryContext.ParameterValues).Visit(_selectExpression);

var sqlQuery = _querySqlGeneratorFactory.Create().GetSqlQuery(
selectExpression, _cosmosQueryContext.ParameterValues);
var sqlQuery = _querySqlGeneratorFactory.Create().GetSqlQuery(
selectExpression, _cosmosQueryContext.ParameterValues);

_enumerator = _cosmosQueryContext.CosmosClient
.ExecuteSqlQuery(
_selectExpression.Container,
sqlQuery)
.GetEnumerator();
}
_enumerator = _cosmosQueryContext.CosmosClient
.ExecuteSqlQuery(
_selectExpression.Container,
sqlQuery)
.GetEnumerator();
}

var hasNext = _enumerator.MoveNext();
var hasNext = _enumerator.MoveNext();

Current
= hasNext
? _shaper(_cosmosQueryContext, _enumerator.Current)
: default;
Current
= hasNext
? _shaper(_cosmosQueryContext, _enumerator.Current)
: default;

return hasNext;
return hasNext;
}
}
catch (Exception exception)
{
@@ -987,27 +990,30 @@ public async ValueTask<bool> MoveNextAsync()
{
try
{
if (_enumerator == null)
using (await _cosmosQueryContext.ConcurrencyDetector.EnterCriticalSectionAsync(_cancellationToken))
{
var selectExpression = (SelectExpression)new InExpressionValuesExpandingExpressionVisitor(
_sqlExpressionFactory, _cosmosQueryContext.ParameterValues).Visit(_selectExpression);
if (_enumerator == null)
{
var selectExpression = (SelectExpression)new InExpressionValuesExpandingExpressionVisitor(
_sqlExpressionFactory, _cosmosQueryContext.ParameterValues).Visit(_selectExpression);

_enumerator = _cosmosQueryContext.CosmosClient
.ExecuteSqlQueryAsync(
_selectExpression.Container,
_querySqlGeneratorFactory.Create().GetSqlQuery(selectExpression, _cosmosQueryContext.ParameterValues))
.GetAsyncEnumerator(_cancellationToken);
_enumerator = _cosmosQueryContext.CosmosClient
.ExecuteSqlQueryAsync(
_selectExpression.Container,
_querySqlGeneratorFactory.Create().GetSqlQuery(selectExpression, _cosmosQueryContext.ParameterValues))
.GetAsyncEnumerator(_cancellationToken);

}
}

var hasNext = await _enumerator.MoveNextAsync();
var hasNext = await _enumerator.MoveNextAsync();

Current
= hasNext
? _shaper(_cosmosQueryContext, _enumerator.Current)
: default;
Current
= hasNext
? _shaper(_cosmosQueryContext, _enumerator.Current)
: default;

return hasNext;
return hasNext;
}
}
catch (Exception exception)
{
Original file line number Diff line number Diff line change
@@ -148,18 +148,21 @@ public bool MoveNext()
{
try
{
if (_enumerator == null)
using (_queryContext.ConcurrencyDetector.EnterCriticalSection())
{
_enumerator = _innerEnumerable.GetEnumerator();
}
if (_enumerator == null)
{
_enumerator = _innerEnumerable.GetEnumerator();
}

var hasNext = _enumerator.MoveNext();
var hasNext = _enumerator.MoveNext();

Current = hasNext
? _shaper(_queryContext, _enumerator.Current)
: default;
Current = hasNext
? _shaper(_queryContext, _enumerator.Current)
: default;

return hasNext;
return hasNext;
}
}
catch (Exception exception)
{
@@ -226,20 +229,23 @@ public ValueTask<bool> MoveNextAsync()
{
try
{
_cancellationToken.ThrowIfCancellationRequested();

if (_enumerator == null)
using (_queryContext.ConcurrencyDetector.EnterCriticalSection())
{
_enumerator = _innerEnumerable.GetEnumerator();
}
_cancellationToken.ThrowIfCancellationRequested();

if (_enumerator == null)
{
_enumerator = _innerEnumerable.GetEnumerator();
}

var hasNext = _enumerator.MoveNext();
var hasNext = _enumerator.MoveNext();

Current = hasNext
? _shaper(_queryContext, _enumerator.Current)
: default;
Current = hasNext
? _shaper(_queryContext, _enumerator.Current)
: default;

return new ValueTask<bool>(hasNext);
return new ValueTask<bool>(hasNext);
}
}
catch (Exception exception)
{
112 changes: 58 additions & 54 deletions src/EFCore.Relational/Query/AsyncQueryingEnumerable.cs
Original file line number Diff line number Diff line change
@@ -85,81 +85,85 @@ public async ValueTask<bool> MoveNextAsync()
{
try
{
if (_dataReader == null)
using (await _relationalQueryContext.ConcurrencyDetector.EnterCriticalSectionAsync(_cancellationToken))
{
var selectExpression = new ParameterValueBasedSelectExpressionOptimizer(
_sqlExpressionFactory,
_parameterNameGeneratorFactory)
.Optimize(_selectExpression, _relationalQueryContext.ParameterValues);

var relationalCommand = _querySqlGeneratorFactory.Create().GetCommand(selectExpression);

_dataReader
= await relationalCommand.ExecuteReaderAsync(
new RelationalCommandParameterObject(
_relationalQueryContext.Connection,
_relationalQueryContext.ParameterValues,
_relationalQueryContext.Context,
_relationalQueryContext.CommandLogger),
_cancellationToken);

if (selectExpression.IsNonComposedFromSql())
if (_dataReader == null)
{
var projection = _selectExpression.Projection.ToList();
var readerColumns = Enumerable.Range(0, _dataReader.DbDataReader.FieldCount)
.ToDictionary(i => _dataReader.DbDataReader.GetName(i), i => i, StringComparer.OrdinalIgnoreCase);

_indexMap = new int[projection.Count];
for (var i = 0; i < projection.Count; i++)
var selectExpression = new ParameterValueBasedSelectExpressionOptimizer(
_sqlExpressionFactory,
_parameterNameGeneratorFactory)
.Optimize(_selectExpression, _relationalQueryContext.ParameterValues);

var relationalCommand = _querySqlGeneratorFactory.Create().GetCommand(selectExpression);

_dataReader
= await relationalCommand.ExecuteReaderAsync(
new RelationalCommandParameterObject(
_relationalQueryContext.Connection,
_relationalQueryContext.ParameterValues,
_relationalQueryContext.Context,
_relationalQueryContext.CommandLogger),
_cancellationToken);

if (selectExpression.IsNonComposedFromSql())
{
if (projection[i].Expression is ColumnExpression columnExpression)
var projection = _selectExpression.Projection.ToList();
var readerColumns = Enumerable.Range(0, _dataReader.DbDataReader.FieldCount)
.ToDictionary(i => _dataReader.DbDataReader.GetName(i), i => i, StringComparer.OrdinalIgnoreCase);

_indexMap = new int[projection.Count];
for (var i = 0; i < projection.Count; i++)
{
var columnName = columnExpression.Name;
if (columnName != null)
if (projection[i].Expression is ColumnExpression columnExpression)
{
if (!readerColumns.TryGetValue(columnName, out var ordinal))
var columnName = columnExpression.Name;
if (columnName != null)
{
throw new InvalidOperationException(RelationalStrings.FromSqlMissingColumn(columnName));
}
if (!readerColumns.TryGetValue(columnName, out var ordinal))
{
throw new InvalidOperationException(RelationalStrings.FromSqlMissingColumn(columnName));
}

_indexMap[i] = ordinal;
_indexMap[i] = ordinal;
}
}
}
}
}
else
{
_indexMap = null;
else
{
_indexMap = null;
}

_resultCoordinator = new ResultCoordinator();
}

_resultCoordinator = new ResultCoordinator();
}

var hasNext = _resultCoordinator.HasNext ?? await _dataReader.ReadAsync();
Current = default;
var hasNext = _resultCoordinator.HasNext ?? await _dataReader.ReadAsync();
Current = default;

if (hasNext)
{
while (true)
if (hasNext)
{
_resultCoordinator.ResultReady = true;
_resultCoordinator.HasNext = null;
Current = _shaper(_relationalQueryContext, _dataReader.DbDataReader, Current, _indexMap, _resultCoordinator);
if (_resultCoordinator.ResultReady)
while (true)
{
break;
}
_resultCoordinator.ResultReady = true;
_resultCoordinator.HasNext = null;
Current = _shaper(_relationalQueryContext, _dataReader.DbDataReader, Current, _indexMap, _resultCoordinator);
if (_resultCoordinator.ResultReady)
{
break;
}

if (!await _dataReader.ReadAsync())
{
_resultCoordinator.HasNext = false;
if (!await _dataReader.ReadAsync())
{
_resultCoordinator.HasNext = false;

break;
break;
}
}
}
}

return hasNext;
return hasNext;
}
}
catch (Exception exception)
{
Loading

0 comments on commit eaec12a

Please sign in to comment.