Skip to content

Commit

Permalink
Query: Relational: Lays groundwork for removing MARS requirement.
Browse files Browse the repository at this point in the history
- Remove remaining DbDataReader dependencies from query processing - Use ValueBuffer everywhere.
- Removed offsetting from ValueBuffer factory factories.
- Enables dynamic ValueBuffer index re-mapping for FromSql non-composed projection out-of-order scenarios.
- Adds workaround for IX-Async #93
- Enables Select op over non-composed FromSql queries!

dprun: 100%
  • Loading branch information
anpete committed May 13, 2015
1 parent c35a8ed commit 4f05fbf
Show file tree
Hide file tree
Showing 46 changed files with 620 additions and 384 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public virtual Expression CreateMaterializeExpression(
Expression valueBufferExpression,
int[] indexMap = null)
{
// ReSharper disable once SuspiciousTypeConversion.Global
var materializer = entityType as IEntityMaterializer;

if (materializer != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ private static readonly MethodInfo _toSequence

[UsedImplicitly]
private static IAsyncEnumerable<T> _ToSequence<T>(T element)
=> AsyncEnumerable.Return(element);
=> new AsyncEnumerableAdapter<T>(new[] { element });

public virtual MethodInfo ToSequence => _toSequence;

Expand Down
1 change: 1 addition & 0 deletions src/EntityFramework.Core/Query/ResultOperatorHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ public static Expression CallWithPossibleCancellationToken(
_cancellationTokenProperty)
}));
}

return Expression.Call(methodInfo, arguments);
}
}
Expand Down
19 changes: 15 additions & 4 deletions src/EntityFramework.Core/Storage/ValueBuffer.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Diagnostics;
using JetBrains.Annotations;

Expand All @@ -10,24 +11,34 @@ public struct ValueBuffer
{
public static readonly ValueBuffer Empty = new ValueBuffer();

private readonly object[] _values;
private readonly IReadOnlyList<object> _values;
private readonly int _offset;

public ValueBuffer([NotNull] object[] values)
public ValueBuffer([NotNull] IReadOnlyList<object> values)
: this(values, 0)
{
}

public ValueBuffer([NotNull] object[] values, int offset)
public ValueBuffer([NotNull] IReadOnlyList<object> values, int offset)
{
Debug.Assert(values != null);
Debug.Assert(offset >= 0);

_values = values;
_offset = offset;
}

public object this[int index] => _values[_offset + index];

public int Count => _values.Length - _offset;
public int Count => _values.Count - _offset;

public ValueBuffer UpdateOffset(int offset)
{
Debug.Assert(offset >= _offset);

return offset > _offset
? new ValueBuffer(_values, offset)
: this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,10 @@
<Compile Include="Migrations\Sql\IMigrationSqlGenerator.cs" />
<Compile Include="Migrations\Sql\MigrationSqlGenerator.cs" />
<Compile Include="Migrations\Infrastructure\ModelDiffer.cs" />
<Compile Include="Query\IValueBufferCursor.cs" />
<Compile Include="RelationalDatabaseFactory.cs" />
<Compile Include="RelationalDataStoreServices.cs" />
<Compile Include="RemappingUntypedValueBufferFactory.cs" />
<Compile Include="UntypedValueBufferFactory.cs" />
<Compile Include="UntypedValueBufferFactoryFactory.cs" />
<Compile Include="Query\Annotations\FromSqlQueryAnnotation.cs" />
Expand All @@ -160,7 +162,6 @@
<Compile Include="Query\CommandBuilder.cs" />
<Compile Include="Query\CommandParameter.cs" />
<Compile Include="Query\Expressions\NotNullableExpression.cs" />
<Compile Include="Query\Expressions\ValueBufferFactoryExpression.cs" />
<Compile Include="Query\ExpressionTreeVisitors\CompositeRelationalExpressionTreeVisitor.cs" />
<Compile Include="Query\ExpressionTreeVisitors\EqualityPredicateExpandingVisitor.cs" />
<Compile Include="Query\ExpressionTreeVisitors\EqualityPredicateInExpressionOptimizer.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Data.Entity.Relational
{
public interface IRelationalValueBufferFactoryFactory
{
IRelationalValueBufferFactory CreateValueBufferFactory([NotNull] IEnumerable<Type> valueTypes, int offset);
IRelationalValueBufferFactory CreateValueBufferFactory(
[NotNull] IReadOnlyCollection<Type> valueTypes, [CanBeNull] IReadOnlyList<int> indexMap);
}
}
8 changes: 8 additions & 0 deletions src/EntityFramework.Relational/Properties/Strings.Designer.cs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/EntityFramework.Relational/Properties/Strings.resx
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,7 @@
<data name="UsingConnection" xml:space="preserve">
<value>Using database '{database}' on server '{dataSource}'.</value>
</data>
<data name="FromSqlMissingColumn" xml:space="preserve">
<value>The required column '{column}' was not present in the results of a 'FromSql' operation.</value>
</data>
</root>
53 changes: 26 additions & 27 deletions src/EntityFramework.Relational/Query/AsyncQueryMethodProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Linq;
using System.Reflection;
using System.Threading;
Expand All @@ -27,18 +26,15 @@ private static readonly MethodInfo _getResultMethodInfo

[UsedImplicitly]
private static async Task<TResult> GetResult<TResult>(
IAsyncEnumerable<DbDataReader> dataReaders, CancellationToken cancellationToken)
IAsyncEnumerable<ValueBuffer> valueBuffers, CancellationToken cancellationToken)
{
using (var enumerator = dataReaders.GetEnumerator())
using (var enumerator = valueBuffers.GetEnumerator())
{
if (await enumerator.MoveNext(cancellationToken))
{
var result
= await enumerator.Current.IsDBNullAsync(0, cancellationToken)
? default(TResult)
: await enumerator.Current.GetFieldValueAsync<TResult>(0, cancellationToken);

return result;
return enumerator.Current[0] == null
? default(TResult)
: (TResult)enumerator.Current[0];
}
}

Expand All @@ -53,13 +49,16 @@ private static readonly MethodInfo _queryMethodInfo

[UsedImplicitly]
private static IAsyncEnumerable<T> _Query<T>(
QueryContext queryContext, CommandBuilder commandBuilder, Func<DbDataReader, T> shaper)
QueryContext queryContext,
CommandBuilder commandBuilder,
Func<ValueBuffer, T> shaper)
{
return new AsyncQueryingEnumerable<T>(
((RelationalQueryContext)queryContext),
commandBuilder,
shaper,
queryContext.Logger);
return
new AsyncQueryingEnumerable(
((RelationalQueryContext)queryContext),
commandBuilder,
queryContext.Logger)
.Select(shaper);
}

public virtual MethodInfo IncludeMethod => _includeMethodInfo;
Expand Down Expand Up @@ -126,39 +125,39 @@ private static readonly MethodInfo _createReferenceIncludeStrategyMethodInfo
[UsedImplicitly]
private static IAsyncIncludeRelatedValuesStrategy _CreateReferenceIncludeStrategy(
RelationalQueryContext relationalQueryContext,
IRelationalValueBufferFactory valueBufferFactory,
int readerIndex,
int valueBufferOffset,
int queryIndex,
Func<ValueBuffer, object> materializer)
{
return new ReferenceIncludeRelatedValuesStrategy(
relationalQueryContext, valueBufferFactory, readerIndex, materializer);
relationalQueryContext, valueBufferOffset, queryIndex, materializer);
}

