Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReadMany: Fixes ReadMany API for None Partition key values #2661

Merged
merged 10 commits into from
Aug 20, 2021
88 changes: 62 additions & 26 deletions Microsoft.Azure.Cosmos/src/ReadManyQueryHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,22 @@ public override async Task<ResponseMessage> ExecuteReadManyRequestAsync(IReadOnl
ITrace trace,
CancellationToken cancellationToken)
{
string resourceId = await this.container.GetCachedRIDAsync(cancellationToken);
IDictionary<PartitionKeyRange, List<(string, PartitionKey)>> partitionKeyRangeItemMap =
await this.CreatePartitionKeyRangeItemListMapAsync(items, cancellationToken);
IDictionary<PartitionKeyRange, List<(string, PartitionKey)>> partitionKeyRangeItemMap;
string resourceId;
try
{
resourceId = await this.container.GetCachedRIDAsync(cancellationToken);
partitionKeyRangeItemMap = await this.CreatePartitionKeyRangeItemListMapAsync(items, trace, cancellationToken);
}
catch (CosmosException ex)
{
return ex.ToCosmosResponseMessage(request: null);
ealsur marked this conversation as resolved.
Show resolved Hide resolved
}

List<ResponseMessage>[] queryResponses = await this.ReadManyTaskHelperAsync(partitionKeyRangeItemMap,
readManyRequestOptions,
trace,
cancellationToken);
readManyRequestOptions,
trace,
cancellationToken);

