Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query : Adds WithParameterStream to QueryDefinition to pass in serialized values #2222

Merged
merged 16 commits into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions Microsoft.Azure.Cosmos/src/CosmosSqlQuerySpecJsonConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,27 @@ public override void WriteJson(JsonWriter writer, object value, JsonSerializer s
serializer.Serialize(writer, sqlParameter.Name);
writer.WritePropertyName("value");

// Use the user serializer for the parameter values so custom conversions are correctly handled
using (Stream str = this.UserSerializer.ToStream(sqlParameter.Value))
// if the SqlParameter has stream value we dont pass it through the custom serializer.
kr-santosh marked this conversation as resolved.
Show resolved Hide resolved
if (sqlParameter.Value is SerializedParameterValue serializedEncryptedData)
{
using (StreamReader streamReader = new StreamReader(str))
using (StreamReader streamReader = new StreamReader(serializedEncryptedData.valueStream))
{
string parameterValue = streamReader.ReadToEnd();
writer.WriteRawValue(parameterValue);
kr-santosh marked this conversation as resolved.
Show resolved Hide resolved
}
}
else
{
// Use the user serializer for the parameter values so custom conversions are correctly handled
using (Stream str = this.UserSerializer.ToStream(sqlParameter.Value))
{
using (StreamReader streamReader = new StreamReader(str))
{
string parameterValue = streamReader.ReadToEnd();
writer.WriteRawValue(parameterValue);
}
}
}

writer.WriteEndObject();
}
Expand All @@ -62,15 +74,14 @@ internal static CosmosSerializer CreateSqlQuerySpecSerializer(
CosmosSerializer cosmosSerializer,
CosmosSerializer propertiesSerializer)
{
// If both serializers are the same no need for the custom converter
if (object.ReferenceEquals(cosmosSerializer, propertiesSerializer))
if (propertiesSerializer is CosmosJsonSerializerWrapper cosmosJsonSerializerWrapper)
{
return propertiesSerializer;
propertiesSerializer = cosmosJsonSerializerWrapper.InternalJsonSerializer;
}

JsonSerializerSettings settings = new JsonSerializerSettings()
{
Converters = new List<JsonConverter>() { new CosmosSqlQuerySpecJsonConverter(cosmosSerializer) }
Converters = new List<JsonConverter>() { new CosmosSqlQuerySpecJsonConverter(cosmosSerializer ?? propertiesSerializer) }
kr-santosh marked this conversation as resolved.
Show resolved Hide resolved
};

return new CosmosJsonSerializerWrapper(new CosmosJsonDotNetSerializer(settings));
Expand Down
37 changes: 37 additions & 0 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/QueryDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using Microsoft.Azure.Cosmos.Query.Core;
using Newtonsoft.Json;

Expand Down Expand Up @@ -108,6 +109,37 @@ public QueryDefinition WithParameter(string name, object value)
return this;
}

/// <summary>
/// Add parameters with Stream Value to the SQL query.
/// </summary>
/// <param name="name">The name of the parameter.</param>
/// <param name="valueStream">The stream value for the parameter.</param>
/// <remarks>
/// UseCase : This is useful in cases like running a Query on Encrypted Values, where the value is generated post serialization and then encrypted
/// and we don't want to change the cipher value due to a call to serializer again.
/// If the same name is added again it will replace the original value.
/// </remarks>
/// <example>
/// <code language="c#">
/// <![CDATA[
/// QueryDefinition query = new QueryDefinition(
/// "select * from t where t.Account = @account")
/// .WithParameterStream("@account", streamValue);
/// ]]>
/// </code>
/// </example>
/// <returns>An instance of <see cref="QueryDefinition"/>.</returns>
public QueryDefinition WithParameterStream(string name, Stream valueStream)
{
// pack it into an internal type for identification.
SerializedParameterValue serializedParameterValue = new SerializedParameterValue
{
valueStream = valueStream
};

return this.WithParameter(name, serializedParameterValue);
}

/// <summary>
/// Returns the names and values of parameters in this <see cref="QueryDefinition"/>.
/// </summary>
Expand Down Expand Up @@ -164,4 +196,9 @@ IEnumerator IEnumerable.GetEnumerator()
}
}
}

