Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query: Fixes order by logic to throw original exception instead of AggregateException #2708

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public OrderByQueryPartitionRangePageAsyncEnumerator(
SqlQuerySpec sqlQuerySpec,
FeedRangeState<QueryState> feedRangeState,
PartitionKey? partitionKey,
QueryPaginationOptions queryPagingationOptions,
QueryPaginationOptions queryPaginationOptions,
string filter,
CancellationToken cancellationToken)
: base(feedRangeState, cancellationToken)
Expand All @@ -33,7 +33,7 @@ public OrderByQueryPartitionRangePageAsyncEnumerator(
sqlQuerySpec,
feedRangeState,
partitionKey,
queryPagingationOptions,
queryPaginationOptions,
filter,
cancellationToken);
this.bufferedEnumerator = new BufferedPartitionRangePageAsyncEnumerator<OrderByQueryPage, QueryState>(
Expand Down Expand Up @@ -92,31 +92,25 @@ public InnerEnumerator(

public override ValueTask DisposeAsync() => default;

protected override Task<TryCatch<OrderByQueryPage>> GetNextPageAsync(ITrace trace, CancellationToken cancellationToken)
protected override async Task<TryCatch<OrderByQueryPage>> GetNextPageAsync(ITrace trace, CancellationToken cancellationToken)
{
// Unfortunately we need to keep both the epk range and partition key for queries
// Since the continuation token format uses epk range even though we only need the partition key to route the request.
FeedRangeInternal feedRange = this.PartitionKey.HasValue ? new FeedRangePartitionKey(this.PartitionKey.Value) : this.FeedRangeState.FeedRange;

return this.queryDataSource
TryCatch<QueryPage> monadicQueryPage = await this.queryDataSource
.MonadicQueryAsync(
sqlQuerySpec: this.SqlQuerySpec,
feedRangeState: new FeedRangeState<QueryState>(feedRange, this.FeedRangeState.State),
queryPaginationOptions: this.queryPaginationOptions,
trace: trace,
cancellationToken)
.ContinueWith<TryCatch<OrderByQueryPage>>(antecedent =>
{
TryCatch<QueryPage> monadicQueryPage = antecedent.Result;
if (monadicQueryPage.Failed)
{
Console.WriteLine(this.SqlQuerySpec);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👀

return TryCatch<OrderByQueryPage>.FromException(monadicQueryPage.Exception);
}

QueryPage queryPage = monadicQueryPage.Result;
return TryCatch<OrderByQueryPage>.FromResult(new OrderByQueryPage(queryPage));
});
cancellationToken);
if (monadicQueryPage.Failed)
j82w marked this conversation as resolved.
Show resolved Hide resolved
{
return TryCatch<OrderByQueryPage>.FromException(monadicQueryPage.Exception);
}
QueryPage queryPage = monadicQueryPage.Result;
return TryCatch<OrderByQueryPage>.FromResult(new OrderByQueryPage(queryPage));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Threading;
using Microsoft.Azure.Cosmos.ReadFeed.Pagination;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Tracing;
Expand Down Expand Up @@ -215,16 +216,18 @@ public override IAsyncEnumerable<TryCatch<ReadFeedPage>> CreateEnumerable(
cancellationToken: default));

public override IAsyncEnumerator<TryCatch<ReadFeedPage>> CreateEnumerator(
IDocumentContainer inMemoryCollection,
ReadFeedState state = null) => new BufferedPartitionRangePageAsyncEnumerator<ReadFeedPage, ReadFeedState>(
IDocumentContainer inMemoryCollection, ReadFeedState state = null, CancellationToken cancellationToken =default)
{
return new BufferedPartitionRangePageAsyncEnumerator<ReadFeedPage, ReadFeedState>(
new ReadFeedPartitionRangeEnumerator(
inMemoryCollection,
feedRangeState: new FeedRangeState<ReadFeedState>(
new FeedRangePartitionKeyRange(partitionKeyRangeId: "0"),
state ?? ReadFeedState.Beginning()),
readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10),
cancellationToken: default),
cancellationToken: default);
cancellationToken: cancellationToken),
cancellationToken: cancellationToken);
}

