Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query: Fixes incorrect RequestCharge and missing headers in FeedResponse for ordered cross-partition queries #2357

Merged
merged 9 commits into from
Apr 16, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ private async ValueTask<bool> MoveNextAsync_Initialize_FromBeginningAsync(
}
else
{
QueryPage page = uninitializedEnumerator.Current.Result.Page;

if (!uninitializedEnumerator.Current.Result.Enumerator.MoveNext())
{
// Page was empty
Expand All @@ -146,12 +148,12 @@ private async ValueTask<bool> MoveNextAsync_Initialize_FromBeginningAsync(
this.Current = TryCatch<QueryPage>.FromResult(
new QueryPage(
documents: EmptyPage,
requestCharge: 0,
activityId: Guid.NewGuid().ToString(),
responseLengthInBytes: 0,
cosmosQueryExecutionInfo: default,
disallowContinuationTokenMessage: default,
additionalHeaders: default,
requestCharge: page.RequestCharge,
neildsh marked this conversation as resolved.
Show resolved Hide resolved
activityId: string.IsNullOrEmpty(page.ActivityId) ? Guid.NewGuid().ToString() : page.ActivityId,
responseLengthInBytes: page.ResponseLengthInBytes,
cosmosQueryExecutionInfo: page.CosmosQueryExecutionInfo,
disallowContinuationTokenMessage: page.DisallowContinuationTokenMessage,
additionalHeaders: page.AdditionalHeaders,
state: null));
this.returnedFinalPage = true;
return true;
Expand All @@ -162,7 +164,6 @@ private async ValueTask<bool> MoveNextAsync_Initialize_FromBeginningAsync(
this.enumerators.Enqueue(uninitializedEnumerator);
}

QueryPage page = uninitializedEnumerator.Current.Result.Page;
// Just return an empty page with the stats
this.Current = TryCatch<QueryPage>.FromResult(
new QueryPage(
Expand Down Expand Up @@ -215,6 +216,7 @@ private async ValueTask<bool> MoveNextAsync_Initialize_FilterAsync(
}

(bool doneFiltering, int itemsLeftToSkip, TryCatch<OrderByQueryPage> monadicQueryByPage) = filterMonad.Result;
QueryPage page = uninitializedEnumerator.Current.Result.Page;
if (doneFiltering)
{
if (uninitializedEnumerator.Current.Result.Enumerator.Current != null)
Expand All @@ -228,12 +230,12 @@ private async ValueTask<bool> MoveNextAsync_Initialize_FilterAsync(
this.Current = TryCatch<QueryPage>.FromResult(
new QueryPage(
documents: EmptyPage,
requestCharge: 0,
activityId: Guid.NewGuid().ToString(),
responseLengthInBytes: 0,
cosmosQueryExecutionInfo: default,
disallowContinuationTokenMessage: default,
additionalHeaders: default,
requestCharge: page.RequestCharge,
activityId: string.IsNullOrEmpty(page.ActivityId) ? Guid.NewGuid().ToString() : page.ActivityId,
responseLengthInBytes: page.ResponseLengthInBytes,
cosmosQueryExecutionInfo: page.CosmosQueryExecutionInfo,
disallowContinuationTokenMessage: page.DisallowContinuationTokenMessage,
additionalHeaders: page.AdditionalHeaders,
state: null));
this.returnedFinalPage = true;
return true;
Expand Down Expand Up @@ -267,7 +269,6 @@ private async ValueTask<bool> MoveNextAsync_Initialize_FilterAsync(
}
}

QueryPage page = uninitializedEnumerator.Current.Result.Page;
// Just return an empty page with the stats
this.Current = TryCatch<QueryPage>.FromResult(
new QueryPage(
Expand Down Expand Up @@ -405,7 +406,7 @@ private ValueTask<bool> MoveNextAsync_DrainPageAsync(ITrace trace)
responseLengthInBytes: 0,
cosmosQueryExecutionInfo: default,
disallowContinuationTokenMessage: default,
additionalHeaders: default,
additionalHeaders: currentEnumerator.Current.Result.Page.AdditionalHeaders,
j82w marked this conversation as resolved.
Show resolved Hide resolved
state: this.state));
return new ValueTask<bool>(true);
}
Expand Down Expand Up @@ -460,7 +461,7 @@ private ValueTask<bool> MoveNextAsync_DrainPageAsync(ITrace trace)
responseLengthInBytes: 0,
cosmosQueryExecutionInfo: default,
disallowContinuationTokenMessage: default,
additionalHeaders: default,
additionalHeaders: currentEnumerator?.Current.Result.Page.AdditionalHeaders,
state: this.state));

if (state == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
QueryPage sourcePage = tryGetSourcePage.Result;
if (sourcePage.Documents.Count == 0)
{
this.cumulativeRequestCharge += sourcePage.RequestCharge;
this.cumulativeResponseLengthInBytes += sourcePage.ResponseLengthInBytes;
this.cumulativeAdditionalHeaders = sourcePage.AdditionalHeaders;
if (sourcePage.State == null)
{
QueryPage queryPage = new QueryPage(
Expand All @@ -105,6 +102,10 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
return true;
}

this.cumulativeRequestCharge += sourcePage.RequestCharge;
neildsh marked this conversation as resolved.
Show resolved Hide resolved
this.cumulativeResponseLengthInBytes += sourcePage.ResponseLengthInBytes;
this.cumulativeAdditionalHeaders = sourcePage.AdditionalHeaders;

return await this.MoveNextAsync();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,10 @@ public Task<TryCatch<QueryPage>> MonadicQueryAsync(
queryState = default;
}

ImmutableDictionary<string, string>.Builder additionalHeaders = ImmutableDictionary.CreateBuilder<string, string>();
additionalHeaders.Add("x-ms-documentdb-partitionkeyrangeid", "0");
additionalHeaders.Add("x-ms-test-header", "true");

return Task.FromResult(
TryCatch<QueryPage>.FromResult(
new QueryPage(
Expand All @@ -641,7 +645,7 @@ public Task<TryCatch<QueryPage>> MonadicQueryAsync(
responseLengthInBytes: 1337,
cosmosQueryExecutionInfo: default,
disallowContinuationTokenMessage: default,
additionalHeaders: default,
additionalHeaders: additionalHeaders.ToImmutable(),
state: queryState)));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,54 @@ FROM c

QueryPage queryPage = tryGetQueryPage.Result;
documents.AddRange(queryPage.Documents);

Assert.AreNotEqual(0, queryPage.RequestCharge);
ccurrens marked this conversation as resolved.
Show resolved Hide resolved
ccurrens marked this conversation as resolved.
Show resolved Hide resolved
}

Assert.AreEqual(numItems, documents.Count);
Assert.IsTrue(documents.OrderBy(document => ((CosmosObject)document)["_ts"]).ToList().SequenceEqual(documents));
}

[TestMethod]
public async Task TestDrain_IncludesResponseHeadersInQueryPage()
{
IDocumentContainer documentContainer = await CreateDocumentContainerAsync(10);

TryCatch<IQueryPipelineStage> monadicCreate = OrderByCrossPartitionQueryPipelineStage.MonadicCreate(
documentContainer: documentContainer,
sqlQuerySpec: new SqlQuerySpec(@"
SELECT c._rid AS _rid, [{""item"": c._ts}] AS orderByItems, c AS payload
FROM c
WHERE {documentdb-formattableorderbyquery-filter}
ORDER BY c._ts"),
targetRanges: await documentContainer.GetFeedRangesAsync(
trace: NoOpTrace.Singleton,
cancellationToken: default),
partitionKey: null,
orderByColumns: new List<OrderByColumn>()
{
new OrderByColumn("c._ts", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
cancellationToken: default,
continuationToken: null);
Assert.IsTrue(monadicCreate.Succeeded);
IQueryPipelineStage queryPipelineStage = monadicCreate.Result;

while (await queryPipelineStage.MoveNextAsync())
{
TryCatch<QueryPage> tryGetQueryPage = queryPipelineStage.Current;
if (tryGetQueryPage.Failed)
{
Assert.Fail(tryGetQueryPage.Exception.ToString());
}

QueryPage queryPage = tryGetQueryPage.Result;
Assert.IsTrue(queryPage.AdditionalHeaders.Count > 0);
}
ccurrens marked this conversation as resolved.
Show resolved Hide resolved
}

[TestMethod]
[DataRow(false, false, false, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: false")]
[DataRow(false, false, true, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: true")]
Expand Down