Skip to content

Commit

Permalink
Adding ToStreamIterator for LINQ (#604)
Browse files Browse the repository at this point in the history
* Adding ToStreamIterator for LINQ

* Updated changelog and contract enforcement

* Updating change log
  • Loading branch information
j82w authored and kirankumarkolli committed Jul 29, 2019
1 parent 7ad7f85 commit bb72ba5
Show file tree
Hide file tree
Showing 8 changed files with 412 additions and 282 deletions.
29 changes: 29 additions & 0 deletions Microsoft.Azure.Cosmos/src/Linq/CosmosLinqExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,34 @@ public static FeedIterator<T> ToFeedIterator<T>(this IQueryable<T> query)

return linqQuery.ToFeedIterator();
}

/// <summary>
/// This extension method gets the FeedIterator from LINQ IQueryable to execute query asynchronously.
/// This will create the fresh new FeedIterator when called.
/// </summary>
/// <typeparam name="T">the type of object to query.</typeparam>
/// <param name="query">the IQueryable{T} to be converted.</param>
/// <returns>An iterator to go through the items.</returns>
/// <example>
/// This example shows how to get FeedIterator from LINQ.
///
/// <code language="c#">
/// <![CDATA[
/// IOrderedQueryable<ToDoActivity> linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>();
/// FeedIterator setIterator = linqQueryable.Where(item => (item.taskNum < 100)).ToFeedIterator()
/// ]]>
/// </code>
/// </example>
public static FeedIterator ToStreamIterator<T>(this IQueryable<T> query)
{
CosmosLinqQuery<T> linqQuery = query as CosmosLinqQuery<T>;

if (linqQuery == null)
{
throw new ArgumentOutOfRangeException(nameof(linqQuery), "ToStreamFeedIterator is only supported on cosmos LINQ query operations");
}

return linqQuery.ToStreamIterator();
}
}
}
15 changes: 13 additions & 2 deletions Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ public FeedIterator<T> ToFeedIterator()
return this.CreateFeedIterator(true);
}

public FeedIterator ToStreamIterator()
{
return this.CreateStreamIterator(true);
}

