Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query: Fixes incorrect RequestCharge and missing headers in FeedResponse for ordered cross-partition queries #2357

Merged
merged 9 commits into from
Apr 16, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ private async ValueTask<bool> MoveNextAsync_Initialize_FromBeginningAsync(
}
else
{
QueryPage page = uninitializedEnumerator.Current.Result.Page;

if (!uninitializedEnumerator.Current.Result.Enumerator.MoveNext())
{
// Page was empty
Expand All @@ -146,12 +148,12 @@ private async ValueTask<bool> MoveNextAsync_Initialize_FromBeginningAsync(
this.Current = TryCatch<QueryPage>.FromResult(
new QueryPage(
documents: EmptyPage,
requestCharge: 0,
activityId: Guid.NewGuid().ToString(),
responseLengthInBytes: 0,
cosmosQueryExecutionInfo: default,
disallowContinuationTokenMessage: default,
additionalHeaders: default,
requestCharge: page.RequestCharge,
activityId: string.IsNullOrEmpty(page.ActivityId) ? Guid.NewGuid().ToString() : page.ActivityId,
responseLengthInBytes: page.ResponseLengthInBytes,
cosmosQueryExecutionInfo: page.CosmosQueryExecutionInfo,
disallowContinuationTokenMessage: page.DisallowContinuationTokenMessage,
additionalHeaders: page.AdditionalHeaders,
state: null));
this.returnedFinalPage = true;
return true;
Expand All @@ -162,7 +164,6 @@ private async ValueTask<bool> MoveNextAsync_Initialize_FromBeginningAsync(
this.enumerators.Enqueue(uninitializedEnumerator);
}

QueryPage page = uninitializedEnumerator.Current.Result.Page;
// Just return an empty page with the stats
this.Current = TryCatch<QueryPage>.FromResult(
new QueryPage(
Expand Down Expand Up @@ -215,6 +216,7 @@ private async ValueTask<bool> MoveNextAsync_Initialize_FilterAsync(
}

(bool doneFiltering, int itemsLeftToSkip, TryCatch<OrderByQueryPage> monadicQueryByPage) = filterMonad.Result;
QueryPage page = uninitializedEnumerator.Current.Result.Page;
if (doneFiltering)
{
if (uninitializedEnumerator.Current.Result.Enumerator.Current != null)
Expand All @@ -228,12 +230,12 @@ private async ValueTask<bool> MoveNextAsync_Initialize_FilterAsync(
this.Current = TryCatch<QueryPage>.FromResult(
new QueryPage(
documents: EmptyPage,
requestCharge: 0,
activityId: Guid.NewGuid().ToString(),
responseLengthInBytes: 0,
cosmosQueryExecutionInfo: default,
disallowContinuationTokenMessage: default,
additionalHeaders: default,
requestCharge: page.RequestCharge,
activityId: string.IsNullOrEmpty(page.ActivityId) ? Guid.NewGuid().ToString() : page.ActivityId,
responseLengthInBytes: page.ResponseLengthInBytes,
cosmosQueryExecutionInfo: page.CosmosQueryExecutionInfo,
disallowContinuationTokenMessage: page.DisallowContinuationTokenMessage,
additionalHeaders: page.AdditionalHeaders,
state: null));
this.returnedFinalPage = true;
return true;
Expand Down Expand Up @@ -267,7 +269,6 @@ private async ValueTask<bool> MoveNextAsync_Initialize_FilterAsync(
}
}

QueryPage page = uninitializedEnumerator.Current.Result.Page;
// Just return an empty page with the stats
this.Current = TryCatch<QueryPage>.FromResult(
new QueryPage(
Expand Down Expand Up @@ -405,7 +406,7 @@ private ValueTask<bool> MoveNextAsync_DrainPageAsync(ITrace trace)
responseLengthInBytes: 0,
cosmosQueryExecutionInfo: default,
disallowContinuationTokenMessage: default,
additionalHeaders: default,
additionalHeaders: currentEnumerator.Current.Result.Page.AdditionalHeaders,
state: this.state));
return new ValueTask<bool>(true);
}
Expand Down Expand Up @@ -460,7 +461,7 @@ private ValueTask<bool> MoveNextAsync_DrainPageAsync(ITrace trace)
responseLengthInBytes: 0,
cosmosQueryExecutionInfo: default,
disallowContinuationTokenMessage: default,
additionalHeaders: default,
additionalHeaders: currentEnumerator?.Current.Result.Page.AdditionalHeaders,
state: this.state));

if (state == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
QueryPage sourcePage = tryGetSourcePage.Result;
if (sourcePage.Documents.Count == 0)
{
this.cumulativeRequestCharge += sourcePage.RequestCharge;
this.cumulativeResponseLengthInBytes += sourcePage.ResponseLengthInBytes;
this.cumulativeAdditionalHeaders = sourcePage.AdditionalHeaders;
if (sourcePage.State == null)
{
QueryPage queryPage = new QueryPage(
Expand All @@ -105,6 +102,10 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
return true;
}

this.cumulativeRequestCharge += sourcePage.RequestCharge;
this.cumulativeResponseLengthInBytes += sourcePage.ResponseLengthInBytes;
this.cumulativeAdditionalHeaders = sourcePage.AdditionalHeaders;

return await this.MoveNextAsync();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1408,5 +1408,91 @@ And comparison on arrays and objects is undefined.
}
}
}

class OrderByRequestChargeArgs
{
public string Query { get; set; }
public double ExpectedRequestCharge { get; set; }
}

[TestMethod]
public async Task TestQueryCrossPartitionRequestChargesAsync()
{
string[] documents = new[]
{
@"{""id"":""documentId1"",""key"":""A""}",
@"{""id"":""documentId2"",""key"":""A"",""prop"":3}",
@"{""id"":""documentId3"",""key"":""A""}",
@"{""id"":""documentId4"",""key"":5}",
@"{""id"":""documentId5"",""key"":5,""prop"":2}",
@"{""id"":""documentId6"",""key"":5}",
@"{""id"":""documentId7"",""key"":2}",
@"{""id"":""documentId8"",""key"":2,""prop"":1}",
@"{""id"":""documentId9"",""key"":2}",
};

// Matches no documents
await this.CreateIngestQueryDeleteAsync<OrderByRequestChargeArgs>(
ConnectionModes.Gateway,
CollectionTypes.MultiPartition,
documents,
this.TestQueryCrossPartitionRequestChargesHelper,
new OrderByRequestChargeArgs
{
Query = "SELECT r.id FROM r WHERE r.prop = 'A' ORDER BY r.prop DESC",
ExpectedRequestCharge = 13.95
},
"/key");

// Matches some documents
await this.CreateIngestQueryDeleteAsync<OrderByRequestChargeArgs>(
ConnectionModes.Direct,
CollectionTypes.MultiPartition,
documents,
this.TestQueryCrossPartitionRequestChargesHelper,
new OrderByRequestChargeArgs
{
Query = "SELECT r.id FROM r ORDER BY r.prop DESC",
ExpectedRequestCharge = 16.86
},
"/key");

// Matches some documents, skipped with OFFSET LIMIT
await this.CreateIngestQueryDeleteAsync<OrderByRequestChargeArgs>(
ConnectionModes.Direct,
CollectionTypes.MultiPartition,
documents,
this.TestQueryCrossPartitionRequestChargesHelper,
new OrderByRequestChargeArgs
{
Query = "SELECT r.id FROM r ORDER BY r.prop DESC OFFSET 10 LIMIT 1",
ExpectedRequestCharge = 16.86
},
"/key");
}

private async Task TestQueryCrossPartitionRequestChargesHelper(
Container container,
IReadOnlyList<CosmosObject> documents,
OrderByRequestChargeArgs args)
{
await QueryTestsBase.NoOp();

double totalRUs = 0;
await foreach (FeedResponse<CosmosElement> query in QueryTestsBase.RunSimpleQueryAsync<CosmosElement>(
container,
args.Query,
new QueryRequestOptions()
{
MaxItemCount = 1,
MaxConcurrency = 1,
}))
{
totalRUs += query.RequestCharge;
}

Assert.AreEqual(args.ExpectedRequestCharge, totalRUs, 0.01);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,23 @@ internal static async Task<List<CosmosElement>> RunQueryCombinationsAsync(
return queryExecutionResults.Values.First();
}

internal static async IAsyncEnumerable<FeedResponse<T>> RunSimpleQueryAsync<T>(
Container container,
string query,
QueryRequestOptions requestOptions = null)
{
using (FeedIterator<T> resultSetIterator = container.GetItemQueryIterator<T>(
query,
requestOptions: requestOptions))
{
while (resultSetIterator.HasMoreResults)
{
FeedResponse<T> response = await resultSetIterator.ReadNextAsync();
yield return response;
}
}
}

internal async Task<List<T>> RunSinglePartitionQuery<T>(
Container container,
string query,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,10 @@ public Task<TryCatch<QueryPage>> MonadicQueryAsync(
queryState = default;
}

ImmutableDictionary<string, string>.Builder additionalHeaders = ImmutableDictionary.CreateBuilder<string, string>();
additionalHeaders.Add("x-ms-documentdb-partitionkeyrangeid", "0");
additionalHeaders.Add("x-ms-test-header", "true");

return Task.FromResult(
TryCatch<QueryPage>.FromResult(
new QueryPage(
Expand All @@ -641,7 +645,7 @@ public Task<TryCatch<QueryPage>> MonadicQueryAsync(
responseLengthInBytes: 1337,
cosmosQueryExecutionInfo: default,
disallowContinuationTokenMessage: default,
additionalHeaders: default,
additionalHeaders: additionalHeaders.ToImmutable(),
state: queryState)));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,54 @@ FROM c

QueryPage queryPage = tryGetQueryPage.Result;
documents.AddRange(queryPage.Documents);

Assert.AreEqual(42, queryPage.RequestCharge);
}

Assert.AreEqual(numItems, documents.Count);
Assert.IsTrue(documents.OrderBy(document => ((CosmosObject)document)["_ts"]).ToList().SequenceEqual(documents));
}

[TestMethod]
public async Task TestDrain_IncludesResponseHeadersInQueryPage()
{
IDocumentContainer documentContainer = await CreateDocumentContainerAsync(10);

TryCatch<IQueryPipelineStage> monadicCreate = OrderByCrossPartitionQueryPipelineStage.MonadicCreate(
documentContainer: documentContainer,
sqlQuerySpec: new SqlQuerySpec(@"
SELECT c._rid AS _rid, [{""item"": c._ts}] AS orderByItems, c AS payload
FROM c
WHERE {documentdb-formattableorderbyquery-filter}
ORDER BY c._ts"),
targetRanges: await documentContainer.GetFeedRangesAsync(
trace: NoOpTrace.Singleton,
cancellationToken: default),
partitionKey: null,
orderByColumns: new List<OrderByColumn>()
{
new OrderByColumn("c._ts", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
cancellationToken: default,
continuationToken: null);
Assert.IsTrue(monadicCreate.Succeeded);
IQueryPipelineStage queryPipelineStage = monadicCreate.Result;

while (await queryPipelineStage.MoveNextAsync())
{
TryCatch<QueryPage> tryGetQueryPage = queryPipelineStage.Current;
if (tryGetQueryPage.Failed)
{
Assert.Fail(tryGetQueryPage.Exception.ToString());
}

QueryPage queryPage = tryGetQueryPage.Result;
Assert.IsTrue(queryPage.AdditionalHeaders.Count > 0);
}
}

[TestMethod]
[DataRow(false, false, false, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: false")]
[DataRow(false, false, true, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: true")]
Expand Down