diff --git a/Dapper/SqlMapper.Async.cs b/Dapper/SqlMapper.Async.cs index 68cbe6b9..37fe2e41 100644 --- a/Dapper/SqlMapper.Async.cs +++ b/Dapper/SqlMapper.Async.cs @@ -5,6 +5,7 @@ using System.Data.Common; using System.Globalization; using System.Linq; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -1217,5 +1218,79 @@ private static async Task ExecuteScalarImplAsync(IDbConnection cnn, Comman } return Parse(result); } + +#if NET5_0_OR_GREATER + /// + /// Execute a query asynchronously using . + /// + /// The type of results to return. + /// The connection to query on. + /// The SQL to execute for the query. + /// The parameters to pass, if any. + /// The transaction to use, if any. + /// The command timeout (in seconds). + /// The type of command to execute. + /// + /// A sequence of data of ; if a basic type (int, string, etc) is queried then the data from the first column is assumed, otherwise an instance is + /// created per row, and a direct column-name===member-name mapping is assumed (case insensitive). + /// + public static IAsyncEnumerable QueryUnbufferedAsync(this DbConnection cnn, string sql, object param = null, DbTransaction transaction = null, int? commandTimeout = null, CommandType? commandType = null) + { + // note: in many cases of adding a new async method I might add a CancellationToken - however, cancellation is expressed via WithCancellation on iterators + return QueryUnbufferedAsync(cnn, typeof(T), new CommandDefinition(sql, param, transaction, commandTimeout, commandType, CommandFlags.None, default)); + } + + private static IAsyncEnumerable QueryUnbufferedAsync(this IDbConnection cnn, Type effectiveType, CommandDefinition command) + { + return Impl(cnn, effectiveType, command, command.CancellationToken); // proxy to allow CT expression + + static async IAsyncEnumerable Impl(IDbConnection cnn, Type effectiveType, CommandDefinition command, + [EnumeratorCancellation] CancellationToken cancel) + { + object param = command.Parameters; + var identity = new Identity(command.CommandText, command.CommandType, cnn, effectiveType, param?.GetType()); + var info = GetCacheInfo(identity, param, command.AddToCache); + bool wasClosed = cnn.State == ConnectionState.Closed; + using var cmd = command.TrySetupAsyncCommand(cnn, info.ParamReader); + DbDataReader reader = null; + try + { + if (wasClosed) await cnn.TryOpenAsync(cancel).ConfigureAwait(false); + reader = await ExecuteReaderWithFlagsFallbackAsync(cmd, wasClosed, CommandBehavior.SequentialAccess | CommandBehavior.SingleResult, cancel).ConfigureAwait(false); + + var tuple = info.Deserializer; + int hash = GetColumnHash(reader); + if (tuple.Func == null || tuple.Hash != hash) + { + if (reader.FieldCount == 0) + { + yield break; + } + tuple = info.Deserializer = new DeserializerState(hash, GetDeserializer(effectiveType, reader, 0, -1, false)); + if (command.AddToCache) SetQueryCache(identity, info); + } + + var func = tuple.Func; + + var convertToType = Nullable.GetUnderlyingType(effectiveType) ?? effectiveType; + while (await reader.ReadAsync(cancel).ConfigureAwait(false)) + { + object val = func(reader); + yield return GetValue(reader, effectiveType, val); + } + while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ } + command.OnCompleted(); + } + finally + { + if (reader is not null) + { + await reader.DisposeAsync(); + } + if (wasClosed) cnn.Close(); + } + } + } +#endif } } diff --git a/Dapper/SqlMapper.GridReader.Async.cs b/Dapper/SqlMapper.GridReader.Async.cs index f1c5a7fb..c15aadfe 100644 --- a/Dapper/SqlMapper.GridReader.Async.cs +++ b/Dapper/SqlMapper.GridReader.Async.cs @@ -2,8 +2,8 @@ using System.Collections.Generic; using System.Data; using System.Data.Common; -using System.Globalization; using System.Linq; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -12,6 +12,9 @@ namespace Dapper public static partial class SqlMapper { public partial class GridReader +#if NET5_0_OR_GREATER + : IAsyncDisposable +#endif { private readonly CancellationToken cancel; internal GridReader(IDbCommand command, DbDataReader reader, Identity identity, DynamicParameters dynamicParams, bool addToCache, CancellationToken cancel) @@ -140,7 +143,7 @@ public Task ReadSingleOrDefaultAsync(Type type) private async Task NextResultAsync() { - if (await ((DbDataReader)reader).NextResultAsync(cancel).ConfigureAwait(false)) + if (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { // readCount++; gridIndex++; @@ -150,14 +153,37 @@ private async Task NextResultAsync() { // happy path; close the reader cleanly - no // need for "Cancel" etc +#if NET5_0_OR_GREATER + await reader.DisposeAsync(); +#else reader.Dispose(); +#endif reader = null; callbacks?.OnCompleted(); +#if NET5_0_OR_GREATER + await DisposeAsync(); +#else Dispose(); +#endif } } private Task> ReadAsyncImpl(Type type, bool buffered) + { + var deserializer = ValidateAndMarkConsumed(type); + if (buffered) + { + return ReadBufferedAsync(gridIndex, deserializer); + } + else + { + var result = ReadDeferred(gridIndex, deserializer, type); + if (buffered) result = result?.ToList(); // for the "not a DbDataReader" scenario + return Task.FromResult(result); + } + } + + private Func ValidateAndMarkConsumed(Type type) { if (reader == null) throw new ObjectDisposedException(GetType().FullName, "The reader has been disposed; this can happen after all data has been consumed"); if (IsConsumed) throw new InvalidOperationException("Query results must be consumed in the correct order, and each result can only be consumed once"); @@ -172,27 +198,10 @@ private Task> ReadAsyncImpl(Type type, bool buffered) cache.Deserializer = deserializer; } IsConsumed = true; - if (buffered && reader is DbDataReader) - { - return ReadBufferedAsync(gridIndex, deserializer.Func); - } - else - { - var result = ReadDeferred(gridIndex, deserializer.Func, type); - if (buffered) result = result?.ToList(); // for the "not a DbDataReader" scenario - return Task.FromResult(result); - } - } - - private Task ReadRowAsyncImpl(Type type, Row row) - { - if (reader is DbDataReader dbReader) return ReadRowAsyncImplViaDbReader(dbReader, type, row); - - // no async API available; use non-async and fake it - return Task.FromResult(ReadRow(type, row)); + return deserializer.Func; } - private async Task ReadRowAsyncImplViaDbReader(DbDataReader reader, Type type, Row row) + private async Task ReadRowAsyncImpl(Type type, Row row) { if (reader == null) throw new ObjectDisposedException(GetType().FullName, "The reader has been disposed; this can happen after all data has been consumed"); if (IsConsumed) throw new InvalidOperationException("Query results must be consumed in the correct order, and each result can only be consumed once"); @@ -229,7 +238,6 @@ private async Task> ReadBufferedAsync(int index, Func(); while (index == gridIndex && await reader.ReadAsync(cancel).ConfigureAwait(false)) { @@ -245,6 +253,64 @@ private async Task> ReadBufferedAsync(int index, Func + /// Read the next grid of results. + /// + /// The type to read. + public IAsyncEnumerable ReadUnbufferedAsync() => ReadAsyncUnbufferedImpl(typeof(T)); + + private IAsyncEnumerable ReadAsyncUnbufferedImpl(Type type) + { + var deserializer = ValidateAndMarkConsumed(type); + return ReadUnbufferedAsync(gridIndex, deserializer, cancel); + } + + private async IAsyncEnumerable ReadUnbufferedAsync(int index, Func deserializer, [EnumeratorCancellation] CancellationToken cancel) + { + try + { + while (index == gridIndex && await reader.ReadAsync(cancel).ConfigureAwait(false)) + { + yield return ConvertTo(deserializer(reader)); + } + } + finally // finally so that First etc progresses things even when multiple rows + { + if (index == gridIndex) + { + await NextResultAsync().ConfigureAwait(false); + } + } + } + + /// + /// Dispose the grid, closing and disposing both the underlying reader and command. + /// + public async ValueTask DisposeAsync() + { + if (reader != null) + { + if (!reader.IsClosed) Command?.Cancel(); + await reader.DisposeAsync(); + reader = null; + } + if (Command != null) + { + if (Command is DbCommand typed) + { + await typed.DisposeAsync(); + } + else + { + Command.Dispose(); + } + Command = null; + } + GC.SuppressFinalize(this); + } +#endif } } } diff --git a/Dapper/SqlMapper.GridReader.cs b/Dapper/SqlMapper.GridReader.cs index 7a59050f..7ecf0d7b 100644 --- a/Dapper/SqlMapper.GridReader.cs +++ b/Dapper/SqlMapper.GridReader.cs @@ -1,10 +1,10 @@ using System; using System.Collections.Generic; using System.Data; -using System.Linq; +using System.Data.Common; using System.Globalization; +using System.Linq; using System.Runtime.CompilerServices; -using System.Data.Common; namespace Dapper { diff --git a/docs/index.md b/docs/index.md index ea0af98b..e01834c5 100644 --- a/docs/index.md +++ b/docs/index.md @@ -22,9 +22,13 @@ Note: to get the latest pre-release build, add ` -Pre` to the end of the command ### unreleased -- add support for `SqlDecimal` and other types that need to be accessed via `DbDataReader.GetFieldValue` -- add an overload of `AddTypeMap` that supports `DbDataReader.GetFieldValue` for additional types -- acknowledge that in reality we only support `DbDataReader`; this has been true (via `DbConnection`) for `async` forever +- (#1910 via mgravell, fix #1907, #1263) + - add support for `SqlDecimal` and other types that need to be accessed via `DbDataReader.GetFieldValue` + - add an overload of `AddTypeMap` that supports `DbDataReader.GetFieldValue` for additional types + - acknowledge that in reality we only support `DbDataReader`; this has been true (via `DbConnection`) for `async` forever +- (#1912 via mgravell) + - add missing `AsyncEnumerable QueryUnbufferedAsync(...)` and `GridReader.ReadUnbufferedAsync(...)` APIs (.NET 5 and later) + - implement `IAsyncDisposable` on `GridReader` (.NET 5 and later) (note: new PRs will not be merged until they add release note wording here) diff --git a/tests/Dapper.Tests/AsyncTests.cs b/tests/Dapper.Tests/AsyncTests.cs index 5d24512b..b8d1fff8 100644 --- a/tests/Dapper.Tests/AsyncTests.cs +++ b/tests/Dapper.Tests/AsyncTests.cs @@ -1,11 +1,12 @@ -using System.Linq; +using System; +using System.Collections.Generic; using System.Data; +using System.Data.Common; using System.Diagnostics; -using System; -using System.Threading.Tasks; +using System.Linq; using System.Threading; +using System.Threading.Tasks; using Xunit; -using System.Data.Common; using Xunit.Abstractions; namespace Dapper.Tests @@ -45,6 +46,85 @@ public async Task TestBasicStringUsageAsync() Assert.Equal(new[] { "abc", "def" }, arr); } +#if NET5_0_OR_GREATER + [Fact] + public async Task TestBasicStringUsageUnbufferedAsync() + { + var results = new List(); + await foreach (var value in connection.QueryUnbufferedAsync("select 'abc' as [Value] union all select @txt", new { txt = "def" }) + .ConfigureAwait(false)) + { + results.Add(value); + } + var arr = results.ToArray(); + Assert.Equal(new[] { "abc", "def" }, arr); + } + + [Fact] + public async Task TestBasicStringUsageUnbufferedAsync_Cancellation() + { + using var cts = new CancellationTokenSource(); + var results = new List(); + await Assert.ThrowsAnyAsync(async () => + { + await foreach (var value in connection.QueryUnbufferedAsync("select 'abc' as [Value] union all select @txt", new { txt = "def" }) + .ConfigureAwait(false).WithCancellation(cts.Token)) + { + results.Add(value); + cts.Cancel(); // cancel after first item + } + }); + var arr = results.ToArray(); + Assert.Equal(new[] { "abc" }, arr); // we don't expect the "def" because of the cancellation + } + + [Fact] + public async Task TestBasicStringUsageViaGridReaderUnbufferedAsync() + { + var results = new List(); + await using (var grid = await connection.QueryMultipleAsync("select 'abc' union select 'def'; select @txt", new { txt = "ghi" }) + .ConfigureAwait(false)) + { + while (!grid.IsConsumed) + { + await foreach (var value in grid.ReadUnbufferedAsync() + .ConfigureAwait(false)) + { + results.Add(value); + } + } + } + var arr = results.ToArray(); + Assert.Equal(new[] { "abc", "def", "ghi" }, arr); + } + + [Fact] + public async Task TestBasicStringUsageViaGridReaderUnbufferedAsync_Cancellation() + { + using var cts = new CancellationTokenSource(); + var results = new List(); + await using (var grid = await connection.QueryMultipleAsync("select 'abc' union select 'def'; select @txt", new { txt = "ghi" }) + .ConfigureAwait(false)) + { + await Assert.ThrowsAnyAsync(async () => + { + while (!grid.IsConsumed) + { + await foreach (var value in grid.ReadUnbufferedAsync() + .ConfigureAwait(false) + .WithCancellation(cts.Token)) + { + results.Add(value); + } + cts.Cancel(); + } + }); + } + var arr = results.ToArray(); + Assert.Equal(new[] { "abc", "def" }, arr); // don't expect the ghi because of cancellation + } +#endif + [Fact] public async Task TestBasicStringUsageQueryFirstAsync() {