Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding ToStreamIterator for LINQ #604

Merged
merged 7 commits into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions Microsoft.Azure.Cosmos/src/Linq/CosmosLinqExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,34 @@ public static FeedIterator<T> ToFeedIterator<T>(this IQueryable<T> query)

return linqQuery.ToFeedIterator();
}

/// <summary>
/// This extension method gets the FeedIterator from LINQ IQueryable to execute query asynchronously.
/// This will create the fresh new FeedIterator when called.
/// </summary>
/// <typeparam name="T">the type of object to query.</typeparam>
/// <param name="query">the IQueryable{T} to be converted.</param>
/// <returns>An iterator to go through the items.</returns>
/// <example>
/// This example shows how to get FeedIterator from LINQ.
///
/// <code language="c#">
/// <![CDATA[
/// IOrderedQueryable<ToDoActivity> linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>();
/// FeedIterator setIterator = linqQueryable.Where(item => (item.taskNum < 100)).ToFeedIterator()
/// ]]>
/// </code>
/// </example>
public static FeedIterator ToStreamIterator<T>(this IQueryable<T> query)
{
CosmosLinqQuery<T> linqQuery = query as CosmosLinqQuery<T>;

if (linqQuery == null)
{
throw new ArgumentOutOfRangeException(nameof(linqQuery), "ToStreamFeedIterator is only supported on cosmos LINQ query operations");
}

return linqQuery.ToStreamIterator();
}
}
}
15 changes: 13 additions & 2 deletions Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ public FeedIterator<T> ToFeedIterator()
return this.CreateFeedIterator(true);
}

public FeedIterator ToStreamIterator()
{
return this.CreateStreamIterator(true);
}

