Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query : Adds WithParameterStream to QueryDefinition to pass in serialized values #2222

Merged
merged 16 commits into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions Microsoft.Azure.Cosmos/src/CosmosSqlQuerySpecJsonConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,27 @@ public override void WriteJson(JsonWriter writer, object value, JsonSerializer s
serializer.Serialize(writer, sqlParameter.Name);
writer.WritePropertyName("value");

// Use the user serializer for the parameter values so custom conversions are correctly handled
using (Stream str = this.UserSerializer.ToStream(sqlParameter.Value))
// if the SqlParameter has stream value we dont pass it through the custom serializer.
kr-santosh marked this conversation as resolved.
Show resolved Hide resolved
if (sqlParameter.Value is SerializedParameterValue serializedEncryptedData)
{
using (StreamReader streamReader = new StreamReader(str))
using (StreamReader streamReader = new StreamReader(serializedEncryptedData.valueStream))
{
string parameterValue = streamReader.ReadToEnd();
writer.WriteRawValue(parameterValue);
kr-santosh marked this conversation as resolved.
Show resolved Hide resolved
}
}
else
{
// Use the user serializer for the parameter values so custom conversions are correctly handled
using (Stream str = this.UserSerializer.ToStream(sqlParameter.Value))
{
using (StreamReader streamReader = new StreamReader(str))
{
string parameterValue = streamReader.ReadToEnd();
writer.WriteRawValue(parameterValue);
}
}
}

writer.WriteEndObject();
}
Expand All @@ -62,15 +74,9 @@ internal static CosmosSerializer CreateSqlQuerySpecSerializer(
CosmosSerializer cosmosSerializer,
CosmosSerializer propertiesSerializer)
{
// If both serializers are the same no need for the custom converter
if (object.ReferenceEquals(cosmosSerializer, propertiesSerializer))
{
return propertiesSerializer;
}

JsonSerializerSettings settings = new JsonSerializerSettings()
{
Converters = new List<JsonConverter>() { new CosmosSqlQuerySpecJsonConverter(cosmosSerializer) }
Converters = new List<JsonConverter>() { new CosmosSqlQuerySpecJsonConverter(cosmosSerializer ?? propertiesSerializer) }
kr-santosh marked this conversation as resolved.
Show resolved Hide resolved
};

return new CosmosJsonSerializerWrapper(new CosmosJsonDotNetSerializer(settings));
Expand Down
54 changes: 54 additions & 0 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/QueryDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using Microsoft.Azure.Cosmos.Query.Core;
using Newtonsoft.Json;

Expand Down Expand Up @@ -108,6 +109,54 @@ public QueryDefinition WithParameter(string name, object value)
return this;
}

/// <summary>
/// Add parameters to the SQL query
kr-santosh marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
/// <param name="name">The name of the parameter.</param>
/// <param name="valueStream">The stream value for the parameter.</param>
/// <remarks>
/// If the same name is added again it will replace the original value
/// </remarks>
/// <example>
/// <code language="c#">
/// <![CDATA[
/// QueryDefinition query = new QueryDefinition(
/// "select * from t where t.Account = @account")
/// .WithParameter("@account", streamValue);
kr-santosh marked this conversation as resolved.
Show resolved Hide resolved
/// ]]>
/// </code>
/// </example>
/// <returns>An instance of <see cref="QueryDefinition"/>.</returns>
public QueryDefinition WithStreamParameter(string name, Stream valueStream)
kr-santosh marked this conversation as resolved.
Show resolved Hide resolved
{
// pack it into an internal type for identification.
SerializedParameterValue serializedParameterValue = new SerializedParameterValue
{
valueStream = valueStream
};

if (string.IsNullOrEmpty(name))
j82w marked this conversation as resolved.
Show resolved Hide resolved
{
throw new ArgumentNullException(nameof(name));
}

if (this.parameters == null)
{
this.parameters = new List<SqlParameter>();
}

// Required to maintain previous contract when backed by a dictionary.
int index = this.parameters.FindIndex(param => param.Name == name);
if (index != -1)
{
this.parameters.RemoveAt(index);
}

this.parameters.Add(new SqlParameter(name, serializedParameterValue));

return this;
}