return this.CombineStreamsFromQueryResponses(queryResponses, resourceId, trace); // also disposes the response messages
}
Expand All @@ -60,7 +68,7 @@ public override async Task<FeedResponse<T>> ExecuteReadManyRequestAsync<T>(IRead
CancellationToken cancellationToken)
{
IDictionary<PartitionKeyRange, List<(string, PartitionKey)>> partitionKeyRangeItemMap =
await this.CreatePartitionKeyRangeItemListMapAsync(items, cancellationToken);
await this.CreatePartitionKeyRangeItemListMapAsync(items, trace, cancellationToken);

List<ResponseMessage>[] queryResponses = await this.ReadManyTaskHelperAsync(partitionKeyRangeItemMap,
readManyRequestOptions,
Expand Down Expand Up @@ -116,6 +124,7 @@ internal async Task<List<ResponseMessage>[]> ReadManyTaskHelperAsync(IDictionary

private async Task<IDictionary<PartitionKeyRange, List<(string, PartitionKey)>>> CreatePartitionKeyRangeItemListMapAsync(
IReadOnlyList<(string, PartitionKey)> items,
ITrace trace,
CancellationToken cancellationToken = default)
{
CollectionRoutingMap collectionRoutingMap = await this.container.GetRoutingMapAsync(cancellationToken);
Expand All @@ -125,7 +134,9 @@ internal async Task<List<ResponseMessage>[]> ReadManyTaskHelperAsync(IDictionary

foreach ((string id, PartitionKey pk) item in items)
{
string effectivePartitionKeyValue = item.pk.InternalKey.GetEffectivePartitionKeyString(this.partitionKeyDefinition);
Documents.Routing.PartitionKeyInternal partitionKeyInternal =
await this.GetPartitionKeyInternalAsync(item.pk, trace, cancellationToken);
string effectivePartitionKeyValue = partitionKeyInternal.GetEffectivePartitionKeyString(this.partitionKeyDefinition);
PartitionKeyRange partitionKeyRange = collectionRoutingMap.GetRangeByEffectivePartitionKey(effectivePartitionKeyValue);
if (partitionKeyRangeItemMap.TryGetValue(partitionKeyRange, out List<(string, PartitionKey)> itemList))
{
Expand Down Expand Up @@ -259,33 +270,45 @@ private QueryDefinition CreateReadManyQueryDefinitionForOther(List<(string, Part

queryStringBuilder.Append("SELECT * FROM c WHERE ( ");
for (int i = startIndex; i < totalItemCount; i++)
{
object[] pkValues = items[i].Item2.InternalKey.ToObjectArray();

if (pkValues.Length != this.partitionKeyDefinition.Paths.Count)
{
throw new ArgumentException("Number of components in the partition key value does not match the definition.");
}

{
string pkParamName = "@param_pk" + i;
string idParamName = "@param_id" + i;
sqlParameters.Add(new SqlParameter(idParamName, items[i].Item1));

queryStringBuilder.Append("( ");
queryStringBuilder.Append("c.id = ");
queryStringBuilder.Append(idParamName);
for (int j = 0; j < this.partitionKeySelectors.Count; j++)
if (items[i].Item2.IsNone)
{
foreach (string partitionKeySelector in this.partitionKeySelectors)
{
queryStringBuilder.Append(" AND ");
queryStringBuilder.Append("IS_DEFINED(c");
queryStringBuilder.Append(partitionKeySelector);
queryStringBuilder.Append(") = false");
}
}
else
{
queryStringBuilder.Append(" AND ");
queryStringBuilder.Append("c");
queryStringBuilder.Append(this.partitionKeySelectors[j]);
queryStringBuilder.Append(" = ");

string pkParamNameForSinglePath = pkParamName + j;
sqlParameters.Add(new SqlParameter(pkParamNameForSinglePath, pkValues[j]));
queryStringBuilder.Append(pkParamNameForSinglePath);
object[] pkValues = items[i].Item2.InternalKey.ToObjectArray();
if (pkValues.Length != this.partitionKeyDefinition.Paths.Count)
{
throw new ArgumentException("Number of components in the partition key " +
"value does not match the definition.");
}
for (int j = 0; j < this.partitionKeySelectors.Count; j++)
{
queryStringBuilder.Append(" AND ");
queryStringBuilder.Append("c");
queryStringBuilder.Append(this.partitionKeySelectors[j]);
queryStringBuilder.Append(" = ");

string pkParamNameForSinglePath = pkParamName + j;
sqlParameters.Add(new SqlParameter(pkParamNameForSinglePath, pkValues[j]));
queryStringBuilder.Append(pkParamNameForSinglePath);
}
}

queryStringBuilder.Append(" )");

if (i < totalItemCount - 1)
Expand Down Expand Up @@ -365,6 +388,19 @@ private void CancelCancellationToken(CancellationToken cancellationToken)
}
}

private ValueTask<Documents.Routing.PartitionKeyInternal> GetPartitionKeyInternalAsync(PartitionKey partitionKey,
ITrace trace,
CancellationToken cancellationToken)
{
if (partitionKey.IsNone)
{
return new ValueTask<Documents.Routing.PartitionKeyInternal>(
this.container.GetNonePartitionKeyValueAsync(trace, cancellationToken));
}
asketagarwal marked this conversation as resolved.
Show resolved Hide resolved

return new ValueTask<Documents.Routing.PartitionKeyInternal>(partitionKey.InternalKey);
}

private class ReadManyFeedResponseEnumerable<T> : IEnumerable<T>
{
private readonly List<FeedResponse<T>> typedResponses;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,17 @@ public async Task<ResponseMessage> ReadManyItemsStreamAsync(
throw new ArgumentNullException(nameof(trace));
}

ReadManyHelper readManyHelper = new ReadManyQueryHelper(await this.GetPartitionKeyDefinitionAsync(),
PartitionKeyDefinition partitionKeyDefinition;
try
{
partitionKeyDefinition = await this.GetPartitionKeyDefinitionAsync();
}
catch (CosmosException ex)
{
return ex.ToCosmosResponseMessage(request: null);
}

ReadManyHelper readManyHelper = new ReadManyQueryHelper(partitionKeyDefinition,
this);

return await readManyHelper.ExecuteReadManyRequestAsync(items,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,89 @@ await container.CreateItemAsync(
await database.DeleteAsync();
}

[TestMethod]
public async Task ReadManyWithNonePkValues()
{
for (int i = 0; i < 5; i++)
{
await this.Container.CreateItemAsync(new ActivityWithNoPk("id" + i.ToString()),
PartitionKey.None);
}

List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>();
for (int i = 0; i < 5; i++)
{
itemList.Add(("id" + i.ToString(), PartitionKey.None));
}

FeedResponse<ActivityWithNoPk> feedResponse = await this.Container.ReadManyItemsAsync<ActivityWithNoPk>(itemList);
Assert.AreEqual(feedResponse.Count, 5);
asketagarwal marked this conversation as resolved.
Show resolved Hide resolved
int j = 0;
foreach (ActivityWithNoPk item in feedResponse.Resource)
{
Assert.AreEqual(item.id, "id" + j);
j++;
}
}

[TestMethod]
public async Task ReadManyItemsFromNonPartitionedContainers()
{
ContainerInternal container = await NonPartitionedContainerHelper.CreateNonPartitionedContainer(this.database,
Guid.NewGuid().ToString());
for (int i = 0; i < 5; i++)
{
await NonPartitionedContainerHelper.CreateItemInNonPartitionedContainer(container, "id" + i.ToString());
}

// read using PartitionKey.None pk value
List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>();
for (int i = 0; i < 5; i++)
{
itemList.Add(("id" + i.ToString(), PartitionKey.None));
}

FeedResponse<ActivityWithNoPk> feedResponse = await container.ReadManyItemsAsync<ActivityWithNoPk>(itemList);
Assert.AreEqual(feedResponse.Count, 5);

// Start inserting documents with same id but new pk values
for (int i = 0; i < 5; i++)
{
await container.CreateItemAsync(new ActivityWithSystemPk("id" + i.ToString(), "newPK"),
new PartitionKey("newPK"));
}

feedResponse = await container.ReadManyItemsAsync<ActivityWithNoPk>(itemList);
Assert.AreEqual(feedResponse.Count, 5);
int j = 0;
foreach (ActivityWithNoPk item in feedResponse.Resource)
{
Assert.AreEqual(item.id, "id" + j);
j++;
}

for (int i = 0; i < 5; i++)
{
itemList.Add(("id" + i.ToString(), new PartitionKey("newPK")));
}
FeedResponse<ActivityWithSystemPk> feedResponseWithPK = await container.ReadManyItemsAsync<ActivityWithSystemPk>(itemList);
Assert.AreEqual(feedResponseWithPK.Count, 10);
j = 0;
foreach (ActivityWithSystemPk item in feedResponseWithPK.Resource)
{
Assert.AreEqual(item.id, "id" + (j % 5));
if (j > 4)
{
Assert.AreEqual(item._partitionKey, "newPK");
}
else
{
Assert.IsNull(item._partitionKey);
}
j++;
}
}

[TestMethod]
[DataRow(HttpStatusCode.NotFound)]
public async Task ReadManyExceptionsTest(HttpStatusCode statusCode)
Expand Down Expand Up @@ -508,5 +591,34 @@ public override async Task<ResponseMessage> SendAsync(RequestMessage requestMess
return await base.SendAsync(requestMessage, cancellationToken);
}
}

private class ActivityWithNoPk
{
public ActivityWithNoPk(string id)
{
this.id = id;
}

#pragma warning disable IDE1006 // Naming Styles
public string id { get; set; }
#pragma warning restore IDE1006 // Naming Styles
}

private class ActivityWithSystemPk
{
public ActivityWithSystemPk(string id, string _partitionKey)
{
this.id = id;
this._partitionKey = _partitionKey;
}

#pragma warning disable IDE1006 // Naming Styles
public string id { get; set; }
#pragma warning restore IDE1006 // Naming Styles

#pragma warning disable IDE1006 // Naming Styles
public string _partitionKey { get; set; }
#pragma warning restore IDE1006 // Naming Styles
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public async Task GatewayRequestStatsTest()
{
ToDoActivity item = ToDoActivity.CreateRandomToDoActivity();
ItemResponse<ToDoActivity> response = await this.Container.CreateItemAsync(item);

ClientSideRequestStatisticsTraceDatum datum = this.GetClientSideRequestStatsFromTrace(((CosmosTraceDiagnostics)response.Diagnostics).Value, "Transport");
Assert.IsNotNull(datum.HttpResponseStatisticsList);
Assert.AreEqual(datum.HttpResponseStatisticsList.Count, 1);
Expand Down Expand Up @@ -69,6 +70,7 @@ public async Task GatewayRetryRequestStatsTest(string uriToThrow, string traceTo
using (CosmosClient cosmosClient = TestCommon.CreateCosmosClient(options))
{
Container container = cosmosClient.GetContainer(this.Database.Id, this.Container.Id);

ItemResponse<ToDoActivity> response = await container.ReadItemAsync<ToDoActivity>(item.id, new PartitionKey(item.pk));
ClientSideRequestStatisticsTraceDatum datum = this.GetClientSideRequestStatsFromTrace(((CosmosTraceDiagnostics)response.Diagnostics).Value, traceToFind);
Assert.IsNotNull(datum.HttpResponseStatisticsList);
Expand Down