private class ReferenceIncludeRelatedValuesStrategy : IAsyncIncludeRelatedValuesStrategy
{
private readonly RelationalQueryContext _queryContext;
private readonly IRelationalValueBufferFactory _valueBufferFactory;
private readonly int _readerIndex;
private readonly int _valueBufferOffset;
private readonly int _queryIndex;
private readonly Func<ValueBuffer, object> _materializer;

public ReferenceIncludeRelatedValuesStrategy(
RelationalQueryContext queryContext,
IRelationalValueBufferFactory valueBufferFactory,
int readerIndex,
int valueBufferOffset,
int queryIndex,
Func<ValueBuffer, object> materializer)
{
_queryContext = queryContext;
_valueBufferFactory = valueBufferFactory;
_readerIndex = readerIndex;
_valueBufferOffset = valueBufferOffset;
_queryIndex = queryIndex;
_materializer = materializer;
}

public IAsyncEnumerable<EntityLoadInfo> GetRelatedValues(EntityKey key, Func<ValueBuffer, EntityKey> keyFactory)
{
var valueBuffer = _queryContext.GetValueBuffer(_queryIndex).UpdateOffset(_valueBufferOffset);

return new AsyncEnumerableAdapter<EntityLoadInfo>(
new EntityLoadInfo(
_valueBufferFactory.CreateValueBuffer(_queryContext.GetDataReader(_readerIndex)),
_materializer));
new EntityLoadInfo(valueBuffer, _materializer));
}

private class AsyncEnumerableAdapter<T> : IAsyncEnumerable<T>
Expand Down
91 changes: 49 additions & 42 deletions src/EntityFramework.Relational/Query/AsyncQueryingEnumerable.cs
Original file line number Diff line number Diff line change
@@ -1,107 +1,114 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Microsoft.Data.Entity.Storage;
using Microsoft.Data.Entity.Utilities;
using Microsoft.Framework.Logging;

