Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReadMany: Fixes ReadMany API for None Partition key values #2661

Merged
merged 10 commits into from
Aug 20, 2021
59 changes: 45 additions & 14 deletions Microsoft.Azure.Cosmos/src/ReadManyQueryHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,22 @@ public override async Task<ResponseMessage> ExecuteReadManyRequestAsync(IReadOnl
ITrace trace,
CancellationToken cancellationToken)
{
string resourceId = await this.container.GetCachedRIDAsync(cancellationToken);
IDictionary<PartitionKeyRange, List<(string, PartitionKey)>> partitionKeyRangeItemMap =
await this.CreatePartitionKeyRangeItemListMapAsync(items, cancellationToken);
IDictionary<PartitionKeyRange, List<(string, PartitionKey)>> partitionKeyRangeItemMap;
string resourceId;
try
{
resourceId = await this.container.GetCachedRIDAsync(cancellationToken);
partitionKeyRangeItemMap = await this.CreatePartitionKeyRangeItemListMapAsync(items, trace, cancellationToken);
}
catch (CosmosException ex)
{
return ex.ToCosmosResponseMessage(request: null);
ealsur marked this conversation as resolved.
Show resolved Hide resolved
}

List<ResponseMessage>[] queryResponses = await this.ReadManyTaskHelperAsync(partitionKeyRangeItemMap,
readManyRequestOptions,
trace,
cancellationToken);
readManyRequestOptions,
trace,
cancellationToken);