/// <summary>
/// Returns the names and values of parameters in this <see cref="QueryDefinition"/>.
/// </summary>
Expand Down Expand Up @@ -164,4 +213,9 @@ IEnumerator IEnumerable.GetEnumerator()
}
}
}

internal struct SerializedParameterValue
{
internal Stream valueStream;
}
}
10 changes: 6 additions & 4 deletions Microsoft.Azure.Cosmos/src/Serializer/CosmosSerializerCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ internal CosmosSerializerCore(
if (customSerializer == null)
{
this.customSerializer = null;
this.sqlQuerySpecSerializer = null;
// this would allow us to set the JsonConverter and inturn handle Serialized/Stream Query Parameter Value.
this.sqlQuerySpecSerializer = this.sqlQuerySpecSerializer = CosmosSqlQuerySpecJsonConverter.CreateSqlQuerySpecSerializer(
kr-santosh marked this conversation as resolved.
Show resolved Hide resolved
cosmosSerializer: this.customSerializer,
kr-santosh marked this conversation as resolved.
Show resolved Hide resolved
propertiesSerializer: CosmosSerializerCore.propertiesSerializer);
this.patchOperationSerializer = null;
}
else
Expand Down Expand Up @@ -84,16 +87,15 @@ internal Stream ToStreamSqlQuerySpec(SqlQuerySpec input, ResourceType resourceTy

// All the public types that support query use the custom serializer
// Internal types like offers will use the default serializer.
if (this.customSerializer != null &&
(resourceType == ResourceType.Database ||
if (resourceType == ResourceType.Database ||
resourceType == ResourceType.Collection ||
resourceType == ResourceType.Document ||
resourceType == ResourceType.Trigger ||
resourceType == ResourceType.UserDefinedFunction ||
resourceType == ResourceType.StoredProcedure ||
resourceType == ResourceType.Permission ||
resourceType == ResourceType.User ||
resourceType == ResourceType.Conflict))
resourceType == ResourceType.Conflict)
{
serializer = this.sqlQuerySpecSerializer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,188 @@ public async Task ItemCustomSerialzierTest()
}
}

[TestMethod]
public async Task QueryStreamValueTest()
{
DateTime createDateTime = DateTime.UtcNow;

dynamic testItem1 = new
{
id = "testItem1",
cost = (double?)null,
totalCost = 98.2789,
pk = "MyCustomStatus",
taskNum = 4909,
createdDateTime = createDateTime,
statusCode = HttpStatusCode.Accepted,
itemIds = new int[] { 1, 5, 10 },
};

dynamic testItem2 = new
{
id = "testItem2",
cost = (double?)null,
totalCost = 98.2789,
kr-santosh marked this conversation as resolved.
Show resolved Hide resolved
pk = "MyCustomStatus",
taskNum = 4909,
createdDateTime = createDateTime,
statusCode = HttpStatusCode.Accepted,
itemIds = new int[] { 1, 5, 10 },
};

//with Custom Serializer.
JsonSerializerSettings jsonSerializerSettings = new JsonSerializerSettings()
{
Converters = new List<JsonConverter>() { new CosmosSerializerHelper.FormatNumbersAsTextConverter() }
};

int toStreamCount = 0;
int fromStreamCount = 0;
CosmosSerializerHelper cosmosSerializerHelper = new CosmosSerializerHelper(
jsonSerializerSettings,
toStreamCallBack: (itemValue) =>
{
Type itemType = itemValue?.GetType();
if (itemValue == null
|| itemType == typeof(int)
|| itemType == typeof(double)
|| itemType == typeof(string)
|| itemType == typeof(DateTime)
|| itemType == typeof(HttpStatusCode)
|| itemType == typeof(int[]))
{
toStreamCount++;
}
},
fromStreamCallback: (item) => fromStreamCount++);

CosmosClientOptions options = new CosmosClientOptions()
{
Serializer = cosmosSerializerHelper
};

CosmosClient clientSerializer = TestCommon.CreateCosmosClient(options);
Container containerSerializer = clientSerializer.GetContainer(this.database.Id, this.Container.Id);

List<QueryDefinition> queryDefinitions = new List<QueryDefinition>()
{
new QueryDefinition("select * from t where t.pk = @pk" )
.WithStreamParameter("@pk", cosmosSerializerHelper.ToStream<dynamic>(testItem1.pk)),
new QueryDefinition("select * from t where t.cost = @cost" )
.WithStreamParameter("@cost", cosmosSerializerHelper.ToStream<dynamic>(testItem1.cost)),
new QueryDefinition("select * from t where t.taskNum = @taskNum" )
.WithStreamParameter("@taskNum", cosmosSerializerHelper.ToStream<dynamic>(testItem1.taskNum)),
new QueryDefinition("select * from t where t.totalCost = @totalCost" )
.WithStreamParameter("@totalCost", cosmosSerializerHelper.ToStream<dynamic>(testItem1.totalCost)),
new QueryDefinition("select * from t where t.createdDateTime = @createdDateTime" )
.WithStreamParameter("@createdDateTime", cosmosSerializerHelper.ToStream<dynamic>(testItem1.createdDateTime)),
new QueryDefinition("select * from t where t.statusCode = @statusCode" )
.WithStreamParameter("@statusCode", cosmosSerializerHelper.ToStream<dynamic>(testItem1.statusCode)),
new QueryDefinition("select * from t where t.itemIds = @itemIds" )
.WithStreamParameter("@itemIds", cosmosSerializerHelper.ToStream<dynamic>(testItem1.itemIds)),
new QueryDefinition("select * from t where t.pk = @pk and t.cost = @cost" )
.WithStreamParameter("@pk", cosmosSerializerHelper.ToStream<dynamic>(testItem1.pk))
.WithStreamParameter("@cost", cosmosSerializerHelper.ToStream<dynamic>(testItem1.cost)),
};

try
{
await containerSerializer.CreateItemAsync<dynamic>(testItem1);
await containerSerializer.CreateItemAsync<dynamic>(testItem2);
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.Conflict)
{
// Ignore conflicts since the object already exists
}

foreach (QueryDefinition queryDefinition in queryDefinitions)
{
toStreamCount = 0;
fromStreamCount = 0;

List<dynamic> allItems = new List<dynamic>();
int pageCount = 0;
using (FeedIterator<dynamic> feedIterator = containerSerializer.GetItemQueryIterator<dynamic>(
queryDefinition: queryDefinition))
{
while (feedIterator.HasMoreResults)
{
// Only need once to verify correct serialization of the query definition
FeedResponse<dynamic> response = await feedIterator.ReadNextAsync(this.cancellationToken);
Assert.AreEqual(response.Count, response.Count());
allItems.AddRange(response);
pageCount++;
}
}

Assert.AreEqual(2, allItems.Count, $"missing query results. Only found: {allItems.Count} items for query:{queryDefinition.ToSqlQuerySpec().QueryText}");

// There should be no call to custom serializer since the parameter values are already serialized.
Assert.AreEqual(0, toStreamCount, $"missing to stream call. Expected: 0 , Actual: {toStreamCount} for query:{queryDefinition.ToSqlQuerySpec().QueryText}");
Assert.AreEqual(pageCount, fromStreamCount);
}

// Standard Cosmos Serializer Used

CosmosClient clientStandardSerializer = TestCommon.CreateCosmosClient(useCustomSeralizer:false);
Container containerStandardSerializer = clientStandardSerializer.GetContainer(this.database.Id, this.Container.Id);

testItem1 = ToDoActivity.CreateRandomToDoActivity();
testItem1.pk = "myPk";
await containerStandardSerializer.CreateItemAsync(testItem1, new Cosmos.PartitionKey(testItem1.pk));

testItem2 = ToDoActivity.CreateRandomToDoActivity();
testItem2.pk = "myPk";
await containerStandardSerializer.CreateItemAsync(testItem2, new Cosmos.PartitionKey(testItem2.pk));
CosmosSerializer cosmosSerializer = containerStandardSerializer.Database.Client.ClientOptions.Serializer;

queryDefinitions = new List<QueryDefinition>()
{
new QueryDefinition("select * from t where t.pk = @pk" )
.WithStreamParameter("@pk", cosmosSerializer.ToStream(testItem1.pk)),
new QueryDefinition("select * from t where t.cost = @cost" )
.WithStreamParameter("@cost", cosmosSerializer.ToStream(testItem1.cost)),
new QueryDefinition("select * from t where t.taskNum = @taskNum" )
.WithStreamParameter("@taskNum", cosmosSerializer.ToStream(testItem1.taskNum)),
new QueryDefinition("select * from t where t.CamelCase = @CamelCase" )
.WithStreamParameter("@CamelCase", cosmosSerializer.ToStream(testItem1.CamelCase)),
new QueryDefinition("select * from t where t.valid = @valid" )
.WithStreamParameter("@valid", cosmosSerializer.ToStream(testItem1.valid)),
new QueryDefinition("select * from t where t.description = @description" )
.WithStreamParameter("@description", cosmosSerializer.ToStream(testItem1.description)),
new QueryDefinition("select * from t where t.pk = @pk and t.cost = @cost" )
.WithStreamParameter("@pk", cosmosSerializer.ToStream(testItem1.pk))
.WithStreamParameter("@cost", cosmosSerializer.ToStream(testItem1.cost)),
};

foreach (QueryDefinition queryDefinition in queryDefinitions)
{
List<ToDoActivity> allItems = new List<ToDoActivity>();
int pageCount = 0;
using (FeedIterator<ToDoActivity> feedIterator = containerStandardSerializer.GetItemQueryIterator<ToDoActivity>(
queryDefinition: queryDefinition))
{
while (feedIterator.HasMoreResults)
{
// Only need once to verify correct serialization of the query definition
FeedResponse<ToDoActivity> response = await feedIterator.ReadNextAsync(this.cancellationToken);
Assert.AreEqual(response.Count, response.Count());
allItems.AddRange(response);
pageCount++;
}
}

Assert.AreEqual(2, allItems.Count, $"missing query results. Only found: {allItems.Count} items for query:{queryDefinition.ToSqlQuerySpec().QueryText}");
Assert.AreEqual(pageCount, 1);


IReadOnlyList<(string Name, object Value)> parameters1 = queryDefinition.GetQueryParameters();
IReadOnlyList<(string Name, object Value)> parameters2 = queryDefinition.GetQueryParameters();

Assert.AreSame(parameters1, parameters2);
}
}

[TestMethod]
public async Task ItemIterator()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4426,6 +4426,11 @@
"Attributes": [],
"MethodInfo": "Microsoft.Azure.Cosmos.QueryDefinition WithParameter(System.String, System.Object);IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;"
},
"Microsoft.Azure.Cosmos.QueryDefinition WithStreamParameter(System.String, System.IO.Stream)": {
"Type": "Method",
"Attributes": [],
"MethodInfo": "Microsoft.Azure.Cosmos.QueryDefinition WithStreamParameter(System.String, System.IO.Stream);IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;"
},
"System.Collections.Generic.IReadOnlyList`1[System.ValueTuple`2[System.String,System.Object]] GetQueryParameters()": {
"Type": "Method",
"Attributes": [],
Expand Down