internal struct SerializedParameterValue
{
internal Stream valueStream;
}
}
10 changes: 6 additions & 4 deletions Microsoft.Azure.Cosmos/src/Serializer/CosmosSerializerCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ internal CosmosSerializerCore(
if (customSerializer == null)
{
this.customSerializer = null;
this.sqlQuerySpecSerializer = null;
// this would allow us to set the JsonConverter and inturn handle Serialized/Stream Query Parameter Value.
this.sqlQuerySpecSerializer = CosmosSqlQuerySpecJsonConverter.CreateSqlQuerySpecSerializer(
cosmosSerializer: this.customSerializer,
kr-santosh marked this conversation as resolved.
Show resolved Hide resolved
propertiesSerializer: CosmosSerializerCore.propertiesSerializer);
this.patchOperationSerializer = null;
}
else
Expand Down Expand Up @@ -84,16 +87,15 @@ internal Stream ToStreamSqlQuerySpec(SqlQuerySpec input, ResourceType resourceTy

// All the public types that support query use the custom serializer
// Internal types like offers will use the default serializer.
if (this.customSerializer != null &&
(resourceType == ResourceType.Database ||
if (resourceType == ResourceType.Database ||
resourceType == ResourceType.Collection ||
resourceType == ResourceType.Document ||
resourceType == ResourceType.Trigger ||
resourceType == ResourceType.UserDefinedFunction ||
resourceType == ResourceType.StoredProcedure ||
resourceType == ResourceType.Permission ||
resourceType == ResourceType.User ||
resourceType == ResourceType.Conflict))
resourceType == ResourceType.Conflict)
{
serializer = this.sqlQuerySpecSerializer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,188 @@ public async Task ItemCustomSerialzierTest()
}
}

[TestMethod]
public async Task QueryStreamValueTest()
{
DateTime createDateTime = DateTime.UtcNow;

dynamic testItem1 = new
{
id = "testItem1",
cost = (double?)null,
totalCost = 98.2789,
pk = "MyCustomStatus",
taskNum = 4909,
createdDateTime = createDateTime,
statusCode = HttpStatusCode.Accepted,
itemIds = new int[] { 1, 5, 10 },
};

dynamic testItem2 = new
{
id = "testItem2",
cost = (double?)null,
totalCost = 98.2789,
kr-santosh marked this conversation as resolved.
Show resolved Hide resolved
pk = "MyCustomStatus",
taskNum = 4909,
createdDateTime = createDateTime,
statusCode = HttpStatusCode.Accepted,
itemIds = new int[] { 1, 5, 10 },
};

//with Custom Serializer.
JsonSerializerSettings jsonSerializerSettings = new JsonSerializerSettings()
{
Converters = new List<JsonConverter>() { new CosmosSerializerHelper.FormatNumbersAsTextConverter() }
};

int toStreamCount = 0;
int fromStreamCount = 0;
CosmosSerializerHelper cosmosSerializerHelper = new CosmosSerializerHelper(
jsonSerializerSettings,
toStreamCallBack: (itemValue) =>
{
Type itemType = itemValue?.GetType();
if (itemValue == null
|| itemType == typeof(int)
|| itemType == typeof(double)
|| itemType == typeof(string)
|| itemType == typeof(DateTime)
|| itemType == typeof(HttpStatusCode)
|| itemType == typeof(int[]))
{
toStreamCount++;
}
},
fromStreamCallback: (item) => fromStreamCount++);

CosmosClientOptions options = new CosmosClientOptions()
{
Serializer = cosmosSerializerHelper
};

CosmosClient clientSerializer = TestCommon.CreateCosmosClient(options);
Container containerSerializer = clientSerializer.GetContainer(this.database.Id, this.Container.Id);

List<QueryDefinition> queryDefinitions = new List<QueryDefinition>()
{
new QueryDefinition("select * from t where t.pk = @pk" )
.WithParameterStream("@pk", cosmosSerializerHelper.ToStream<dynamic>(testItem1.pk)),
new QueryDefinition("select * from t where t.cost = @cost" )
.WithParameterStream("@cost", cosmosSerializerHelper.ToStream<dynamic>(testItem1.cost)),
new QueryDefinition("select * from t where t.taskNum = @taskNum" )
.WithParameterStream("@taskNum", cosmosSerializerHelper.ToStream<dynamic>(testItem1.taskNum)),
new QueryDefinition("select * from t where t.totalCost = @totalCost" )
.WithParameterStream("@totalCost", cosmosSerializerHelper.ToStream<dynamic>(testItem1.totalCost)),
new QueryDefinition("select * from t where t.createdDateTime = @createdDateTime" )
.WithParameterStream("@createdDateTime", cosmosSerializerHelper.ToStream<dynamic>(testItem1.createdDateTime)),
new QueryDefinition("select * from t where t.statusCode = @statusCode" )
.WithParameterStream("@statusCode", cosmosSerializerHelper.ToStream<dynamic>(testItem1.statusCode)),
new QueryDefinition("select * from t where t.itemIds = @itemIds" )
.WithParameterStream("@itemIds", cosmosSerializerHelper.ToStream<dynamic>(testItem1.itemIds)),
new QueryDefinition("select * from t where t.pk = @pk and t.cost = @cost" )
.WithParameterStream("@pk", cosmosSerializerHelper.ToStream<dynamic>(testItem1.pk))
.WithParameterStream("@cost", cosmosSerializerHelper.ToStream<dynamic>(testItem1.cost)),
};

try
{
await containerSerializer.CreateItemAsync<dynamic>(testItem1);
await containerSerializer.CreateItemAsync<dynamic>(testItem2);
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.Conflict)
{
// Ignore conflicts since the object already exists
}

foreach (QueryDefinition queryDefinition in queryDefinitions)
{
toStreamCount = 0;
fromStreamCount = 0;

List<dynamic> allItems = new List<dynamic>();
int pageCount = 0;
using (FeedIterator<dynamic> feedIterator = containerSerializer.GetItemQueryIterator<dynamic>(
queryDefinition: queryDefinition))
{
while (feedIterator.HasMoreResults)
{
// Only need once to verify correct serialization of the query definition
FeedResponse<dynamic> response = await feedIterator.ReadNextAsync(this.cancellationToken);
Assert.AreEqual(response.Count, response.Count());
allItems.AddRange(response);
pageCount++;
}
}

Assert.AreEqual(2, allItems.Count, $"missing query results. Only found: {allItems.Count} items for query:{queryDefinition.ToSqlQuerySpec().QueryText}");

// There should be no call to custom serializer since the parameter values are already serialized.
Assert.AreEqual(0, toStreamCount, $"missing to stream call. Expected: 0 , Actual: {toStreamCount} for query:{queryDefinition.ToSqlQuerySpec().QueryText}");
Assert.AreEqual(pageCount, fromStreamCount);
}

// Standard Cosmos Serializer Used

CosmosClient clientStandardSerializer = TestCommon.CreateCosmosClient(useCustomSeralizer:false);
Container containerStandardSerializer = clientStandardSerializer.GetContainer(this.database.Id, this.Container.Id);

testItem1 = ToDoActivity.CreateRandomToDoActivity();
testItem1.pk = "myPk";
await containerStandardSerializer.CreateItemAsync(testItem1, new Cosmos.PartitionKey(testItem1.pk));

testItem2 = ToDoActivity.CreateRandomToDoActivity();
testItem2.pk = "myPk";
await containerStandardSerializer.CreateItemAsync(testItem2, new Cosmos.PartitionKey(testItem2.pk));
CosmosSerializer cosmosSerializer = containerStandardSerializer.Database.Client.ClientOptions.Serializer;

queryDefinitions = new List<QueryDefinition>()
{
new QueryDefinition("select * from t where t.pk = @pk" )
.WithParameterStream("@pk", cosmosSerializer.ToStream(testItem1.pk)),
new QueryDefinition("select * from t where t.cost = @cost" )
.WithParameterStream("@cost", cosmosSerializer.ToStream(testItem1.cost)),
new QueryDefinition("select * from t where t.taskNum = @taskNum" )
.WithParameterStream("@taskNum", cosmosSerializer.ToStream(testItem1.taskNum)),
new QueryDefinition("select * from t where t.CamelCase = @CamelCase" )
.WithParameterStream("@CamelCase", cosmosSerializer.ToStream(testItem1.CamelCase)),
new QueryDefinition("select * from t where t.valid = @valid" )
.WithParameterStream("@valid", cosmosSerializer.ToStream(testItem1.valid)),
new QueryDefinition("select * from t where t.description = @description" )
.WithParameterStream("@description", cosmosSerializer.ToStream(testItem1.description)),
new QueryDefinition("select * from t where t.pk = @pk and t.cost = @cost" )
.WithParameterStream("@pk", cosmosSerializer.ToStream(testItem1.pk))
.WithParameterStream("@cost", cosmosSerializer.ToStream(testItem1.cost)),
};

foreach (QueryDefinition queryDefinition in queryDefinitions)
{
List<ToDoActivity> allItems = new List<ToDoActivity>();
int pageCount = 0;
using (FeedIterator<ToDoActivity> feedIterator = containerStandardSerializer.GetItemQueryIterator<ToDoActivity>(
queryDefinition: queryDefinition))
{
while (feedIterator.HasMoreResults)
{
// Only need once to verify correct serialization of the query definition
FeedResponse<ToDoActivity> response = await feedIterator.ReadNextAsync(this.cancellationToken);
Assert.AreEqual(response.Count, response.Count());
allItems.AddRange(response);
pageCount++;
}
}

Assert.AreEqual(2, allItems.Count, $"missing query results. Only found: {allItems.Count} items for query:{queryDefinition.ToSqlQuerySpec().QueryText}");
Assert.AreEqual(pageCount, 1);


IReadOnlyList<(string Name, object Value)> parameters1 = queryDefinition.GetQueryParameters();
IReadOnlyList<(string Name, object Value)> parameters2 = queryDefinition.GetQueryParameters();

Assert.AreSame(parameters1, parameters2);
}
}

[TestMethod]
public async Task ItemIterator()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4462,6 +4462,11 @@
"Attributes": [],
"MethodInfo": "Microsoft.Azure.Cosmos.QueryDefinition WithParameter(System.String, System.Object);IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;"
},
"Microsoft.Azure.Cosmos.QueryDefinition WithParameterStream(System.String, System.IO.Stream)": {
"Type": "Method",
"Attributes": [],
"MethodInfo": "Microsoft.Azure.Cosmos.QueryDefinition WithParameterStream(System.String, System.IO.Stream);IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;"
},
"System.Collections.Generic.IReadOnlyList`1[System.ValueTuple`2[System.String,System.Object]] GetQueryParameters()": {
"Type": "Method",
"Attributes": [],
Expand Down