Skip to content

Commit

Permalink
Query: Fixes query plan via service interop to use custom serializer (#…
Browse files Browse the repository at this point in the history
…3154)

Customer can pass a customer serializer in which can apply and data transformation necessary. For example it could convert a int or double to a string. 

Query has 3 different ways to get the query plan.
1. ServiceInterop.dll which requires Windows x64
2. Antlr parser
3. Gateway 

### The custom serializer is used in query for the following scenarios.
1. ServiceInterop.dll-> Using a random [JsonConvert ](https://github.com/Azure/azure-cosmos-dotnet-v3/blob/c8935ac2f864fb829f5d941dda07c74aec86a677/Microsoft.Azure.Cosmos/src/Query/Core/QueryPlan/QueryPartitionProvider.cs#L169)instead of the [standard serialization contract](https://github.com/Azure/azure-cosmos-dotnet-v3/blob/c8935ac2f864fb829f5d941dda07c74aec86a677/Microsoft.Azure.Cosmos/src/Query/v3Query/CosmosQueryClientCore.cs#L123)
2. Antlr parser -> Does not take in parameters so serialization is not necessary. It just looks at query text.
3. Gateway -> Applies the[ correct serialization ](https://github.com/Azure/azure-cosmos-dotnet-v3/blob/c8935ac2f864fb829f5d941dda07c74aec86a677/Microsoft.Azure.Cosmos/src/Query/v3Query/CosmosQueryClientCore.cs#L163)
4. Executing the query(sending it to gateway or a partition):  Applies the[ correct serialization ](https://github.com/Azure/azure-cosmos-dotnet-v3/blob/c8935ac2f864fb829f5d941dda07c74aec86a677/Microsoft.Azure.Cosmos/src/Query/v3Query/CosmosQueryClientCore.cs#L123)

### Why didn't testing catch this?
There are existing[ tests which validate this scenario](https://github.com/Azure/azure-cosmos-dotnet-v3/blob/1d1d4c753cae896e6d96a98ef07a276cf1e4f130/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemTests.cs#L839), but they are not calculating the correct expected count. The test assumed the query pipeline would only serialize the SqlQuerySpec once. The implementation serializes it for every page request to the backend and to generate the query plan.

### Who is impacted?
Only customers that are running on Windows x64 and have custom serialization that filters on the partition key value only in the query. If the partition key is provided in the request options then it is handled correctly because request option overrides the query text.

### Impact:
This could result in the query being sent to the wrong partitions' which would cause it query returning possibly less results  than expected because it would be routed to the wrong partition.

### Solution:
Based on the current models and contract the serialization logic should not be in the query code. The query pipeline should not know or care about how the parameters are serialized. 

To keep this abstraction in place the serialization logic will be moved the CosmosQueryClientCore.cs like all of the other places that currently handle the custom serialization. This keeps all the serialization logic in the same file, and keeps it the same for all the different methods to get the query plan. 

This will cause the serialized string to be passed down instead of the SqlQuerySpec. This is a better contract because the service interop only requires the serialized string. All the contracts now match what is actually needed for it to execute getting the query plan. This follows the same model as getting it from gateway or sending it to be executed.
  • Loading branch information
j82w authored May 3, 2022
1 parent 3632f3a commit 8a0d18a
Show file tree
Hide file tree
Showing 20 changed files with 125 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateCoreContextAsy
partitionedQueryExecutionInfo = await QueryPlanRetriever.GetQueryPlanWithServiceInteropAsync(
cosmosQueryContext.QueryClient,
inputParameters.SqlQuerySpec,
cosmosQueryContext.ResourceTypeEnum,
partitionKeyDefinition,
inputParameters.PartitionKey != null,
createQueryPipelineTrace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public abstract Task<ContainerQueryProperties> GetCachedContainerQueryProperties

public abstract Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetPartitionedQueryExecutionInfoAsync(
SqlQuerySpec sqlQuerySpec,
Documents.ResourceType resourceType,
Documents.PartitionKeyDefinition partitionKeyDefinition,
bool requireFormattableOrderByQuery,
bool isContinuationExpected,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void Update(IDictionary<string, object> queryengineConfiguration)
}

public TryCatch<PartitionedQueryExecutionInfo> TryGetPartitionedQueryExecutionInfo(
SqlQuerySpec querySpec,
string querySpecJsonString,
PartitionKeyDefinition partitionKeyDefinition,
bool requireFormattableOrderByQuery,
bool isContinuationExpected,
Expand All @@ -113,7 +113,7 @@ public TryCatch<PartitionedQueryExecutionInfo> TryGetPartitionedQueryExecutionIn
bool allowDCount)
{
TryCatch<PartitionedQueryExecutionInfoInternal> tryGetInternalQueryInfo = this.TryGetPartitionedQueryExecutionInfoInternal(
querySpec: querySpec,
querySpecJsonString: querySpecJsonString,
partitionKeyDefinition: partitionKeyDefinition,
requireFormattableOrderByQuery: requireFormattableOrderByQuery,
isContinuationExpected: isContinuationExpected,
Expand Down Expand Up @@ -153,21 +153,19 @@ internal PartitionedQueryExecutionInfo ConvertPartitionedQueryExecutionInfo(
}

internal TryCatch<PartitionedQueryExecutionInfoInternal> TryGetPartitionedQueryExecutionInfoInternal(
SqlQuerySpec querySpec,
string querySpecJsonString,
PartitionKeyDefinition partitionKeyDefinition,
bool requireFormattableOrderByQuery,
bool isContinuationExpected,
bool allowNonValueAggregateQuery,
bool hasLogicalPartitionKey,
bool allowDCount)
{
if (querySpec == null || partitionKeyDefinition == null)
if (querySpecJsonString == null || partitionKeyDefinition == null)
{
return TryCatch<PartitionedQueryExecutionInfoInternal>.FromResult(DefaultInfoInternal);
}

string queryText = JsonConvert.SerializeObject(querySpec);

List<string> paths = new List<string>(partitionKeyDefinition.Paths);
List<IReadOnlyList<string>> pathPartsList = new List<IReadOnlyList<string>>(paths.Count);
uint[] partsLengths = new uint[paths.Count];
Expand Down Expand Up @@ -205,7 +203,7 @@ internal TryCatch<PartitionedQueryExecutionInfoInternal> TryGetPartitionedQueryE
{
errorCode = ServiceInteropWrapper.GetPartitionKeyRangesFromQuery2(
this.serviceProvider,
queryText,
querySpecJsonString,
requireFormattableOrderByQuery,
isContinuationExpected,
allowNonValueAggregateQuery,
Expand All @@ -230,7 +228,7 @@ internal TryCatch<PartitionedQueryExecutionInfoInternal> TryGetPartitionedQueryE
{
errorCode = ServiceInteropWrapper.GetPartitionKeyRangesFromQuery2(
this.serviceProvider,
queryText,
querySpecJsonString,
requireFormattableOrderByQuery,
isContinuationExpected,
allowNonValueAggregateQuery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public QueryPlanHandler(CosmosQueryClient queryClient)

public async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryPlanAsync(
SqlQuerySpec sqlQuerySpec,
Documents.ResourceType resourceType,
PartitionKeyDefinition partitionKeyDefinition,
QueryFeatures supportedQueryFeatures,
bool hasLogicalPartitionKey,
Expand All @@ -43,6 +44,7 @@ public async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryPlanAsync(

TryCatch<PartitionedQueryExecutionInfo> tryGetQueryInfo = await this.TryGetQueryInfoAsync(
sqlQuerySpec,
resourceType,
partitionKeyDefinition,
hasLogicalPartitionKey,
cancellationToken);
Expand All @@ -68,6 +70,7 @@ public async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryPlanAsync(
public async Task<TryCatch<(PartitionedQueryExecutionInfo queryPlan, bool supported)>> TryGetQueryInfoAndIfSupportedAsync(
QueryFeatures supportedQueryFeatures,
SqlQuerySpec sqlQuerySpec,
Documents.ResourceType resourceType,
PartitionKeyDefinition partitionKeyDefinition,
bool hasLogicalPartitionKey,
CancellationToken cancellationToken = default)
Expand All @@ -86,6 +89,7 @@ public async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryPlanAsync(

TryCatch<PartitionedQueryExecutionInfo> tryGetQueryInfo = await this.TryGetQueryInfoAsync(
sqlQuerySpec,
resourceType,
partitionKeyDefinition,
hasLogicalPartitionKey,
cancellationToken);
Expand All @@ -102,6 +106,7 @@ public async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryPlanAsync(

private Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryInfoAsync(
SqlQuerySpec sqlQuerySpec,
Documents.ResourceType resourceType,
PartitionKeyDefinition partitionKeyDefinition,
bool hasLogicalPartitionKey,
CancellationToken cancellationToken = default)
Expand All @@ -110,6 +115,7 @@ private Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetQueryInfoAsync(

return this.queryClient.TryGetPartitionedQueryExecutionInfoAsync(
sqlQuerySpec: sqlQuerySpec,
resourceType: resourceType,
partitionKeyDefinition: partitionKeyDefinition,
requireFormattableOrderByQuery: true,
isContinuationExpected: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ internal static class QueryPlanRetriever
public static async Task<PartitionedQueryExecutionInfo> GetQueryPlanWithServiceInteropAsync(
CosmosQueryClient queryClient,
SqlQuerySpec sqlQuerySpec,
Documents.ResourceType resourceType,
PartitionKeyDefinition partitionKeyDefinition,
bool hasLogicalPartitionKey,
ITrace trace,
Expand Down Expand Up @@ -62,6 +63,7 @@ public static async Task<PartitionedQueryExecutionInfo> GetQueryPlanWithServiceI

TryCatch<PartitionedQueryExecutionInfo> tryGetQueryPlan = await queryPlanHandler.TryGetQueryPlanAsync(
sqlQuerySpec,
resourceType,
partitionKeyDefinition,
QueryPlanRetriever.SupportedQueryFeatures,
hasLogicalPartitionKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ private static bool ServiceInteropAvailable()
}

providedRanges = PartitionRoutingHelper.GetProvidedPartitionKeyRanges(
querySpec: this.QuerySpec,
querySpecJsonString: JsonConvert.SerializeObject(this.QuerySpec),
enableCrossPartitionQuery: enableCrossPartitionQuery,
parallelizeCrossPartitionQuery: false,
isContinuationExpected: this.isContinuationExpected,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public async Task<PartitionedQueryExecutionInfo> GetPartitionedQueryExecutionInf

QueryPartitionProvider queryPartitionProvider = await this.Client.GetQueryPartitionProviderAsync();
TryCatch<PartitionedQueryExecutionInfo> tryGetPartitionedQueryExecutionInfo = queryPartitionProvider.TryGetPartitionedQueryExecutionInfo(
querySpec: this.QuerySpec,
querySpecJsonString: JsonConvert.SerializeObject(this.QuerySpec),
partitionKeyDefinition: partitionKeyDefinition,
requireFormattableOrderByQuery: requireFormattableOrderByQuery,
isContinuationExpected: isContinuationExpected,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public override async Task<ContainerQueryProperties> GetCachedContainerQueryProp

public override async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetPartitionedQueryExecutionInfoAsync(
SqlQuerySpec sqlQuerySpec,
ResourceType resourceType,
PartitionKeyDefinition partitionKeyDefinition,
bool requireFormattableOrderByQuery,
bool isContinuationExpected,
Expand All @@ -87,8 +88,20 @@ public override async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetPartit
bool allowDCount,
CancellationToken cancellationToken)
{
string queryString = null;
if (sqlQuerySpec != null)
{
using (Stream stream = this.clientContext.SerializerCore.ToStreamSqlQuerySpec(sqlQuerySpec, resourceType))
{
using (StreamReader reader = new StreamReader(stream))
{
queryString = reader.ReadToEnd();
}
}
}

return (await this.documentClient.QueryPartitionProvider).TryGetPartitionedQueryExecutionInfo(
querySpec: sqlQuerySpec,
querySpecJsonString: queryString,
partitionKeyDefinition: partitionKeyDefinition,
requireFormattableOrderByQuery: requireFormattableOrderByQuery,
isContinuationExpected: isContinuationExpected,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ public override async Task<TryExecuteQueryResult> TryExecuteQueryAsync(
TryCatch<(PartitionedQueryExecutionInfo queryPlan, bool supported)> tryGetQueryInfoAndIfSupported = await queryPlanHandler.TryGetQueryInfoAndIfSupportedAsync(
supportedQueryFeatures,
queryDefinition.ToSqlQuerySpec(),
ResourceType.Document,
partitionKeyDefinition,
requestOptions.PartitionKey.HasValue,
cancellationToken);
Expand Down
8 changes: 4 additions & 4 deletions Microsoft.Azure.Cosmos/src/Routing/PartitionRoutingHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace Microsoft.Azure.Cosmos.Routing
internal class PartitionRoutingHelper
{
public static IReadOnlyList<Range<string>> GetProvidedPartitionKeyRanges(
SqlQuerySpec querySpec,
string querySpecJsonString,
bool enableCrossPartitionQuery,
bool parallelizeCrossPartitionQuery,
bool isContinuationExpected,
Expand All @@ -38,9 +38,9 @@ public static IReadOnlyList<Range<string>> GetProvidedPartitionKeyRanges(
string clientApiVersion,
out QueryInfo queryInfo)
{
if (querySpec == null)
if (querySpecJsonString == null)
{
throw new ArgumentNullException(nameof(querySpec));
throw new ArgumentNullException(nameof(querySpecJsonString));
}

if (partitionKeyDefinition == null)
Expand All @@ -54,7 +54,7 @@ public static IReadOnlyList<Range<string>> GetProvidedPartitionKeyRanges(
}

TryCatch<PartitionedQueryExecutionInfo> tryGetPartitionQueryExecutionInfo = queryPartitionProvider.TryGetPartitionedQueryExecutionInfo(
querySpec: querySpec,
querySpecJsonString: querySpecJsonString,
partitionKeyDefinition: partitionKeyDefinition,
requireFormattableOrderByQuery: VersionUtility.IsLaterThan(clientApiVersion, HttpConstants.VersionDates.v2016_11_14),
isContinuationExpected: isContinuationExpected,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public async Task TestInitialize()
this.containerSettings = new ContainerProperties(id: Guid.NewGuid().ToString(), partitionKeyPath: PartitionKey);
ContainerResponse response = await this.database.CreateContainerAsync(
this.containerSettings,
throughput: 15000,
cancellationToken: this.cancellationToken);
Assert.IsNotNull(response);
Assert.IsNotNull(response.Container);
Expand Down Expand Up @@ -960,7 +961,7 @@ public async Task ItemCustomSerialzierTest()

// Each parameter in query spec should be a call to the custom serializer
int parameterCount = queryDefinition.ToSqlQuerySpec().Parameters.Count;
Assert.AreEqual(parameterCount, toStreamCount, $"missing to stream call. Expected: {parameterCount}, Actual: {toStreamCount} for query:{queryDefinition.ToSqlQuerySpec().QueryText}");
Assert.AreEqual((parameterCount*pageCount)+parameterCount, toStreamCount, $"missing to stream call. Expected: {(parameterCount * pageCount) + parameterCount}, Actual: {toStreamCount} for query:{queryDefinition.ToSqlQuerySpec().QueryText}");
Assert.AreEqual(pageCount, fromStreamCount);
}
}
Expand Down Expand Up @@ -1172,7 +1173,15 @@ public async Task QueryStreamValueTest()
}

Assert.AreEqual(2, allItems.Count, $"missing query results. Only found: {allItems.Count} items for query:{queryDefinition.ToSqlQuerySpec().QueryText}");
Assert.AreEqual(pageCount, 1);
if (queryDefinition.QueryText.Contains("pk"))
{
Assert.AreEqual(1, pageCount);
}
else
{
Assert.AreEqual(3, pageCount);
}



IReadOnlyList<(string Name, object Value)> parameters1 = queryDefinition.GetQueryParameters();
Expand Down Expand Up @@ -1318,13 +1327,20 @@ public async Task ItemMultiplePartitionQuery()
sql,
requestOptions: requestOptions);

bool found = false;
while (feedIterator.HasMoreResults)
{
FeedResponse<ToDoActivity> iter = await feedIterator.ReadNextAsync();
Assert.AreEqual(1, iter.Count());
ToDoActivity response = iter.First();
Assert.AreEqual(find.id, response.id);
Assert.IsTrue(iter.Count() <= 1);
if(iter.Count() == 1)
{
found = true;
ToDoActivity response = iter.First();
Assert.AreEqual(find.id, response.id);
}
}

Assert.IsTrue(found);
}

/// <summary>
Expand All @@ -1341,7 +1357,11 @@ public async Task ItemMultiplePartitionOrderByQueryStream()
System.Globalization.CultureInfo.GetCultureInfo("fr-FR")
};

IList<ToDoActivity> deleteList = await ToDoActivity.CreateRandomItems(this.Container, 300, randomPartitionKey: true);
IList<ToDoActivity> deleteList = await ToDoActivity.CreateRandomItems(
this.Container,
300,
randomPartitionKey: true,
randomTaskNumber: true);

try
{
Expand Down Expand Up @@ -1427,8 +1447,8 @@ public async Task ItemMultiplePartitionQueryStream()
Assert.AreEqual(deleteList.Count, resultList.Count);
Assert.IsTrue(totalRequstCharge > 0);

List<ToDoActivity> verifiedOrderBy = deleteList.OrderBy(x => x.taskNum).ToList();
resultList = resultList.OrderBy(x => x.taskNum).ToList();
List<ToDoActivity> verifiedOrderBy = deleteList.OrderBy(x => x.id).ToList();
resultList = resultList.OrderBy(x => x.id).ToList();
for (int i = 0; i < verifiedOrderBy.Count(); i++)
{
Assert.AreEqual(verifiedOrderBy[i].taskNum, resultList[i].taskNum);
Expand Down Expand Up @@ -1510,6 +1530,11 @@ public async Task EpkPointReadTest()
.InternalKey
.GetEffectivePartitionKeyString(this.containerSettings.PartitionKey);

properties = new Dictionary<string, object>()
{
{ WFConstants.BackendHeaders.EffectivePartitionKeyString, epk },
};

QueryRequestOptions queryRequestOptions = new QueryRequestOptions
{
IsEffectivePartitionKeyRouting = true,
Expand Down Expand Up @@ -1589,7 +1614,10 @@ public async Task ItemEpkQuerySingleKeyRangeValidation()
[TestMethod]
public async Task ItemQueryStreamSerializationSetting()
{
IList<ToDoActivity> deleteList = await ToDoActivity.CreateRandomItems(this.Container, 101, randomPartitionKey: true);
IList<ToDoActivity> deleteList = await ToDoActivity.CreateRandomItems(
container: this.Container,
pkCount: 101,
randomTaskNumber: true);

QueryDefinition sql = new QueryDefinition("SELECT * FROM toDoActivity t ORDER BY t.taskNum");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ public async Task TestInitialize()
await base.TestInit();
string PartitionKey = "/pk";
ContainerResponse response = await this.database.CreateContainerAsync(
new ContainerProperties(id: Guid.NewGuid().ToString(), partitionKeyPath: PartitionKey),
new ContainerProperties(
id: Guid.NewGuid().ToString(),
partitionKeyPath: PartitionKey),
throughput: 15000,
cancellationToken: this.cancellationToken);
Assert.IsNotNull(response);
this.container = response;
Expand Down Expand Up @@ -102,7 +105,7 @@ public async Task TestQueryWithCustomJsonSerializer()
items.AddRange(await itemIterator.ReadNextAsync());
}

Assert.AreEqual(1, toStreamCount);
Assert.AreEqual(2, toStreamCount);
Assert.AreEqual(1, fromStreamCount);

toStreamCount = 0;
Expand All @@ -122,7 +125,7 @@ public async Task TestQueryWithCustomJsonSerializer()
}
}

Assert.AreEqual(1, toStreamCount);
Assert.AreEqual(2, toStreamCount);
Assert.AreEqual(0, fromStreamCount);

}
Expand Down
Loading

0 comments on commit 8a0d18a

Please sign in to comment.