return this.CombineStreamsFromQueryResponses(queryResponses, resourceId, trace); // also disposes the response messages
}
Expand All @@ -60,7 +68,7 @@ public override async Task<FeedResponse<T>> ExecuteReadManyRequestAsync<T>(IRead
CancellationToken cancellationToken)
{
IDictionary<PartitionKeyRange, List<(string, PartitionKey)>> partitionKeyRangeItemMap =
await this.CreatePartitionKeyRangeItemListMapAsync(items, cancellationToken);
await this.CreatePartitionKeyRangeItemListMapAsync(items, trace, cancellationToken);

List<ResponseMessage>[] queryResponses = await this.ReadManyTaskHelperAsync(partitionKeyRangeItemMap,
readManyRequestOptions,
Expand Down Expand Up @@ -116,6 +124,7 @@ internal async Task<List<ResponseMessage>[]> ReadManyTaskHelperAsync(IDictionary

private async Task<IDictionary<PartitionKeyRange, List<(string, PartitionKey)>>> CreatePartitionKeyRangeItemListMapAsync(
IReadOnlyList<(string, PartitionKey)> items,
ITrace trace,
CancellationToken cancellationToken = default)
{
CollectionRoutingMap collectionRoutingMap = await this.container.GetRoutingMapAsync(cancellationToken);
Expand All @@ -125,7 +134,9 @@ internal async Task<List<ResponseMessage>[]> ReadManyTaskHelperAsync(IDictionary

foreach ((string id, PartitionKey pk) item in items)
{
string effectivePartitionKeyValue = item.pk.InternalKey.GetEffectivePartitionKeyString(this.partitionKeyDefinition);
Documents.Routing.PartitionKeyInternal partitionKeyInternal =
await this.GetPartitionKeyInternalAsync(item.pk, trace, cancellationToken);
string effectivePartitionKeyValue = partitionKeyInternal.GetEffectivePartitionKeyString(this.partitionKeyDefinition);
PartitionKeyRange partitionKeyRange = collectionRoutingMap.GetRangeByEffectivePartitionKey(effectivePartitionKeyValue);
if (partitionKeyRangeItemMap.TryGetValue(partitionKeyRange, out List<(string, PartitionKey)> itemList))
{
Expand Down Expand Up @@ -260,21 +271,28 @@ private QueryDefinition CreateReadManyQueryDefinitionForOther(List<(string, Part
queryStringBuilder.Append("SELECT * FROM c WHERE ( ");
for (int i = startIndex; i < totalItemCount; i++)
{
object[] pkValues = items[i].Item2.InternalKey.ToObjectArray();

if (pkValues.Length != this.partitionKeyDefinition.Paths.Count)
object[] pkValues;
if (items[i].Item2.IsNone)
{
throw new ArgumentException("Number of components in the partition key value does not match the definition.");
pkValues = new object[0];
asketagarwal marked this conversation as resolved.
Show resolved Hide resolved
}

else
{
pkValues = items[i].Item2.InternalKey.ToObjectArray();
if (pkValues.Length != this.partitionKeyDefinition.Paths.Count)
{
throw new ArgumentException("Number of components in the partition key value does not match the definition.");
}
}

string pkParamName = "@param_pk" + i;
string idParamName = "@param_id" + i;
sqlParameters.Add(new SqlParameter(idParamName, items[i].Item1));

queryStringBuilder.Append("( ");
queryStringBuilder.Append("c.id = ");
queryStringBuilder.Append(idParamName);
for (int j = 0; j < this.partitionKeySelectors.Count; j++)
for (int j = 0; j < pkValues.Length; j++)
{
queryStringBuilder.Append(" AND ");
queryStringBuilder.Append("c");
Expand Down Expand Up @@ -365,6 +383,19 @@ private void CancelCancellationToken(CancellationToken cancellationToken)
}
}

private ValueTask<Documents.Routing.PartitionKeyInternal> GetPartitionKeyInternalAsync(PartitionKey partitionKey,
ITrace trace,
CancellationToken cancellationToken)
{
if (partitionKey.IsNone)
{
return new ValueTask<Documents.Routing.PartitionKeyInternal>(
this.container.GetNonePartitionKeyValueAsync(trace, cancellationToken));
}
asketagarwal marked this conversation as resolved.
Show resolved Hide resolved

return new ValueTask<Documents.Routing.PartitionKeyInternal>(partitionKey.InternalKey);
}

private class ReadManyFeedResponseEnumerable<T> : IEnumerable<T>
{
private readonly List<FeedResponse<T>> typedResponses;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,17 @@ public async Task<ResponseMessage> ReadManyItemsStreamAsync(
throw new ArgumentNullException(nameof(trace));
}

ReadManyHelper readManyHelper = new ReadManyQueryHelper(await this.GetPartitionKeyDefinitionAsync(),
PartitionKeyDefinition partitionKeyDefinition = null;
try
{
partitionKeyDefinition = await this.GetPartitionKeyDefinitionAsync();
}
catch (CosmosException ex)
{
ex.ToCosmosResponseMessage(request: null);
asketagarwal marked this conversation as resolved.
Show resolved Hide resolved
}

ReadManyHelper readManyHelper = new ReadManyQueryHelper(partitionKeyDefinition,
this);

return await readManyHelper.ExecuteReadManyRequestAsync(items,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,35 @@ await container.CreateItemAsync(
await database.DeleteAsync();
}

[TestMethod]
public async Task ReadManyItemsFromNonPartitionedContainers()
{
ContainerInternal container = await NonPartitionedContainerHelper.CreateNonPartitionedContainer(this.database,
Guid.NewGuid().ToString());
for (int i = 0; i < 5; i++)
{
await NonPartitionedContainerHelper.CreateItemInNonPartitionedContainer(container, "id" + i.ToString());
}

// read using PartitionKey.None pk value
List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>();
for (int i = 0; i < 10; i++)
{
itemList.Add(("id" + i.ToString(), PartitionKey.None));
}

using (ResponseMessage responseMessage = await container.ReadManyItemsStreamAsync(itemList))
{
Assert.IsNotNull(responseMessage);
Assert.IsTrue(responseMessage.Headers.RequestCharge > 0);
Assert.IsNotNull(responseMessage.Diagnostics);

ToDoActivity[] items = this.cosmosClient.ClientContext.SerializerCore.FromFeedStream<ToDoActivity>(
CosmosFeedResponseSerializer.GetStreamWithoutServiceEnvelope(responseMessage.Content));
Assert.AreEqual(items.Length, 5);
}
}

[TestMethod]
[DataRow(HttpStatusCode.NotFound)]
public async Task ReadManyExceptionsTest(HttpStatusCode statusCode)
Expand Down