From 533c46fd701bcb552fb86494462a8f6116157182 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Bedn=C3=A1=C5=99?= Date: Wed, 29 Sep 2021 14:20:47 +0200 Subject: [PATCH] feat(linq): add support for Async queries (#239) --- CHANGELOG.md | 4 + Client.Linq.Test/Client.Linq.Test.csproj | 1 + Client.Linq.Test/ItInfluxDBQueryableTest.cs | 138 +++++++++++++++----- Client.Linq/Client.Linq.csproj | 1 + Client.Linq/InfluxDBQueryable.cs | 97 +++++++++++++- Client.Linq/Internal/QueryExecutor.cs | 86 ++++++++++-- Client.Linq/README.md | 31 +++-- Client.Test/QueryApiTest.cs | 26 +++- Client/QueryApi.cs | 19 +++ 9 files changed, 339 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 91c2c114e..3a7570af3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 3.1.0 [unreleased] +### Features +1. [#239](https://github.com/influxdata/influxdb-client-csharp/pull/239): Add support for Asynchronous queries [LINQ] + ## 3.0.0 [2021-09-17] ### Breaking Changes @@ -8,6 +11,7 @@ Adds a `Type` overload for POCOs to `QueryAsync`. This will add `object ConvertT ### Features 1. [#232](https://github.com/influxdata/influxdb-client-csharp/pull/232): Add a `Type` overload for POCOs to `QueryAsync`. 1. [#233](https://github.com/influxdata/influxdb-client-csharp/pull/233): Add possibility to follow HTTP redirects +1. [#239](https://github.com/influxdata/influxdb-client-csharp/pull/239): Add support for Asynchronous queries [LINQ] ### Bug Fixes 1. [#236](https://github.com/influxdata/influxdb-client-csharp/pull/236): Mapping `long` type into Flux AST [LINQ] diff --git a/Client.Linq.Test/Client.Linq.Test.csproj b/Client.Linq.Test/Client.Linq.Test.csproj index 7a8dd0ce4..1864fb22e 100644 --- a/Client.Linq.Test/Client.Linq.Test.csproj +++ b/Client.Linq.Test/Client.Linq.Test.csproj @@ -14,6 +14,7 @@ + diff --git a/Client.Linq.Test/ItInfluxDBQueryableTest.cs b/Client.Linq.Test/ItInfluxDBQueryableTest.cs index 0368e61c7..657c17c7d 100644 --- a/Client.Linq.Test/ItInfluxDBQueryableTest.cs +++ b/Client.Linq.Test/ItInfluxDBQueryableTest.cs @@ -36,7 +36,7 @@ public class ItInfluxDBQueryableTest : AbstractTest await _client .GetWriteApiAsync() - .WriteRecordsAsync("my-bucket", "my-org", WritePrecision.S, + .WriteRecordsAsync("my-bucket", "my-org", WritePrecision.S, sensor11, sensor21, sensor12, sensor22, sensor13, sensor23, sensor14, sensor24); } @@ -50,17 +50,17 @@ public void QueryAll() Assert.AreEqual(8, sensors.Count); } - + [Test] public void QueryExample() { var query = (from s in InfluxDBQueryable.Queryable("my-bucket", "my-org", _client.GetQueryApiSync()) - where s.SensorId == "id-1" - where s.Value > 12 - where s.Timestamp > new DateTime(2019, 11, 16, 8, 20, 15, DateTimeKind.Utc) - where s.Timestamp < new DateTime(2021, 01, 10, 5, 10, 0, DateTimeKind.Utc) - orderby s.Timestamp - select s) + where s.SensorId == "id-1" + where s.Value > 12 + where s.Timestamp > new DateTime(2019, 11, 16, 8, 20, 15, DateTimeKind.Utc) + where s.Timestamp < new DateTime(2021, 01, 10, 5, 10, 0, DateTimeKind.Utc) + orderby s.Timestamp + select s) .Take(2) .Skip(2); @@ -68,17 +68,17 @@ orderby s.Timestamp Assert.AreEqual(1, sensors.Count); } - + [Test] public void QueryExampleCount() { var query = (from s in InfluxDBQueryable.Queryable("my-bucket", "my-org", _client.GetQueryApiSync()) - where s.SensorId == "id-1" - where s.Value > 12 - where s.Timestamp > new DateTime(2019, 11, 16, 8, 20, 15, DateTimeKind.Utc) - where s.Timestamp < new DateTime(2021, 01, 10, 5, 10, 0, DateTimeKind.Utc) - orderby s.Timestamp - select s) + where s.SensorId == "id-1" + where s.Value > 12 + where s.Timestamp > new DateTime(2019, 11, 16, 8, 20, 15, DateTimeKind.Utc) + where s.Timestamp < new DateTime(2021, 01, 10, 5, 10, 0, DateTimeKind.Utc) + orderby s.Timestamp + select s) .Count(); Assert.AreEqual(3, query); @@ -92,14 +92,14 @@ public void QueryTake() var sensors = query.ToList(); - Assert.AreEqual(2*2, sensors.Count); + Assert.AreEqual(2 * 2, sensors.Count); } [Test] public void QueryTakeMultipleTimeSeries() { var query = (from s in InfluxDBQueryable.Queryable("my-bucket", "my-org", _client.GetQueryApiSync(), - new QueryableOptimizerSettings {QueryMultipleTimeSeries = true}) + new QueryableOptimizerSettings { QueryMultipleTimeSeries = true }) select s).Take(2); var sensors = query.ToList(); @@ -115,9 +115,9 @@ public void QueryTakeSkip() var sensors = query.ToList(); - Assert.AreEqual(1+1, sensors.Count); + Assert.AreEqual(1 + 1, sensors.Count); } - + [Test] public void QueryWhereEqual() { @@ -133,7 +133,7 @@ public void QueryWhereEqual() Assert.AreEqual("id-1", sensor.SensorId); } } - + [Test] public void QueryWhereNotEqual() { @@ -229,7 +229,7 @@ public void QueryAnd() Assert.GreaterOrEqual(sensor.Value, 28); } } - + [Test] public void QueryOr() { @@ -241,7 +241,7 @@ public void QueryOr() Assert.AreEqual(6, sensors.Count); } - + [Test] public void QueryTimeRange() { @@ -257,7 +257,7 @@ public void QueryTimeRange() Assert.GreaterOrEqual(sensor.Value, 89); } } - + [Test] public void QueryTimeGreaterEqual() { @@ -269,7 +269,7 @@ public void QueryTimeGreaterEqual() Assert.AreEqual(4, sensors.Count); } - + [Test] public void QueryTimeEqual() { @@ -285,7 +285,7 @@ public void QueryTimeEqual() Assert.GreaterOrEqual(sensor.Value, 15); } } - + [Test] public void QueryWhereNothing() { @@ -297,7 +297,7 @@ public void QueryWhereNothing() Assert.AreEqual(0, sensors.Count); } - + [Test] public void QueryOrderBy() { @@ -310,19 +310,19 @@ orderby s.Value Assert.AreEqual(12, sensors.First().Value); Assert.AreEqual(89, sensors.Last().Value); } - + [Test] public void QueryOrderByTime() { var query = from s in InfluxDBQueryable.Queryable("my-bucket", "my-org", _client.GetQueryApiSync()) - orderby s.Timestamp descending + orderby s.Timestamp descending select s; var sensors = query.ToList(); - Assert.AreEqual(new DateTime(2020, 11, 17, 8, 20, 15, DateTimeKind.Utc), + Assert.AreEqual(new DateTime(2020, 11, 17, 8, 20, 15, DateTimeKind.Utc), sensors.First().Timestamp); - Assert.AreEqual(new DateTime(2020, 10, 15, 8, 20, 15, DateTimeKind.Utc), + Assert.AreEqual(new DateTime(2020, 10, 15, 8, 20, 15, DateTimeKind.Utc), sensors.Last().Timestamp); } @@ -349,6 +349,84 @@ public void QueryCountDifferentTimeSeries() Assert.AreEqual(8, sensors); } + [Test] + public void SyncQueryConfiguration() + { + var query = from s in InfluxDBQueryable.Queryable("my-bucket", "my-org", _client.GetQueryApi()) + select s; + + var ae = Assert.Throws(() => query.ToList()); + Assert.AreEqual("The 'QueryApiSync' has to be configured for sync queries.", ae.Message); + } + + [Test] + public void ASyncQueryConfiguration() + { + var query = from s in InfluxDBQueryable.Queryable("my-bucket", "my-org", _client.GetQueryApiSync()) + select s; + + var ae = Assert.Throws(() => query.ToInfluxQueryable().GetAsyncEnumerator()); + Assert.AreEqual("The 'QueryApi' has to be configured for Async queries.", ae.Message); + } + + [Test] + public async Task ASyncQuery() + { + var query = from s in InfluxDBQueryable.Queryable("my-bucket", "my-org", _client.GetQueryApi()) + select s; + + var sensors = await query + .ToInfluxQueryable() + .GetAsyncEnumerator() + .ToListAsync(); + + Assert.AreEqual(8, sensors.Count); + } + + [Test] + public async Task ASyncQueryFirst() + { + var query = from s in InfluxDBQueryable.Queryable("my-bucket", "my-org", _client.GetQueryApi()) + select s; + + var sensor = await query + .ToInfluxQueryable() + .GetAsyncEnumerator() + .FirstOrDefaultAsync(); + + Assert.IsNotNull(sensor); + } + + [Test] + public void AggregateFunction() + { + var count = (from s in InfluxDBQueryable.Queryable("my-bucket", "my-org", _client.GetQueryApiSync()) + where s.Timestamp > new DateTime(2019, 11, 16, 8, 20, 15, DateTimeKind.Utc) + where s.Timestamp < new DateTime(2021, 01, 10, 5, 10, 0, DateTimeKind.Utc) + orderby s.Timestamp + select s) + .Count(); + + Assert.AreEqual(8, count); + } + + [Test] + public async Task AggregateFunctionAsync() + { + var query = from s in InfluxDBQueryable.Queryable("my-bucket", "my-org", _client.GetQueryApi()) + where s.Timestamp > new DateTime(2019, 11, 16, 8, 20, 15, DateTimeKind.Utc) + where s.Timestamp < new DateTime(2021, 01, 10, 5, 10, 0, DateTimeKind.Utc) + orderby s.Timestamp + select s; + + var count = await query + .ToInfluxQueryable() + .GetAsyncEnumerator() + .CountAsync(); + + Assert.AreEqual(8, count); + } + [TearDown] protected void After() { diff --git a/Client.Linq/Client.Linq.csproj b/Client.Linq/Client.Linq.csproj index 2aff66748..5e7e3094f 100644 --- a/Client.Linq/Client.Linq.csproj +++ b/Client.Linq/Client.Linq.csproj @@ -8,6 +8,7 @@ InfluxDB.Client.Linq 3.1.0 dev + 8 InfluxDB.Client.Linq influxdata;timeseries;flux;influxdb;linq diff --git a/Client.Linq/InfluxDBQueryable.cs b/Client.Linq/InfluxDBQueryable.cs index fa3ee86c8..c4d604be6 100644 --- a/Client.Linq/InfluxDBQueryable.cs +++ b/Client.Linq/InfluxDBQueryable.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Generic; using System.Linq; using System.Linq.Expressions; +using System.Threading; using InfluxDB.Client.Core; using InfluxDB.Client.Linq.Internal; using Remotion.Linq; @@ -17,7 +19,7 @@ public QueryableOptimizerSettings() { QueryMultipleTimeSeries = false; } - + /// /// Gets or sets whether the drive is used to query multiple time series. /// Setting this variable to true will change how the produced Flux Query looks like: @@ -35,7 +37,7 @@ public QueryableOptimizerSettings() public class InfluxDBQueryable : QueryableBase { /// - /// Create a new instance of IQueryable. + /// Create a new instance of IQueryable for synchronous Queries. /// /// Specifies the source bucket. /// Specifies the source organization. @@ -47,9 +49,23 @@ public static InfluxDBQueryable Queryable(string bucket, string org, QueryApi { return Queryable(bucket, org, queryApi, new DefaultMemberNameResolver(), queryableOptimizerSettings); } + + /// + /// Create a new instance of IQueryable for asynchronous Queries. + /// + /// Specifies the source bucket. + /// Specifies the source organization. + /// The underlying API to execute Flux Query. + /// Settings for a Query optimization + /// new instance for of Queryable + public static InfluxDBQueryable Queryable(string bucket, string org, QueryApi queryApi, + QueryableOptimizerSettings queryableOptimizerSettings = default) + { + return Queryable(bucket, org, queryApi, new DefaultMemberNameResolver(), queryableOptimizerSettings); + } /// - /// Create a new instance of IQueryable. + /// Create a new instance of IQueryable for synchronous Queries. /// /// Specifies the source bucket. /// Specifies the source organization. @@ -64,7 +80,22 @@ public static InfluxDBQueryable Queryable(string bucket, string org, QueryApi } /// - /// Create a new instance of IQueryable. + /// Create a new instance of IQueryable for asynchronous Queries. + /// + /// Specifies the source bucket. + /// Specifies the source organization. + /// The underlying API to execute Flux Query. + /// Resolver for customized names. + /// Settings for a Query optimization + /// new instance for of Queryable + public static InfluxDBQueryable Queryable(string bucket, string org, QueryApi queryApi, + IMemberNameResolver memberResolver, QueryableOptimizerSettings queryableOptimizerSettings = default) + { + return new InfluxDBQueryable(bucket, org, queryApi, memberResolver, queryableOptimizerSettings); + } + + /// + /// Create a new instance of IQueryable for synchronous Queries. /// /// Specifies the source bucket. /// Specifies the source organization. @@ -77,6 +108,20 @@ public InfluxDBQueryable(string bucket, string org, QueryApiSync queryApi, IMemb { } + /// + /// Create a new instance of IQueryable for asynchronous Queries. + /// + /// Specifies the source bucket. + /// Specifies the source organization. + /// The underlying API to execute Flux Query. + /// Resolver for customized names. + /// Settings for a Query optimization + public InfluxDBQueryable(string bucket, string org, QueryApi queryApi, IMemberNameResolver memberResolver, + QueryableOptimizerSettings queryableOptimizerSettings = default) : base(CreateQueryParser(), + CreateExecutor(bucket, org, queryApi, memberResolver, queryableOptimizerSettings)) + { + } + /// /// Call by ReLinq. /// @@ -94,7 +139,7 @@ public Api.Domain.Query ToDebugQuery() { var provider = Provider as DefaultQueryProvider; var executor = provider?.Executor as InfluxDBQueryExecutor; - + if (executor == null) throw new NotSupportedException("InfluxDBQueryable should use InfluxDBQueryExecutor"); @@ -115,9 +160,49 @@ private static IQueryExecutor CreateExecutor(string bucket, string org, QueryApi queryableOptimizerSettings ?? new QueryableOptimizerSettings()); } + private static IQueryExecutor CreateExecutor(string bucket, string org, QueryApi queryApi, + IMemberNameResolver memberResolver, QueryableOptimizerSettings queryableOptimizerSettings = default) + { + Arguments.CheckNonEmptyString(bucket, nameof(bucket)); + Arguments.CheckNonEmptyString(org, nameof(org)); + Arguments.CheckNotNull(queryApi, nameof(queryApi)); + + return new InfluxDBQueryExecutor(bucket, org, queryApi, memberResolver, + queryableOptimizerSettings ?? new QueryableOptimizerSettings()); + } + private static QueryParser CreateQueryParser() { return QueryParser.CreateDefault(); } + + public IAsyncEnumerable GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + var provider = Provider as DefaultQueryProvider; + + if (!(provider?.Executor is InfluxDBQueryExecutor executor)) + { + throw new NotSupportedException("InfluxDBQueryable should use InfluxDBQueryExecutor"); + } + + var parsedQuery = provider.QueryParser.GetParsedQuery(Expression); + return executor.ExecuteCollectionAsync(parsedQuery, cancellationToken); + } + } + + public static class QueryableExtensions + { + public static InfluxDBQueryable ToInfluxQueryable(this IQueryable source) + { + if (source == null) + { + throw new InvalidCastException("Queryable source is null"); + } + + if (!(source is InfluxDBQueryable queryable)) + throw new InvalidCastException("Queryable should be InfluxDBQueryable"); + + return queryable; + } } -} +} \ No newline at end of file diff --git a/Client.Linq/Internal/QueryExecutor.cs b/Client.Linq/Internal/QueryExecutor.cs index 7abc1623a..12cd073af 100644 --- a/Client.Linq/Internal/QueryExecutor.cs +++ b/Client.Linq/Internal/QueryExecutor.cs @@ -2,7 +2,10 @@ using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; using InfluxDB.Client.Api.Domain; +using InfluxDB.Client.Core.Flux.Domain; using Remotion.Linq; [assembly: InternalsVisibleTo("Client.Linq.Test, PublicKey=002400000480000094000000060200000024000052534131" + @@ -10,6 +13,7 @@ "95804a1aeeb0de18ac3728782f9dc8dbae2e806167a8bb64c0402278edcefd78c13dbe7f8d13de36eb362" + "21ec215c66ee2dfe7943de97b869c5eea4d92f92d345ced67de5ac8fc3cd2f8dd7e3c0c53bdb0cc433af8" + "59033d069cad397a7")] + namespace InfluxDB.Client.Linq.Internal { /// @@ -19,12 +23,13 @@ internal class InfluxDBQueryExecutor : IQueryExecutor { private readonly string _bucket; private readonly string _org; - private readonly QueryApiSync _queryApi; + private readonly QueryApiSync _queryApiSync; + private readonly QueryApi _queryApi; private readonly IMemberNameResolver _memberResolver; private readonly QueryableOptimizerSettings _queryableOptimizerSettings; /// - /// + /// Create InfluxDBQuery Executor for synchronous Queries. /// /// Specifies the source bucket. /// Specifies the source organization. @@ -33,6 +38,24 @@ internal class InfluxDBQueryExecutor : IQueryExecutor /// Settings for a Query optimization public InfluxDBQueryExecutor(string bucket, string org, QueryApiSync queryApi, IMemberNameResolver memberResolver, QueryableOptimizerSettings queryableOptimizerSettings) + { + _bucket = bucket; + _org = org; + _queryApiSync = queryApi; + _memberResolver = memberResolver; + _queryableOptimizerSettings = queryableOptimizerSettings; + } + + /// + /// Create InfluxDBQuery Executor for asynchronous Queries. + /// + /// Specifies the source bucket. + /// Specifies the source organization. + /// The underlying API to execute Flux Query. + /// Resolver for customized names. + /// Settings for a Query optimization + public InfluxDBQueryExecutor(string bucket, string org, QueryApi queryApi, + IMemberNameResolver memberResolver, QueryableOptimizerSettings queryableOptimizerSettings) { _bucket = bucket; _org = org; @@ -68,18 +91,40 @@ public IEnumerable ExecuteCollection(QueryModel queryModel) { var query = GenerateQuery(queryModel, out var queryResultsSettings); + if (_queryApiSync == null) + { + throw new ArgumentException("The 'QueryApiSync' has to be configured for sync queries."); + } + if (queryResultsSettings.ScalarAggregated) { - var enumerable = _queryApi.QuerySync(query, _org) - .SelectMany(it => it.Records) - .Select(it => it.GetValueByKey("linq_result_column")); - - var result = queryResultsSettings.AggregateFunction(enumerable); + var result = ApplyAggregate(_queryApiSync.QuerySync(query, _org), queryResultsSettings); - return new List {(T) Convert.ChangeType(result, typeof(T))}; + return new List { result }; } - - return _queryApi.QuerySync(query, _org); + + return _queryApiSync.QuerySync(query, _org); + } + + /// + /// Executes an async query with a collection result. + /// + public IAsyncEnumerable ExecuteCollectionAsync(QueryModel queryModel, + CancellationToken cancellationToken = new CancellationToken()) + { + var query = GenerateQuery(queryModel, out var queryResultsSettings); + + if (_queryApi == null) + { + throw new ArgumentException("The 'QueryApi' has to be configured for Async queries."); + } + + if (queryResultsSettings.ScalarAggregated) + { + return AggregateAsync(_queryApi.QueryAsync(query, _org), queryResultsSettings, cancellationToken); + } + + return _queryApi.QueryAsyncEnumerable(query, _org, cancellationToken); } /// @@ -107,5 +152,26 @@ internal InfluxDBQueryVisitor QueryVisitor(QueryModel queryModel) visitor.VisitQueryModel(queryModel); return visitor; } + + private async IAsyncEnumerable AggregateAsync(Task> tables, + QueryResultsSettings queryResultsSettings, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + var result = tables + .ContinueWith(t => ApplyAggregate(t.Result, queryResultsSettings), cancellationToken); + + yield return await result.ConfigureAwait(false); + } + + private static T ApplyAggregate(IEnumerable tables, QueryResultsSettings queryResultsSettings) + { + var enumerable = tables + .SelectMany(it => it.Records) + .Select(it => it.GetValueByKey("linq_result_column")); + + var aggregated = queryResultsSettings.AggregateFunction(enumerable); + + return (T)Convert.ChangeType(aggregated, typeof(T)); + } } } \ No newline at end of file diff --git a/Client.Linq/README.md b/Client.Linq/README.md index 8538c4c2c..ba5e99c94 100644 --- a/Client.Linq/README.md +++ b/Client.Linq/README.md @@ -13,7 +13,6 @@ This section contains links to the client library documentation. ## Usage -- [Changelog](#changelog) - [How to start](#how-to-start) - [Time Series](#time-series) - [Enable querying multiple time-series](#enable-querying-multiple-time-series) @@ -39,19 +38,7 @@ This section contains links to the client library documentation. - [LongCount](#longcount) - [Domain Converter](#domain-converter) - [How to debug output Flux Query](#how-to-debug-output-flux-query) - -## Changelog -### 1.19.0-dev.3160 [2021-06-02] - - Fix Flux AST for Tag parameters which are not `String`. See details - [#202](https://github.com/influxdata/influxdb-client-csharp/pull/202) -### 1.19.0-dev.3084 [2021-05-07] - - optimize Flux Query for querying one time-series. See details - [#197](https://github.com/influxdata/influxdb-client-csharp/pull/197) -### 1.18.0-dev.2973 [2021-04-27] - - switch `pivot()` and `drop()` function to achieve better performance. See details - [#188](https://github.com/influxdata/influxdb-client-csharp/pull/188) -### 1.18.0-dev.2880 [2021-04-12] - - use `group()` function in output Flux query. See details - [Group function](#enable-querying-multiple-time-series) -### 1.17.0-dev.linq.17 [2021-03-18] - - Optimize filtering by tag - [see more](#filtering) - - rebased with `master` branch +- [Asynchronous Queries](#asynchronous-queries) ## How to start @@ -1009,3 +996,19 @@ foreach (var statement in influxQuery.Extern.Body) Console.WriteLine(); Console.WriteLine(influxQuery._Query); ``` + +## Asynchronous Queries + +The LINQ driver also supports asynchronous querying. For asynchronous queries you have to initialize `InfluxDBQueryable` with asynchronous version of [QueryApi](/Client/QueryApi.cs) and transform `IQueryable` to `IAsyncEnumerable`: + +```c# +var client = InfluxDBClientFactory.Create("http://localhost:8086", "my-token"); +var queryApi = client.GetQueryApi(); + +var query = from s in InfluxDBQueryable.Queryable("my-bucket", "my-org", queryApi) + select s; + +IAsyncEnumerable enumerable = query + .ToInfluxQueryable() + .GetAsyncEnumerator(); +``` \ No newline at end of file diff --git a/Client.Test/QueryApiTest.cs b/Client.Test/QueryApiTest.cs index 8733287b3..dcdfef08c 100644 --- a/Client.Test/QueryApiTest.cs +++ b/Client.Test/QueryApiTest.cs @@ -2,7 +2,9 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using System.Threading; using System.Threading.Tasks; +using InfluxDB.Client.Api.Domain; using InfluxDB.Client.Core; using InfluxDB.Client.Core.Flux.Domain; using InfluxDB.Client.Core.Test; @@ -68,7 +70,6 @@ public async Task GenericAndTypeofCalls() .Given(Request.Create().WithPath("/api/v2/query").UsingPost()) .RespondWith(CreateResponse(Data)); - var measurements = await _queryApi.QueryAsync("from(..."); var measurementsTypeof = await _queryApi.QueryAsync("from(...",typeof(SyncPoco)); @@ -82,9 +83,26 @@ public async Task GenericAndTypeofCalls() Assert.AreEqual(12.25, cast[0].Value); Assert.AreEqual(13.00, cast[1].Value); } - - - + + [Test] + public async Task QueryAsyncEnumerable() + { + MockServer + .Given(Request.Create().WithPath("/api/v2/query").UsingPost()) + .RespondWith(CreateResponse(Data)); + + var measurements = _queryApi.QueryAsyncEnumerable( + new Query(null, "from(...)"), + "my-org", new CancellationToken()); + + var list = new List(); + await foreach (var item in measurements.ConfigureAwait(false)) + { + list.Add(item); + } + + Assert.AreEqual(2, list.Count); + } private class SyncPoco { diff --git a/Client/QueryApi.cs b/Client/QueryApi.cs index 9bac6e2de..ebe980e4e 100644 --- a/Client/QueryApi.cs +++ b/Client/QueryApi.cs @@ -185,6 +185,25 @@ public async IAsyncEnumerable QueryAsyncEnumerable(string query, [Enumerat await foreach (var record in QueryEnumerable(requestMessage, cancellationToken).ConfigureAwait(false)) yield return record; } + + /// + /// Executes the Flux query against the InfluxDB 2.0 and asynchronously maps + /// response to enumerable of objects of type . + /// + /// the flux query to execute + /// specifies the source organization + /// cancellation token + /// the type of measurement + /// Measurements which are matched the query + public async IAsyncEnumerable QueryAsyncEnumerable(Query query, string org, [EnumeratorCancellation] CancellationToken cancellationToken) + { + Arguments.CheckNotNull(query, nameof(query)); + + var requestMessage = CreateRequest(query, org); + + await foreach (var record in QueryEnumerable(requestMessage, cancellationToken).ConfigureAwait(false)) + yield return record; + } /// /// Executes the Flux query against the InfluxDB 2.0 and asynchronously maps