private async Task BufferMoreInBackground(BufferedPartitionRangePageAsyncEnumerator<ReadFeedPage, ReadFeedState> enumerator)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ PartitionRangePageAsyncEnumerator<ReadFeedPage, ReadFeedState> createEnumerator(

public override IAsyncEnumerator<TryCatch<CrossFeedRangePage<ReadFeedPage, ReadFeedState>>> CreateEnumerator(
IDocumentContainer inMemoryCollection,
CrossFeedRangeState<ReadFeedState> state = null)
CrossFeedRangeState<ReadFeedState> state = null,
CancellationToken cancellationToken = default)
{
PartitionRangePageAsyncEnumerator<ReadFeedPage, ReadFeedState> createEnumerator(
FeedRangeState<ReadFeedState> feedRangeState) => new ReadFeedPartitionRangeEnumerator(
Expand All @@ -399,7 +400,7 @@ PartitionRangePageAsyncEnumerator<ReadFeedPage, ReadFeedState> createEnumerator(
createPartitionRangeEnumerator: createEnumerator,
comparer: PartitionRangePageAsyncEnumeratorComparer.Singleton,
maxConcurrency: 10,
cancellationToken: default,
cancellationToken: cancellationToken,
state: state ?? new CrossFeedRangeState<ReadFeedState>(
new FeedRangeState<ReadFeedState>[]
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,10 @@ public Task<TryCatch<QueryPage>> MonadicQueryAsync(
ITrace trace,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<TryCatch<QueryPage>>(cancellationToken);
}
if (sqlQuerySpec == null)
{
throw new ArgumentNullException(nameof(sqlQuerySpec));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Pagination;
Expand All @@ -20,6 +21,17 @@ protected PartitionRangeEnumeratorTests(bool singlePartition)
this.singlePartition = singlePartition;
}

[TestMethod]
public async Task TestMoveNextAsyncThrowsTaskCanceledException()
{
int numItems = 100;
IDocumentContainer inMemoryCollection = await this.CreateDocumentContainerAsync(numItems);
CancellationTokenSource cts = new CancellationTokenSource();
IAsyncEnumerator<TryCatch<TPage>> enumerator = this.CreateEnumerator(inMemoryCollection, cancellationToken: cts.Token);
cts.Cancel();
await Assert.ThrowsExceptionAsync<TaskCanceledException>(async () => await enumerator.MoveNextAsync());
}

[TestMethod]
public async Task TestDrainFullyAsync()
{
Expand Down Expand Up @@ -153,7 +165,7 @@ public async Task TestEmptyPages()

public abstract IAsyncEnumerable<TryCatch<TPage>> CreateEnumerable(IDocumentContainer documentContainer, TState state = null);

public abstract IAsyncEnumerator<TryCatch<TPage>> CreateEnumerator(IDocumentContainer documentContainer, TState state = null);
public abstract IAsyncEnumerator<TryCatch<TPage>> CreateEnumerator(IDocumentContainer documentContainer, TState state = null, CancellationToken cancellationToken= default);

public async Task<IDocumentContainer> CreateDocumentContainerAsync(
int numItems,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos.Tests.Pagination
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Pagination;
using System.Collections.Generic;
using System.Threading;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.ReadFeed.Pagination;
using Microsoft.Azure.Cosmos.Tracing;
Expand Down Expand Up @@ -130,14 +131,16 @@ public override IAsyncEnumerable<TryCatch<ReadFeedPage>> CreateEnumerable(
cancellationToken: default));

public override IAsyncEnumerator<TryCatch<ReadFeedPage>> CreateEnumerator(
IDocumentContainer inMemoryCollection,
ReadFeedState state = null) => new ReadFeedPartitionRangeEnumerator(
IDocumentContainer inMemoryCollection, ReadFeedState state = null, CancellationToken cancellationToken = default)
{
return new ReadFeedPartitionRangeEnumerator(
inMemoryCollection,
feedRangeState: new FeedRangeState<ReadFeedState>(
new FeedRangePartitionKeyRange(partitionKeyRangeId: "0"),
state ?? ReadFeedState.Beginning()),
readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10),
cancellationToken: default);
cancellationToken: cancellationToken);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
namespace Microsoft.Azure.Cosmos.Tests.Query
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Cosmos.Pagination;
using Cosmos.Query.Core.Monads;
using Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy;
using Cosmos.Query.Core.Pipeline.Pagination;
using Cosmos.Tracing;
using Pagination;
using VisualStudio.TestTools.UnitTesting;

[TestClass]
public class OrderByQueryPartitionRangePageAsyncEnumeratorTests
{
[TestMethod]
public async Task TestMoveNextAsyncThrowsTaskCanceledException()
{
Implementation implementation = new Implementation();
await implementation.TestMoveNextAsyncThrowsTaskCanceledException();
}

[TestClass]
private sealed class Implementation : PartitionRangeEnumeratorTests<OrderByQueryPage, QueryState>
{
public Implementation()
: base(singlePartition: true)
{
}

public override IReadOnlyList<Record> GetRecordsFromPage(OrderByQueryPage page)
{
throw new NotImplementedException();
}

public override IAsyncEnumerable<TryCatch<OrderByQueryPage>> CreateEnumerable(IDocumentContainer documentContainer, QueryState state = null)
{
throw new NotImplementedException();
}

public override IAsyncEnumerator<TryCatch<OrderByQueryPage>> CreateEnumerator(
IDocumentContainer documentContainer, QueryState state = null, CancellationToken cancellationToken = default)
{
List<FeedRangeEpk> ranges = documentContainer.GetFeedRangesAsync(
trace: NoOpTrace.Singleton,
cancellationToken: cancellationToken).Result;
Assert.AreEqual(1, ranges.Count);
return new OrderByQueryPartitionRangePageAsyncEnumerator(
queryDataSource: documentContainer,
sqlQuerySpec: new Cosmos.Query.Core.SqlQuerySpec("SELECT * FROM c"),
feedRangeState: new FeedRangeState<QueryState>(ranges[0], state),
partitionKey: null,
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
filter: "filter",
cancellationToken: cancellationToken);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Pagination;
Expand Down Expand Up @@ -145,7 +146,8 @@ public override IAsyncEnumerable<TryCatch<QueryPage>> CreateEnumerable(

public override IAsyncEnumerator<TryCatch<QueryPage>> CreateEnumerator(
IDocumentContainer documentContainer,
QueryState state = default)
QueryState state = default,
CancellationToken cancellationToken = default)
{
List<FeedRangeEpk> ranges = documentContainer.GetFeedRangesAsync(
trace: NoOpTrace.Singleton,
Expand All @@ -157,7 +159,7 @@ public override IAsyncEnumerator<TryCatch<QueryPage>> CreateEnumerator(
feedRangeState: new FeedRangeState<QueryState>(ranges[0], state),
partitionKey: null,
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
cancellationToken: default);
cancellationToken: cancellationToken);
}
}
}
Expand Down