Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReadMany: Adds ReadManyItems Api using Query under the covers #2352

Merged
merged 21 commits into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ namespace Microsoft.Azure.Cosmos

internal class ReadFeedResponse<T> : FeedResponse<T>
{
protected ReadFeedResponse(
internal ReadFeedResponse(
HttpStatusCode httpStatusCode,
IReadOnlyCollection<T> resources,
IEnumerable<T> resources,
int resourceCount,
Headers responseMessageHeaders,
CosmosDiagnostics diagnostics)
{
this.Count = resources?.Count ?? 0;
this.Count = resourceCount;
this.Headers = responseMessageHeaders;
this.StatusCode = httpStatusCode;
this.Diagnostics = diagnostics;
Expand Down Expand Up @@ -53,6 +54,7 @@ internal static ReadFeedResponse<TInput> CreateResponse<TInput>(
ReadFeedResponse<TInput> readFeedResponse = new ReadFeedResponse<TInput>(
httpStatusCode: responseMessage.StatusCode,
resources: resources,
resourceCount: resources.Count,
responseMessageHeaders: responseMessage.Headers,
diagnostics: responseMessage.Diagnostics);

Expand Down
25 changes: 25 additions & 0 deletions Microsoft.Azure.Cosmos/src/ReadManyHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Tracing;

internal abstract class ReadManyHelper
{
public abstract Task<ResponseMessage> ExecuteReadManyRequestAsync(IReadOnlyList<(string, PartitionKey)> items,
asketagarwal marked this conversation as resolved.
Show resolved Hide resolved
ReadManyRequestOptions readManyRequestOptions,
ITrace trace,
CancellationToken cancellationToken);

public abstract Task<FeedResponse<T>> ExecuteReadManyRequestAsync<T>(IReadOnlyList<(string, PartitionKey)> items,
ReadManyRequestOptions readManyRequestOptions,
ITrace trace,
CancellationToken cancellationToken);
}
}
403 changes: 403 additions & 0 deletions Microsoft.Azure.Cosmos/src/ReadManyQueryHelper.cs

Large diffs are not rendered by default.

74 changes: 74 additions & 0 deletions Microsoft.Azure.Cosmos/src/ReadManyRequestOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
/// <summary>
/// The Cosmos query request options
/// </summary>
public class ReadManyRequestOptions : RequestOptions
{
/// <summary>
/// Gets or sets the consistency level required for the request in the Azure Cosmos DB service.
/// </summary>
/// <value>
/// The consistency level required for the request.
/// </value>
/// <remarks>
/// Azure Cosmos DB offers 5 different consistency levels. Strong, Bounded Staleness, Session, Consistent Prefix and Eventual - in order of strongest to weakest consistency. <see cref="ConnectionPolicy"/>
/// <para>
/// While this is set at a database account level, Azure Cosmos DB allows a developer to override the default consistency level
/// for each individual request.
/// </para>
/// </remarks>
public ConsistencyLevel? ConsistencyLevel
{
get => this.BaseConsistencyLevel;
set => this.BaseConsistencyLevel = value;
}

/// <summary>
/// Gets or sets the token for use with session consistency in the Azure Cosmos DB service.
/// </summary>
/// <value>
/// The token for use with session consistency.
/// </value>
///
/// <remarks>
/// One of the <see cref="ConsistencyLevel"/> for Azure Cosmos DB is Session. In fact, this is the default level applied to accounts.
/// <para>
/// When working with Session consistency, each new write request to Azure Cosmos DB is assigned a new SessionToken.
/// The CosmosClient will use this token internally with each read/query request to ensure that the set consistency level is maintained.
///
/// <para>
/// In some scenarios you need to manage this Session yourself;
/// Consider a web application with multiple nodes, each node will have its own instance of <see cref="CosmosClient"/>
/// If you wanted these nodes to participate in the same session (to be able read your own writes consistently across web tiers)
/// you would have to send the SessionToken from <see cref="FeedResponse{T}"/> of the write action on one node
/// to the client tier, using a cookie or some other mechanism, and have that token flow back to the web tier for subsequent reads.
/// If you are using a round-robin load balancer which does not maintain session affinity between requests, such as the Azure Load Balancer,
/// the read could potentially land on a different node to the write request, where the session was created.
/// </para>
///
/// <para>
/// If you do not flow the Azure Cosmos DB SessionToken across as described above you could end up with inconsistent read results for a period of time.
/// </para>
///
/// </para>
/// </remarks>
public string SessionToken { get; set; }

internal QueryRequestOptions ConvertToQueryRequestOptions()
{
return new QueryRequestOptions
{
ConsistencyLevel = this.ConsistencyLevel,
SessionToken = this.SessionToken,
IfMatchEtag = this.IfMatchEtag,
IfNoneMatchEtag = this.IfNoneMatchEtag,
Properties = this.Properties
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,13 @@ public ConsistencyLevel? ConsistencyLevel
/// One of the <see cref="ConsistencyLevel"/> for Azure Cosmos DB is Session. In fact, this is the default level applied to accounts.
/// <para>
/// When working with Session consistency, each new write request to Azure Cosmos DB is assigned a new SessionToken.
/// The DocumentClient will use this token internally with each read/query request to ensure that the set consistency level is maintained.
/// The CosmosClient will use this token internally with each read/query request to ensure that the set consistency level is maintained.
///
/// <para>
/// In some scenarios you need to manage this Session yourself;
/// Consider a web application with multiple nodes, each node will have its own instance of <see cref="DocumentClient"/>
/// Consider a web application with multiple nodes, each node will have its own instance of <see cref="CosmosClient"/>
/// If you wanted these nodes to participate in the same session (to be able read your own writes consistently across web tiers)
/// you would have to send the SessionToken from <see cref="QueryResponse{T}"/> of the write action on one node
/// you would have to send the SessionToken from <see cref="FeedResponse{T}"/> of the write action on one node
/// to the client tier, using a cookie or some other mechanism, and have that token flow back to the web tier for subsequent reads.
/// If you are using a round-robin load balancer which does not maintain session affinity between requests, such as the Azure Load Balancer,
/// the read could potentially land on a different node to the write request, where the session was created.
Expand Down
71 changes: 71 additions & 0 deletions Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,77 @@ public abstract Task<ItemResponse<T>> ReplaceItemAsync<T>(
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Reads multiple items from a container using Id and PartitionKey values.
/// </summary>
/// <param name="items">List of item.Id and <see cref="PartitionKey"/></param>
/// <param name="readManyRequestOptions">Request Options for ReadMany Operation</param>
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>
/// A <see cref="Task"/> containing a <see cref="ResponseMessage"/> which wraps a <see cref="Stream"/> containing the response.
/// </returns>
/// <example>
/// <code language="c#">
/// <![CDATA[
/// IReadOnlyList<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>
/// {
/// ("Id1", new PartitionKey("pkValue1")),
/// ("Id2", new PartitionKey("pkValue2")),
/// ("Id3", new PartitionKey("pkValue3"))
/// };
///
/// using (ResponseMessage responseMessage = await this.Container.ReadManyItemsStreamAsync(itemList))
/// {
/// using (Stream stream = response.ReadBodyAsync())
/// {
/// //Read or do other operations with the stream
/// using (StreamReader streamReader = new StreamReader(stream))
/// {
/// string content = streamReader.ReadToEndAsync();
/// }
/// }
/// }
/// ]]>
/// </code>
/// </example>
public abstract Task<ResponseMessage> ReadManyItemsStreamAsync(
IReadOnlyList<(string, PartitionKey)> items,
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Reads multiple items from a container using Id and PartitionKey values.
/// </summary>
/// <param name="items">List of item.Id and <see cref="PartitionKey"/></param>
/// <param name="readManyRequestOptions">Request Options for ReadMany Operation</param>
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>
/// A <see cref="Task"/> containing a <see cref="FeedResponse{T}"/> which wraps the typed items.
/// </returns>
/// <example>
/// <code language="c#">
/// <![CDATA[
/// public class ToDoActivity{
/// public string id {get; set;}
/// public string status {get; set;}
/// }
///
/// IReadOnlyList<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>
/// {
/// ("Id1", new PartitionKey("pkValue1")),
/// ("Id2", new PartitionKey("pkValue2")),
/// ("Id3", new PartitionKey("pkValue3"))
/// };
///
/// FeedResponse<ToDoActivity> feedResponse = this.Container.ReadManyItemsAsync<ToDoActivity>(itemList);
/// ]]>
/// </code>
/// </example>
public abstract Task<FeedResponse<T>> ReadManyItemsAsync<T>(
IReadOnlyList<(string, PartitionKey)> items,
asketagarwal marked this conversation as resolved.
Show resolved Hide resolved
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default);

#if PREVIEW
/// <summary>
/// Patches an item in the Azure Cosmos service as an asynchronous operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.ChangeFeed;
using Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing;
using Microsoft.Azure.Cosmos.ChangeFeed.Pagination;
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Linq;
Expand All @@ -27,9 +29,11 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.ReadFeed;
using Microsoft.Azure.Cosmos.ReadFeed.Pagination;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Serializer;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Routing;

/// <summary>
/// Used to perform operations on items. There are two different types of operations.
Expand Down Expand Up @@ -278,6 +282,56 @@ public override FeedIterator GetItemQueryStreamIterator(
requestOptions: requestOptions);
}

public async Task<ResponseMessage> ReadManyItemsStreamAsync(
IReadOnlyList<(string, PartitionKey)> items,
ITrace trace,
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default)
{
if (items == null)
{
throw new ArgumentNullException(nameof(items));
}

if (trace == null)
{
throw new ArgumentNullException(nameof(trace));
}

ReadManyHelper readManyHelper = new ReadManyQueryHelper(await this.GetPartitionKeyDefinitionAsync(),
this);

return await readManyHelper.ExecuteReadManyRequestAsync(items,
readManyRequestOptions,
trace,
cancellationToken);
}

public async Task<FeedResponse<T>> ReadManyItemsAsync<T>(
IReadOnlyList<(string, PartitionKey)> items,
ITrace trace,
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default)
{
if (items == null)
{
throw new ArgumentNullException(nameof(items));
}

if (trace == null)
{
throw new ArgumentNullException(nameof(trace));
}

ReadManyHelper readManyHelper = new ReadManyQueryHelper(await this.GetPartitionKeyDefinitionAsync(),
this);

return await readManyHelper.ExecuteReadManyRequestAsync<T>(items,
readManyRequestOptions,
trace,
cancellationToken);
}

/// <summary>
/// Used in the compute gateway to support legacy gateway interface.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,28 @@ public override Task<ItemResponse<T>> PatchItemAsync<T>(
(trace) => base.PatchItemAsync<T>(id, partitionKey, patchOperations, trace, requestOptions, cancellationToken));
}

public override Task<ResponseMessage> ReadManyItemsStreamAsync(
IReadOnlyList<(string, PartitionKey)> items,
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default)
{
return this.ClientContext.OperationHelperAsync(
nameof(ReadManyItemsStreamAsync),
null,
(trace) => base.ReadManyItemsStreamAsync(items, trace, readManyRequestOptions, cancellationToken));
}

public override Task<FeedResponse<T>> ReadManyItemsAsync<T>(
IReadOnlyList<(string, PartitionKey)> items,
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default)
{
return this.ClientContext.OperationHelperAsync(
nameof(ReadManyItemsAsync),
null,
(trace) => base.ReadManyItemsAsync<T>(items, trace, readManyRequestOptions, cancellationToken));
}

public override FeedIterator GetItemQueryStreamIterator(
QueryDefinition queryDefinition,
string continuationToken = null,
Expand Down
Loading