Skip to content

Commit

Permalink
Query: Reimplement concurrency detector
Browse files Browse the repository at this point in the history
Resolves #14534
  • Loading branch information
smitpatel committed Aug 8, 2019
1 parent dad6bec commit b19d2e0
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -882,29 +882,32 @@ public bool MoveNext()
{
try
{
if (_enumerator == null)
using (_cosmosQueryContext.ConcurrencyDetector.EnterCriticalSection())
{
var selectExpression = (SelectExpression)new InExpressionValuesExpandingExpressionVisitor(
_sqlExpressionFactory, _cosmosQueryContext.ParameterValues).Visit(_selectExpression);
if (_enumerator == null)
{
var selectExpression = (SelectExpression)new InExpressionValuesExpandingExpressionVisitor(
_sqlExpressionFactory, _cosmosQueryContext.ParameterValues).Visit(_selectExpression);

var sqlQuery = _querySqlGeneratorFactory.Create().GetSqlQuery(
selectExpression, _cosmosQueryContext.ParameterValues);
var sqlQuery = _querySqlGeneratorFactory.Create().GetSqlQuery(
selectExpression, _cosmosQueryContext.ParameterValues);

_enumerator = _cosmosQueryContext.CosmosClient
.ExecuteSqlQuery(
_selectExpression.Container,
sqlQuery)
.GetEnumerator();
}
_enumerator = _cosmosQueryContext.CosmosClient
.ExecuteSqlQuery(
_selectExpression.Container,
sqlQuery)
.GetEnumerator();
}

var hasNext = _enumerator.MoveNext();
var hasNext = _enumerator.MoveNext();

Current
= hasNext
? _shaper(_cosmosQueryContext, _enumerator.Current)
: default;
Current
= hasNext
? _shaper(_cosmosQueryContext, _enumerator.Current)
: default;

return hasNext;
return hasNext;
}
}
catch (Exception exception)
{
Expand Down Expand Up @@ -987,27 +990,30 @@ public async ValueTask<bool> MoveNextAsync()
{
try
{
if (_enumerator == null)
using (await _cosmosQueryContext.ConcurrencyDetector.EnterCriticalSectionAsync(_cancellationToken))
{
var selectExpression = (SelectExpression)new InExpressionValuesExpandingExpressionVisitor(
_sqlExpressionFactory, _cosmosQueryContext.ParameterValues).Visit(_selectExpression);
if (_enumerator == null)
{
var selectExpression = (SelectExpression)new InExpressionValuesExpandingExpressionVisitor(
_sqlExpressionFactory, _cosmosQueryContext.ParameterValues).Visit(_selectExpression);

_enumerator = _cosmosQueryContext.CosmosClient
.ExecuteSqlQueryAsync(
_selectExpression.Container,
_querySqlGeneratorFactory.Create().GetSqlQuery(selectExpression, _cosmosQueryContext.ParameterValues))
.GetAsyncEnumerator(_cancellationToken);
_enumerator = _cosmosQueryContext.CosmosClient
.ExecuteSqlQueryAsync(
_selectExpression.Container,
_querySqlGeneratorFactory.Create().GetSqlQuery(selectExpression, _cosmosQueryContext.ParameterValues))
.GetAsyncEnumerator(_cancellationToken);

}
}

var hasNext = await _enumerator.MoveNextAsync();
var hasNext = await _enumerator.MoveNextAsync();

Current
= hasNext
? _shaper(_cosmosQueryContext, _enumerator.Current)
: default;
Current
= hasNext
? _shaper(_cosmosQueryContext, _enumerator.Current)
: default;

return hasNext;
return hasNext;
}
}
catch (Exception exception)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,18 +148,21 @@ public bool MoveNext()
{
try
{
if (_enumerator == null)
using (_queryContext.ConcurrencyDetector.EnterCriticalSection())
{
_enumerator = _innerEnumerable.GetEnumerator();
}
if (_enumerator == null)
{
_enumerator = _innerEnumerable.GetEnumerator();
}

var hasNext = _enumerator.MoveNext();
var hasNext = _enumerator.MoveNext();

Current = hasNext
? _shaper(_queryContext, _enumerator.Current)
: default;
Current = hasNext
? _shaper(_queryContext, _enumerator.Current)
: default;

return hasNext;
return hasNext;
}
}
catch (Exception exception)
{
Expand Down Expand Up @@ -226,20 +229,23 @@ public ValueTask<bool> MoveNextAsync()
{
try
{
_cancellationToken.ThrowIfCancellationRequested();

if (_enumerator == null)
using (_queryContext.ConcurrencyDetector.EnterCriticalSection())
{
_enumerator = _innerEnumerable.GetEnumerator();
}
_cancellationToken.ThrowIfCancellationRequested();

if (_enumerator == null)
{
_enumerator = _innerEnumerable.GetEnumerator();
}

var hasNext = _enumerator.MoveNext();
var hasNext = _enumerator.MoveNext();

Current = hasNext
? _shaper(_queryContext, _enumerator.Current)
: default;
Current = hasNext
? _shaper(_queryContext, _enumerator.Current)
: default;

return new ValueTask<bool>(hasNext);
return new ValueTask<bool>(hasNext);
}
}
catch (Exception exception)
{
Expand Down
112 changes: 58 additions & 54 deletions src/EFCore.Relational/Query/AsyncQueryingEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,81 +85,85 @@ public async ValueTask<bool> MoveNextAsync()
{
try
{
if (_dataReader == null)
using (await _relationalQueryContext.ConcurrencyDetector.EnterCriticalSectionAsync(_cancellationToken))
{
var selectExpression = new ParameterValueBasedSelectExpressionOptimizer(
_sqlExpressionFactory,
_parameterNameGeneratorFactory)
.Optimize(_selectExpression, _relationalQueryContext.ParameterValues);

var relationalCommand = _querySqlGeneratorFactory.Create().GetCommand(selectExpression);

_dataReader
= await relationalCommand.ExecuteReaderAsync(
new RelationalCommandParameterObject(
_relationalQueryContext.Connection,
_relationalQueryContext.ParameterValues,
_relationalQueryContext.Context,
_relationalQueryContext.CommandLogger),
_cancellationToken);

if (selectExpression.IsNonComposedFromSql())
if (_dataReader == null)
{
var projection = _selectExpression.Projection.ToList();
var readerColumns = Enumerable.Range(0, _dataReader.DbDataReader.FieldCount)
.ToDictionary(i => _dataReader.DbDataReader.GetName(i), i => i, StringComparer.OrdinalIgnoreCase);

_indexMap = new int[projection.Count];
for (var i = 0; i < projection.Count; i++)
var selectExpression = new ParameterValueBasedSelectExpressionOptimizer(
_sqlExpressionFactory,
_parameterNameGeneratorFactory)
.Optimize(_selectExpression, _relationalQueryContext.ParameterValues);

var relationalCommand = _querySqlGeneratorFactory.Create().GetCommand(selectExpression);

_dataReader
= await relationalCommand.ExecuteReaderAsync(
new RelationalCommandParameterObject(
_relationalQueryContext.Connection,
_relationalQueryContext.ParameterValues,
_relationalQueryContext.Context,
_relationalQueryContext.CommandLogger),
_cancellationToken);

if (selectExpression.IsNonComposedFromSql())
{
if (projection[i].Expression is ColumnExpression columnExpression)
var projection = _selectExpression.Projection.ToList();
var readerColumns = Enumerable.Range(0, _dataReader.DbDataReader.FieldCount)
.ToDictionary(i => _dataReader.DbDataReader.GetName(i), i => i, StringComparer.OrdinalIgnoreCase);

_indexMap = new int[projection.Count];
for (var i = 0; i < projection.Count; i++)
{
var columnName = columnExpression.Name;
if (columnName != null)
if (projection[i].Expression is ColumnExpression columnExpression)
{
if (!readerColumns.TryGetValue(columnName, out var ordinal))
var columnName = columnExpression.Name;
if (columnName != null)
{
throw new InvalidOperationException(RelationalStrings.FromSqlMissingColumn(columnName));
}
if (!readerColumns.TryGetValue(columnName, out var ordinal))
{
throw new InvalidOperationException(RelationalStrings.FromSqlMissingColumn(columnName));
}

_indexMap[i] = ordinal;
_indexMap[i] = ordinal;
}
}
}
}
}
else
{
_indexMap = null;
else
{
_indexMap = null;
}

_resultCoordinator = new ResultCoordinator();
}

_resultCoordinator = new ResultCoordinator();
}

var hasNext = _resultCoordinator.HasNext ?? await _dataReader.ReadAsync();
Current = default;
var hasNext = _resultCoordinator.HasNext ?? await _dataReader.ReadAsync();
Current = default;

if (hasNext)
{
while (true)
if (hasNext)
{
_resultCoordinator.ResultReady = true;
_resultCoordinator.HasNext = null;
Current = _shaper(_relationalQueryContext, _dataReader.DbDataReader, Current, _indexMap, _resultCoordinator);
if (_resultCoordinator.ResultReady)
while (true)
{
break;
}
_resultCoordinator.ResultReady = true;
_resultCoordinator.HasNext = null;
Current = _shaper(_relationalQueryContext, _dataReader.DbDataReader, Current, _indexMap, _resultCoordinator);
if (_resultCoordinator.ResultReady)
{
break;
}

if (!await _dataReader.ReadAsync())
{
_resultCoordinator.HasNext = false;
if (!await _dataReader.ReadAsync())
{
_resultCoordinator.HasNext = false;

break;
break;
}
}
}
}

return hasNext;
return hasNext;
}
}
catch (Exception exception)
{
Expand Down
Loading

0 comments on commit b19d2e0

Please sign in to comment.