public void Dispose()
{
//NOTHING TO DISPOSE HERE
Expand All @@ -158,16 +163,22 @@ Task<DocumentFeedResponse<dynamic>> IDocumentQuery<T>.ExecuteNextAsync(Cancellat
throw new NotImplementedException();
}

private FeedIterator<T> CreateFeedIterator(bool isContinuationExcpected)
private FeedIterator CreateStreamIterator(bool isContinuationExcpected)
{
SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.Expression);

FeedIterator streamIterator = this.container.GetItemQueryStreamIteratorInternal(
return this.container.GetItemQueryStreamIteratorInternal(
sqlQuerySpec: querySpec,
isContinuationExcpected: isContinuationExcpected,
continuationToken: this.continuationToken,
requestOptions: this.cosmosQueryRequestOptions);
}

private FeedIterator<T> CreateFeedIterator(bool isContinuationExcpected)
{
SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.Expression);

FeedIterator streamIterator = this.CreateStreamIterator(isContinuationExcpected);
return new FeedIteratorCore<T>(
streamIterator,
this.responseFactory.CreateQueryFeedResponse<T>);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
{
using Microsoft.Azure.Cosmos.Linq;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Dynamic;
using System.Threading.Tasks;

[TestClass]
public class CosmosItemLinqTests : BaseCosmosClientHelper
{
private Container Container = null;
private ContainerProperties containerSettings = null;

[TestInitialize]
public async Task TestInitialize()
{
await base.TestInit();
string PartitionKey = "/status";
this.containerSettings = new ContainerProperties(id: Guid.NewGuid().ToString(), partitionKeyPath: PartitionKey);
ContainerResponse response = await this.database.CreateContainerAsync(
this.containerSettings,
cancellationToken: this.cancellationToken);
Assert.IsNotNull(response);
Assert.IsNotNull(response.Container);
Assert.IsNotNull(response.Resource);
this.Container = response;
}

[TestCleanup]
public async Task Cleanup()
{
await base.TestCleanup();
}

[DataRow(false)]
[DataRow(true)]
[DataTestMethod]
public async Task ItemLinqReadFeedTest(bool useStatelessIterator)
{
IList<ToDoActivity> deleteList = await ToDoActivity.CreateRandomItems(this.Container, pkCount: 3, randomPartitionKey: true);
HashSet<string> itemIds = deleteList.Select(x => x.id).ToHashSet<string>();

QueryRequestOptions requestOptions = new QueryRequestOptions()
{
MaxItemCount = 1
};

List<ToDoActivity> itemsViaReadFeed = this.Container.GetItemLinqQueryable<ToDoActivity>(
allowSynchronousQueryExecution: true,
requestOptions: requestOptions).ToList();

Assert.IsTrue(itemsViaReadFeed.Count >= 3);
CollectionAssert.AreEqual(deleteList.ToList(), itemsViaReadFeed);

string lastContinuationToken = null;
FeedIterator<ToDoActivity> feedIterator = this.Container.GetItemLinqQueryable<ToDoActivity>(
requestOptions: requestOptions).ToFeedIterator();

while (feedIterator.HasMoreResults)
{
if (useStatelessIterator)
{
feedIterator = this.Container.GetItemLinqQueryable<ToDoActivity>(
continuationToken: lastContinuationToken,
requestOptions: requestOptions).ToFeedIterator();
}

var responseMessage = await feedIterator.ReadNextAsync(this.cancellationToken);
lastContinuationToken = responseMessage.ContinuationToken;

foreach (ToDoActivity toDoActivity in responseMessage)
{
if (itemIds.Contains(toDoActivity.id))
{
itemIds.Remove(toDoActivity.id);
}
}
}

Assert.IsNull(lastContinuationToken);
Assert.AreEqual(itemIds.Count, 0);

itemIds = deleteList.Select(x => x.id).ToHashSet<string>();
FeedIterator streamIterator = this.Container.GetItemLinqQueryable<ToDoActivity>(
requestOptions: requestOptions).ToStreamIterator();

while (streamIterator.HasMoreResults)
{
if (useStatelessIterator)
{
streamIterator = this.Container.GetItemLinqQueryable<ToDoActivity>(
continuationToken: lastContinuationToken,
requestOptions: requestOptions).ToStreamIterator();
}

using (var responseMessage = await streamIterator.ReadNextAsync(this.cancellationToken))
{
lastContinuationToken = responseMessage.Headers.ContinuationToken;

var items = TestCommon.Serializer.FromStream<CosmosFeedResponseUtil<ToDoActivity>>(responseMessage.Content).Data;
foreach (ToDoActivity toDoActivity in items)
{
if (itemIds.Contains(toDoActivity.id))
{
itemIds.Remove(toDoActivity.id);
}
}
}
}

Assert.IsNull(lastContinuationToken);
Assert.AreEqual(itemIds.Count, 0);
}

[TestMethod]
[DataRow(false)]
[DataRow(true)]
[ExpectedException(typeof(ArgumentOutOfRangeException))]
public void LinqQueryToIteratorBlockTest(bool isStreamIterator)
{
//Checking for exception in case of ToFeedIterator() use on non cosmos linq IQueryable.
IQueryable<ToDoActivity> nonLinqQueryable = (new List<ToDoActivity> { ToDoActivity.CreateRandomToDoActivity() }).AsQueryable();
if (isStreamIterator)
{
nonLinqQueryable.ToStreamIterator();
}
else
{
nonLinqQueryable.ToFeedIterator();
}
}

[TestMethod]
[DataRow(false)]
[DataRow(true)]
[ExpectedException(typeof(NotSupportedException))]
public void LinqQuerySyncBlockTest(bool isReadFeed)
{
//Checking for exception in case of ToFeedIterator() use on non cosmos linq IQueryable.
if (isReadFeed)
{
this.Container.GetItemLinqQueryable<ToDoActivity>().ToList();
}
else
{
this.Container.GetItemLinqQueryable<ToDoActivity>().Where(item => item.cost > 0).ToList();
}
}

[TestMethod]
public async Task ItemLINQQueryTest()
{
//Creating items for query.
IList<ToDoActivity> itemList = await ToDoActivity.CreateRandomItems(container: this.Container, pkCount: 2, perPKItemCount: 1, randomPartitionKey: true);

IOrderedQueryable<ToDoActivity> linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>();
IQueryable<ToDoActivity> queriable = linqQueryable.Where(item => (item.taskNum < 100));
//V3 Asynchronous query execution with LINQ query generation sql text.
FeedIterator<ToDoActivity> setIterator = this.Container.GetItemQueryIterator<ToDoActivity>(
queriable.ToQueryDefinition(),
requestOptions: new QueryRequestOptions() { MaxConcurrency = 2 });

int resultsFetched = 0;
while (setIterator.HasMoreResults)
{
FeedResponse<ToDoActivity> queryResponse = await setIterator.ReadNextAsync();
resultsFetched += queryResponse.Count();

// For the items returned with NonePartitionKeyValue
IEnumerator<ToDoActivity> iter = queryResponse.GetEnumerator();
while (iter.MoveNext())
{
ToDoActivity activity = iter.Current;
Assert.AreEqual(42, activity.taskNum);
}
Assert.AreEqual(2, resultsFetched);
}

//LINQ query execution without partition key.
linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(allowSynchronousQueryExecution: true);
queriable = linqQueryable.Where(item => (item.taskNum < 100));

Assert.AreEqual(2, queriable.Count());
Assert.AreEqual(itemList[0].id, queriable.ToList()[0].id);
Assert.AreEqual(itemList[1].id, queriable.ToList()[1].id);

//LINQ query execution with wrong partition key.
linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(
allowSynchronousQueryExecution: true,
requestOptions: new QueryRequestOptions() { PartitionKey = new Cosmos.PartitionKey("test") });
queriable = linqQueryable.Where(item => (item.taskNum < 100));
Assert.AreEqual(0, queriable.Count());

//LINQ query execution with correct partition key.
linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(
allowSynchronousQueryExecution: true,
requestOptions: new QueryRequestOptions { ConsistencyLevel = Cosmos.ConsistencyLevel.Eventual, PartitionKey = new Cosmos.PartitionKey(itemList[1].status) });
queriable = linqQueryable.Where(item => (item.taskNum < 100));
Assert.AreEqual(1, queriable.Count());
Assert.AreEqual(itemList[1].id, queriable.ToList()[0].id);
}

[TestMethod]
public async Task ItemLINQQueryWithContinuationTokenTest()
{
//Creating items for query.
IList<ToDoActivity> itemList = await ToDoActivity.CreateRandomItems(container: this.Container, pkCount: 10, perPKItemCount: 1, randomPartitionKey: true);

QueryRequestOptions queryRequestOptions = new QueryRequestOptions();
queryRequestOptions.MaxConcurrency = 1;
queryRequestOptions.MaxItemCount = 5;
IOrderedQueryable<ToDoActivity> linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(requestOptions: queryRequestOptions);
IQueryable<ToDoActivity> queriable = linqQueryable.Where(item => (item.taskNum < 100));
FeedIterator<ToDoActivity> feedIterator = queriable.ToFeedIterator();

int firstItemSet = 0;
string continuationToken = null;
while (feedIterator.HasMoreResults)
{
FeedResponse<ToDoActivity> feedResponse = await feedIterator.ReadNextAsync();
firstItemSet = feedResponse.Count();
continuationToken = feedResponse.ContinuationToken;
if (firstItemSet > 0)
{
break;
}
}

linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(continuationToken: continuationToken, requestOptions: queryRequestOptions);
queriable = linqQueryable.Where(item => (item.taskNum < 100));
feedIterator = queriable.ToFeedIterator();

//Test continuationToken with LINQ query generation and asynchronous feedIterator execution.
int secondItemSet = 0;
while (feedIterator.HasMoreResults)
{
FeedResponse<ToDoActivity> feedResponse = await feedIterator.ReadNextAsync();
secondItemSet += feedResponse.Count();
}

Assert.AreEqual(10 - firstItemSet, secondItemSet);

//Test continuationToken with blocking LINQ execution
linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(allowSynchronousQueryExecution: true, continuationToken: continuationToken, requestOptions: queryRequestOptions);
int linqExecutionItemCount = linqQueryable.Where(item => (item.taskNum < 100)).Count();
Assert.AreEqual(10 - firstItemSet, linqExecutionItemCount);
}
}
}
Loading