namespace Microsoft.Data.Entity.Relational.Query
{
public class AsyncQueryingEnumerable<T> : IAsyncEnumerable<T>
public class AsyncQueryingEnumerable : IAsyncEnumerable<ValueBuffer>
{
private readonly RelationalQueryContext _relationalQueryContext;
private readonly CommandBuilder _commandBuilder;
private readonly Func<DbDataReader, T> _shaper;
private readonly ILogger _logger;

public AsyncQueryingEnumerable(
[NotNull] RelationalQueryContext relationalQueryContext,
[NotNull] CommandBuilder commandBuilder,
[NotNull] Func<DbDataReader, T> shaper,
[NotNull] ILogger logger)
{
Check.NotNull(relationalQueryContext, nameof(relationalQueryContext));
Check.NotNull(commandBuilder, nameof(commandBuilder));
Check.NotNull(shaper, nameof(shaper));
Check.NotNull(logger, nameof(logger));

_relationalQueryContext = relationalQueryContext;
_commandBuilder = commandBuilder;
_shaper = shaper;
_logger = logger;
}

public virtual IAsyncEnumerator<T> GetEnumerator()
public virtual IAsyncEnumerator<ValueBuffer> GetEnumerator()
{
return new AsyncEnumerator(this);
}

private sealed class AsyncEnumerator : IAsyncEnumerator<T>
private sealed class AsyncEnumerator : IAsyncEnumerator<ValueBuffer>, IValueBufferCursor
{
private readonly AsyncQueryingEnumerable<T> _enumerable;
private readonly AsyncQueryingEnumerable _queryingEnumerable;

private DbDataReader _reader;
private DbDataReader _dataReader;

private bool _disposed;

public AsyncEnumerator(AsyncQueryingEnumerable<T> enumerable)
public AsyncEnumerator(AsyncQueryingEnumerable queryingEnumerable)
{
_enumerable = enumerable;
_queryingEnumerable = queryingEnumerable;
}

public async Task<bool> MoveNext(CancellationToken cancellationToken)
{
Debug.Assert(!_disposed);

cancellationToken.ThrowIfCancellationRequested();

var hasNext
= await (_reader == null
? InitializeAndReadAsync(cancellationToken)
: _reader.ReadAsync(cancellationToken));
if (_dataReader == null)
{
await _queryingEnumerable._relationalQueryContext.Connection
.OpenAsync(cancellationToken);

using (var command
= _queryingEnumerable._commandBuilder
.Build(
_queryingEnumerable._relationalQueryContext.Connection,
_queryingEnumerable._relationalQueryContext.ParameterValues))
{
_queryingEnumerable._logger.LogCommand(command);

Current = !hasNext ? default(T) : _enumerable._shaper(_reader);
_dataReader = await command.ExecuteReaderAsync(cancellationToken);

return hasNext;
}

private async Task<bool> InitializeAndReadAsync(CancellationToken cancellationToken)
{
await _enumerable._relationalQueryContext.Connection
.OpenAsync(cancellationToken);

using (var command
= _enumerable._commandBuilder
.Build(
_enumerable._relationalQueryContext.Connection,
_enumerable._relationalQueryContext.ParameterValues))
{
_enumerable._logger.LogCommand(command);
_queryingEnumerable._commandBuilder.NotifyReaderCreated(_dataReader);
}

_reader = await command.ExecuteReaderAsync(cancellationToken);
_queryingEnumerable._relationalQueryContext.RegisterActiveQuery(this);
}

_enumerable._relationalQueryContext.RegisterDataReader(_reader);
var hasNext = await _dataReader.ReadAsync(cancellationToken);

return await _reader.ReadAsync(cancellationToken);
Current
= hasNext
? _queryingEnumerable._commandBuilder.ValueBufferFactory
.CreateValueBuffer(_dataReader)
: default(ValueBuffer);

return hasNext;
}

public T Current { get; private set; }
public ValueBuffer Current { get; private set; }

private readonly object _gate = new object();

public void Dispose()
{
if (!_disposed)
// TODO: Undiagnosed IX-Async re-entrancy here.
// https://github.com/Reactive-Extensions/Rx.NET/issues/93
lock (_gate)
{
if (_reader != null)
if (!_disposed)
{
_reader.Dispose();
_enumerable._relationalQueryContext.Connection?.Close();
}
if (_dataReader != null)
{
_dataReader.Dispose();
_queryingEnumerable._relationalQueryContext.Connection?.Close();
}

_disposed = true;
_disposed = true;
}
}
}
}
Expand Down
Loading

0 comments on commit 4f05fbf

Please sign in to comment.