Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding continuation token support for LINQ #544

Merged
merged 4 commits into from
Jul 16, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,30 @@ internal sealed class CosmosLinqQuery<T> : IDocumentQuery<T>, IOrderedQueryable<
private readonly CosmosSerializer cosmosJsonSerializer;
private readonly QueryRequestOptions cosmosQueryRequestOptions;
private readonly bool allowSynchronousQueryExecution = false;
private readonly string continuationToken;

public CosmosLinqQuery(
ContainerCore container,
CosmosSerializer cosmosJsonSerializer,
CosmosQueryClientCore queryClient,
string continuationToken,
QueryRequestOptions cosmosQueryRequestOptions,
Expression expression,
bool allowSynchronousQueryExecution)
{
this.container = container ?? throw new ArgumentNullException(nameof(container));
this.cosmosJsonSerializer = cosmosJsonSerializer ?? throw new ArgumentNullException(nameof(cosmosJsonSerializer));
this.queryClient = queryClient ?? throw new ArgumentNullException(nameof(queryClient));
this.continuationToken = continuationToken;
this.cosmosQueryRequestOptions = cosmosQueryRequestOptions;
this.expression = expression ?? Expression.Constant(this);
this.allowSynchronousQueryExecution = allowSynchronousQueryExecution;
this.queryProvider = new CosmosLinqQueryProvider(
container,
cosmosJsonSerializer,
queryClient,
cosmosQueryRequestOptions,
this.container,
this.cosmosJsonSerializer,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
this.allowSynchronousQueryExecution,
this.queryClient.OnExecuteScalarQueryCallback);
this.correlatedActivityId = Guid.NewGuid();
Expand All @@ -59,12 +63,14 @@ public CosmosLinqQuery(
ContainerCore container,
CosmosSerializer cosmosJsonSerializer,
CosmosQueryClientCore queryClient,
string continuationToken,
QueryRequestOptions cosmosQueryRequestOptions,
bool allowSynchronousQueryExecution)
: this(
container,
cosmosJsonSerializer,
queryClient,
continuationToken,
cosmosQueryRequestOptions,
null,
allowSynchronousQueryExecution)
Expand Down Expand Up @@ -144,7 +150,7 @@ public FeedIterator<T> ToFeedIterator()
{
return this.container.GetItemQueryIterator<T>(
queryDefinition: new QueryDefinition(this.ToSqlQueryText()),
continuationToken: null,
continuationToken: this.continuationToken,
simplynaveen20 marked this conversation as resolved.
Show resolved Hide resolved
requestOptions: this.cosmosQueryRequestOptions);
}

Expand All @@ -171,7 +177,7 @@ private FeedIterator CreateCosmosQueryExecutionContext()
operationType: OperationType.Query,
resourceType: typeof(T),
sqlQuerySpec: DocumentQueryEvaluator.Evaluate(this.expression),
continuationToken: null,
continuationToken: this.continuationToken,
queryRequestOptions: this.cosmosQueryRequestOptions,
resourceLink: this.container.LinkUri,
isContinuationExpected: false,
Expand Down
7 changes: 7 additions & 0 deletions Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQueryProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@ internal sealed class CosmosLinqQueryProvider : IQueryProvider
private readonly QueryRequestOptions cosmosQueryRequestOptions;
private readonly bool allowSynchronousQueryExecution;
private readonly Action<IQueryable> onExecuteScalarQueryCallback;
private readonly string continuationToken;

public CosmosLinqQueryProvider(
ContainerCore container,
CosmosSerializer cosmosJsonSerializer,
CosmosQueryClientCore queryClient,
string continuationToken,
QueryRequestOptions cosmosQueryRequestOptions,
bool allowSynchronousQueryExecution,
Action<IQueryable> onExecuteScalarQueryCallback = null)
{
this.container = container;
this.cosmosJsonSerializer = cosmosJsonSerializer;
this.queryClient = queryClient;
this.continuationToken = continuationToken;
this.cosmosQueryRequestOptions = cosmosQueryRequestOptions;
this.allowSynchronousQueryExecution = allowSynchronousQueryExecution;
this.onExecuteScalarQueryCallback = onExecuteScalarQueryCallback;
Expand All @@ -43,6 +46,7 @@ public IQueryable<TElement> CreateQuery<TElement>(Expression expression)
this.container,
this.cosmosJsonSerializer,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
expression,
this.allowSynchronousQueryExecution);
Expand All @@ -57,6 +61,7 @@ public IQueryable CreateQuery(Expression expression)
this.container,
this.cosmosJsonSerializer,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
expression,
this.allowSynchronousQueryExecution);
Expand All @@ -70,6 +75,7 @@ public TResult Execute<TResult>(Expression expression)
this.container,
this.cosmosJsonSerializer,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
expression,
this.allowSynchronousQueryExecution);
Expand All @@ -86,6 +92,7 @@ public object Execute(Expression expression)
this.container,
this.cosmosJsonSerializer,
this.queryClient,
this.continuationToken,
this.cosmosQueryRequestOptions,
this.allowSynchronousQueryExecution);
this.onExecuteScalarQueryCallback?.Invoke(cosmosLINQQuery);
Expand Down
2 changes: 2 additions & 0 deletions Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ public abstract FeedIterator<T> GetItemQueryIterator<T>(
/// </remarks>
/// <typeparam name="T">The type of object to query.</typeparam>
/// <param name="allowSynchronousQueryExecution">(Optional)the option which allows the query to be executed synchronously via IOrderedQueryable.</param>
/// <param name="continuationToken">(Optional) The continuation token in the Azure Cosmos DB service.</param>
/// <param name="requestOptions">(Optional)The options for the item query request.<see cref="QueryRequestOptions"/></param>
/// <returns>(Optional) An IOrderedQueryable{T} that can evaluate the query.</returns>
/// <example>
Expand Down Expand Up @@ -1053,6 +1054,7 @@ public abstract FeedIterator<T> GetItemQueryIterator<T>(
/// </remarks>
public abstract IOrderedQueryable<T> GetItemLinqQueryable<T>(
bool allowSynchronousQueryExecution = false,
string continuationToken = null,
QueryRequestOptions requestOptions = null);

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ public override FeedIterator<T> GetItemQueryIterator<T>(

public override IOrderedQueryable<T> GetItemLinqQueryable<T>(
bool allowSynchronousQueryExecution = false,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
requestOptions = requestOptions != null ? requestOptions : new QueryRequestOptions();
Expand All @@ -322,6 +323,7 @@ public override IOrderedQueryable<T> GetItemLinqQueryable<T>(
this,
this.ClientContext.CosmosSerializer,
(CosmosQueryClientCore)this.queryClient,
continuationToken,
requestOptions,
allowSynchronousQueryExecution);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,53 @@ public async Task ItemLINQQueryTest()
Assert.IsTrue(exception.Message.Contains("To execute LINQ query please set allowSynchronousQueryExecution true"));
}
}

[TestMethod]
public async Task ItemLINQQueryWithContinuationTokenTest()
{
//Creating items for query.
IList<ToDoActivity> itemList = await CreateRandomItems(pkCount: 10, perPKItemCount: 1, randomPartitionKey: true);

QueryRequestOptions queryRequestOptions = new QueryRequestOptions();
queryRequestOptions.MaxConcurrency = 1;
queryRequestOptions.MaxItemCount = 5;
IOrderedQueryable<ToDoActivity> linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(requestOptions: queryRequestOptions);
IQueryable<ToDoActivity> queriable = linqQueryable.Where(item => (item.taskNum < 100));
FeedIterator<ToDoActivity> feedIterator = queriable.ToFeedIterator();

int firstItemSet = 0;
string continuationToken = null;
while (feedIterator.HasMoreResults)
{
FeedResponse<ToDoActivity> feedResponse = await feedIterator.ReadNextAsync();
firstItemSet = feedResponse.Count();
continuationToken = feedResponse.ContinuationToken;
if(firstItemSet > 0)
{
break;
}
}

linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(continuationToken: continuationToken, requestOptions: queryRequestOptions);
queriable = linqQueryable.Where(item => (item.taskNum < 100));
feedIterator = queriable.ToFeedIterator();

//Test continuationToken with LINQ query generation and asynchronous feedIterator execution.
int secondItemSet = 0;
while (feedIterator.HasMoreResults)
{
FeedResponse<ToDoActivity> feedResponse = await feedIterator.ReadNextAsync();
secondItemSet += feedResponse.Count();
}

Assert.AreEqual(10 - firstItemSet, secondItemSet);

//Test continuationToken with blocking LINQ execution
linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(allowSynchronousQueryExecution:true, continuationToken: continuationToken, requestOptions: queryRequestOptions);
int linqExecutionItemCount = linqQueryable.Where(item => (item.taskNum < 100)).Count();
Assert.AreEqual(10 - firstItemSet, linqExecutionItemCount);
}

// Move the data from None Partition to other logical partitions
[TestMethod]
public async Task MigrateDataInNonPartitionContainer()
Expand Down