From 9faa445802356aaba8001d76a6c12e6dfb8c03cc Mon Sep 17 00:00:00 2001 From: Neil Deshpande Date: Thu, 16 May 2024 17:48:21 -0700 Subject: [PATCH] incorporate code review feedback --- ...OrderByCrossPartitionQueryPipelineStage.cs | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs index 0adbe3029e..89bed7195c 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs @@ -77,7 +77,8 @@ public InitializationParameters( private enum ExecutionState { Uninitialized, - Initialized + Initialized, + Done } public static TryCatch MonadicCreate( @@ -1673,15 +1674,12 @@ private sealed class NonStreamingOrderByPipelineStage : IQueryPipelineStage private BufferedOrderByResults bufferedResults; - private bool firstPage; - public TryCatch Current { get; private set; } private NonStreamingOrderByPipelineStage(InitializationParameters parameters, int pageSize) { this.parameters = parameters ?? throw new ArgumentNullException(nameof(parameters)); this.pageSize = pageSize; - this.firstPage = true; this.executionState = ExecutionState.Uninitialized; } @@ -1693,11 +1691,19 @@ public ValueTask DisposeAsync() public async ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) { + if (this.executionState == ExecutionState.Done) + { + return false; + } + cancellationToken.ThrowIfCancellationRequested(); + bool firstPage = false; if (this.executionState == ExecutionState.Uninitialized) { - await this.MoveNextAsync_InitializeAsync(trace, cancellationToken); + firstPage = true; + this.bufferedResults = await this.MoveNextAsync_InitializeAsync(trace, cancellationToken); + this.executionState = ExecutionState.Initialized; } List documents = new List(this.pageSize); @@ -1706,9 +1712,9 @@ public async ValueTask MoveNextAsync(ITrace trace, CancellationToken cance documents.Add(this.bufferedResults.Enumerator.Current.Payload); } - if (this.firstPage || documents.Count > 0) + if (firstPage || documents.Count > 0) { - double requestCharge = this.firstPage ? this.bufferedResults.TotalRequestCharge : 0; + double requestCharge = firstPage ? this.bufferedResults.TotalRequestCharge : 0; QueryPage queryPage = new QueryPage( documents: documents, requestCharge: requestCharge, @@ -1720,17 +1726,17 @@ public async ValueTask MoveNextAsync(ITrace trace, CancellationToken cance state: documents.Count > 0 ? NonStreamingOrderByInProgress : null, streaming: false); - this.firstPage = false; this.Current = TryCatch.FromResult(queryPage); return true; } else { + this.executionState = ExecutionState.Done; return false; } } - private async Task MoveNextAsync_InitializeAsync(ITrace trace, CancellationToken cancellationToken) + private async Task MoveNextAsync_InitializeAsync(ITrace trace, CancellationToken cancellationToken) { ITracingAsyncEnumerator> enumerator = await OrderByCrossPartitionRangePageEnumerator.CreateAsync( this.parameters.DocumentContainer, @@ -1752,8 +1758,7 @@ private async Task MoveNextAsync_InitializeAsync(ITrace trace, CancellationToken trace, cancellationToken); - this.bufferedResults = bufferedResults; - this.executionState = ExecutionState.Initialized; + return bufferedResults; } public static IQueryPipelineStage Create( @@ -1817,7 +1822,7 @@ public static async Task>> Cr OrderByQueryPartitionRangePageAsyncEnumerator enumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create( documentContainer, sqlQuerySpec, - new FeedRangeState(range, null), + new FeedRangeState(range, state: null), partitionKey, queryPaginationOptions, filter: null, @@ -1853,6 +1858,7 @@ public async ValueTask MoveNextAsync(ITrace trace, CancellationToken cance { while (this.enumeratorsAndTokens.Count > 0) { + cancellationToken.ThrowIfCancellationRequested(); (OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token) = this.enumeratorsAndTokens.Dequeue(); if (await enumerator.MoveNextAsync(trace, cancellationToken)) { @@ -1861,7 +1867,7 @@ public async ValueTask MoveNextAsync(ITrace trace, CancellationToken cance OrderByContinuationToken continuationToken; if (enumerator.Current.Result.Page.Documents.Count > 0) { - // Use the token for the next page, since we fully drained the enumerator. + // Use the token for the next page, since we fully drained the page. continuationToken = enumerator.FeedRangeState.State?.Value != null ? CreateOrderByContinuationToken( new ParallelContinuationToken(