public void Dispose()
{
//NOTHING TO DISPOSE HERE
Expand All @@ -158,16 +163,22 @@ Task<DocumentFeedResponse<dynamic>> IDocumentQuery<T>.ExecuteNextAsync(Cancellat
throw new NotImplementedException();
}

private FeedIterator<T> CreateFeedIterator(bool isContinuationExcpected)
private FeedIterator CreateStreamIterator(bool isContinuationExcpected)
{
SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.Expression);

FeedIterator streamIterator = this.container.GetItemQueryStreamIteratorInternal(
return this.container.GetItemQueryStreamIteratorInternal(
sqlQuerySpec: querySpec,
isContinuationExcpected: isContinuationExcpected,
continuationToken: this.continuationToken,
requestOptions: this.cosmosQueryRequestOptions);
}

private FeedIterator<T> CreateFeedIterator(bool isContinuationExcpected)
{
SqlQuerySpec querySpec = DocumentQueryEvaluator.Evaluate(this.Expression);

FeedIterator streamIterator = this.CreateStreamIterator(isContinuationExcpected);
return new FeedIteratorCore<T>(
streamIterator,
this.responseFactory.CreateQueryFeedResponse<T>);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
{
using Microsoft.Azure.Cosmos.Linq;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Dynamic;
using System.Threading.Tasks;

[TestClass]
public class CosmosItemLinqTests : BaseCosmosClientHelper
{
private Container Container = null;
private ContainerProperties containerSettings = null;

[TestInitialize]
public async Task TestInitialize()
{
await base.TestInit();
string PartitionKey = "/status";
this.containerSettings = new ContainerProperties(id: Guid.NewGuid().ToString(), partitionKeyPath: PartitionKey);
ContainerResponse response = await this.database.CreateContainerAsync(
this.containerSettings,
cancellationToken: this.cancellationToken);
Assert.IsNotNull(response);
Assert.IsNotNull(response.Container);
Assert.IsNotNull(response.Resource);
this.Container = response;
}

[TestCleanup]
public async Task Cleanup()
{
await base.TestCleanup();
}

[DataRow(false)]
[DataRow(true)]
[DataTestMethod]
public async Task ItemLinqReadFeedTest(bool useStatelessIterator)
{
IList<ToDoActivity> deleteList = await ToDoActivity.CreateRandomItems(this.Container, pkCount: 3, randomPartitionKey: true);
HashSet<string> itemIds = deleteList.Select(x => x.id).ToHashSet<string>();

QueryRequestOptions requestOptions = new QueryRequestOptions()
{
MaxItemCount = 1
};

List<ToDoActivity> itemsViaReadFeed = this.Container.GetItemLinqQueryable<ToDoActivity>(
allowSynchronousQueryExecution: true,
requestOptions: requestOptions).ToList();

Assert.IsTrue(itemsViaReadFeed.Count >= 3);
CollectionAssert.AreEqual(deleteList.ToList(), itemsViaReadFeed);

string lastContinuationToken = null;
FeedIterator<ToDoActivity> feedIterator = this.Container.GetItemLinqQueryable<ToDoActivity>(
requestOptions: requestOptions).ToFeedIterator();

while (feedIterator.HasMoreResults)
{
if (useStatelessIterator)
{
feedIterator = this.Container.GetItemLinqQueryable<ToDoActivity>(
continuationToken: lastContinuationToken,
requestOptions: requestOptions).ToFeedIterator();
}

var responseMessage = await feedIterator.ReadNextAsync(this.cancellationToken);
lastContinuationToken = responseMessage.ContinuationToken;

foreach (ToDoActivity toDoActivity in responseMessage)
{
if (itemIds.Contains(toDoActivity.id))
{
itemIds.Remove(toDoActivity.id);
}
}
}

Assert.IsNull(lastContinuationToken);
Assert.AreEqual(itemIds.Count, 0);

itemIds = deleteList.Select(x => x.id).ToHashSet<string>();
FeedIterator streamIterator = this.Container.GetItemLinqQueryable<ToDoActivity>(
requestOptions: requestOptions).ToStreamIterator();

while (streamIterator.HasMoreResults)
{
if (useStatelessIterator)
{
streamIterator = this.Container.GetItemLinqQueryable<ToDoActivity>(
continuationToken: lastContinuationToken,
requestOptions: requestOptions).ToStreamIterator();
}

using (var responseMessage = await streamIterator.ReadNextAsync(this.cancellationToken))
{
lastContinuationToken = responseMessage.Headers.ContinuationToken;

var items = TestCommon.Serializer.FromStream<CosmosFeedResponseUtil<ToDoActivity>>(responseMessage.Content).Data;
foreach (ToDoActivity toDoActivity in items)
{
if (itemIds.Contains(toDoActivity.id))
{
itemIds.Remove(toDoActivity.id);
}
}
}
}

Assert.IsNull(lastContinuationToken);
Assert.AreEqual(itemIds.Count, 0);
}

[TestMethod]
[DataRow(false)]
[DataRow(true)]
[ExpectedException(typeof(ArgumentOutOfRangeException))]
public void LinqQueryToIteratorBlockTest(bool isStreamIterator)
{
//Checking for exception in case of ToFeedIterator() use on non cosmos linq IQueryable.
IQueryable<ToDoActivity> nonLinqQueryable = (new List<ToDoActivity> { ToDoActivity.CreateRandomToDoActivity() }).AsQueryable();
if (isStreamIterator)
{
nonLinqQueryable.ToStreamIterator();
}
else
{
nonLinqQueryable.ToFeedIterator();
}
}

[TestMethod]
[DataRow(false)]
[DataRow(true)]
[ExpectedException(typeof(NotSupportedException))]
public void LinqQuerySyncBlockTest(bool isReadFeed)
{
//Checking for exception in case of ToFeedIterator() use on non cosmos linq IQueryable.
if (isReadFeed)
{
this.Container.GetItemLinqQueryable<ToDoActivity>().ToList();
}
else
{
this.Container.GetItemLinqQueryable<ToDoActivity>().Where(item => item.cost > 0).ToList();
}
}

[TestMethod]
public async Task ItemLINQQueryTest()
{
//Creating items for query.
IList<ToDoActivity> itemList = await ToDoActivity.CreateRandomItems(container: this.Container, pkCount: 2, perPKItemCount: 1, randomPartitionKey: true);

IOrderedQueryable<ToDoActivity> linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>();
IQueryable<ToDoActivity> queriable = linqQueryable.Where(item => (item.taskNum < 100));
//V3 Asynchronous query execution with LINQ query generation sql text.
FeedIterator<ToDoActivity> setIterator = this.Container.GetItemQueryIterator<ToDoActivity>(
queriable.ToQueryDefinition(),
requestOptions: new QueryRequestOptions() { MaxConcurrency = 2 });

int resultsFetched = 0;
while (setIterator.HasMoreResults)
{
FeedResponse<ToDoActivity> queryResponse = await setIterator.ReadNextAsync();
resultsFetched += queryResponse.Count();

// For the items returned with NonePartitionKeyValue
IEnumerator<ToDoActivity> iter = queryResponse.GetEnumerator();
while (iter.MoveNext())
{
ToDoActivity activity = iter.Current;
Assert.AreEqual(42, activity.taskNum);
}
Assert.AreEqual(2, resultsFetched);
}

//LINQ query execution without partition key.
linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(allowSynchronousQueryExecution: true);
queriable = linqQueryable.Where(item => (item.taskNum < 100));

Assert.AreEqual(2, queriable.Count());
Assert.AreEqual(itemList[0].id, queriable.ToList()[0].id);
Assert.AreEqual(itemList[1].id, queriable.ToList()[1].id);

//LINQ query execution with wrong partition key.
linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(
allowSynchronousQueryExecution: true,
requestOptions: new QueryRequestOptions() { PartitionKey = new Cosmos.PartitionKey("test") });
queriable = linqQueryable.Where(item => (item.taskNum < 100));
Assert.AreEqual(0, queriable.Count());

//LINQ query execution with correct partition key.
linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(
allowSynchronousQueryExecution: true,
requestOptions: new QueryRequestOptions { ConsistencyLevel = Cosmos.ConsistencyLevel.Eventual, PartitionKey = new Cosmos.PartitionKey(itemList[1].status) });
queriable = linqQueryable.Where(item => (item.taskNum < 100));
Assert.AreEqual(1, queriable.Count());
Assert.AreEqual(itemList[1].id, queriable.ToList()[0].id);
}

[TestMethod]
public async Task ItemLINQQueryWithContinuationTokenTest()
{
//Creating items for query.
IList<ToDoActivity> itemList = await ToDoActivity.CreateRandomItems(container: this.Container, pkCount: 10, perPKItemCount: 1, randomPartitionKey: true);

QueryRequestOptions queryRequestOptions = new QueryRequestOptions();
queryRequestOptions.MaxConcurrency = 1;
queryRequestOptions.MaxItemCount = 5;
IOrderedQueryable<ToDoActivity> linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(requestOptions: queryRequestOptions);
IQueryable<ToDoActivity> queriable = linqQueryable.Where(item => (item.taskNum < 100));
FeedIterator<ToDoActivity> feedIterator = queriable.ToFeedIterator();

int firstItemSet = 0;
string continuationToken = null;
while (feedIterator.HasMoreResults)
{
FeedResponse<ToDoActivity> feedResponse = await feedIterator.ReadNextAsync();
firstItemSet = feedResponse.Count();
continuationToken = feedResponse.ContinuationToken;
if (firstItemSet > 0)
{
break;
}
}

linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(continuationToken: continuationToken, requestOptions: queryRequestOptions);
queriable = linqQueryable.Where(item => (item.taskNum < 100));
feedIterator = queriable.ToFeedIterator();

//Test continuationToken with LINQ query generation and asynchronous feedIterator execution.
int secondItemSet = 0;
while (feedIterator.HasMoreResults)
{
FeedResponse<ToDoActivity> feedResponse = await feedIterator.ReadNextAsync();
secondItemSet += feedResponse.Count();
}

Assert.AreEqual(10 - firstItemSet, secondItemSet);

//Test continuationToken with blocking LINQ execution
linqQueryable = this.Container.GetItemLinqQueryable<ToDoActivity>(allowSynchronousQueryExecution: true, continuationToken: continuationToken, requestOptions: queryRequestOptions);
int linqExecutionItemCount = linqQueryable.Where(item => (item.taskNum < 100)).Count();
Assert.AreEqual(10 - firstItemSet, linqExecutionItemCount);
}
}
}
Loading

0 comments on commit bb72ba5

Please sign in to comment.