Skip to content

Commit

Permalink
Merge branch 'master' into users/brchon/Query/PerserveOrderingOfGroup…
Browse files Browse the repository at this point in the history
…ByProjections
  • Loading branch information
bchong95 committed Oct 26, 2019
2 parents 4aaec62 + 0eb4193 commit 41c836b
Show file tree
Hide file tree
Showing 23 changed files with 195 additions and 53 deletions.
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<ClientVersion>3.3.2</ClientVersion>
<DirectVersion>3.4.0</DirectVersion>
<DirectVersion>3.4.1</DirectVersion>
<HybridRowVersion>1.0.0-preview</HybridRowVersion>
<AboveDirBuildProps>$([MSBuild]::GetPathOfFileAbove('Directory.Build.props', '$(MSBuildThisFileDirectory)../'))</AboveDirBuildProps>
</PropertyGroup>
Expand Down
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ public CosmosClient(
enableCpuMonitor: clientOptionsClone.EnableCpuMonitor,
storeClientFactory: clientOptionsClone.StoreClientFactory,
desiredConsistencyLevel: clientOptionsClone.GetDocumentsConsistencyLevel(),
handler: this.CreateHttpClientHandler(clientOptions));
handler: this.CreateHttpClientHandler(clientOptions),
sessionContainer: clientOptionsClone.SessionContainer);

this.Init(
clientOptionsClone,
Expand Down
6 changes: 6 additions & 0 deletions Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos
using System.Data.Common;
using System.Linq;
using System.Net;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Fluent;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
Expand Down Expand Up @@ -84,6 +85,11 @@ public CosmosClientOptions()
/// </remarks>
public string ApplicationName { get; set; }

/// <summary>
/// Get or set session container for the client
/// </summary>
internal ISessionContainer SessionContainer { get; set; }

/// <summary>
/// Get or set the preferred geo-replicated region to be used for Azure Cosmos DB service interaction.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ public override async Task<QueryResponseCore> DrainAsync(int maxElements, Cancel
responseLengthBytes: responseLengthBytes);
}

public override bool TryGetContinuationToken(out string state)
{
state = null;
return true;
}

/// <summary>
/// Struct for getting the payload out of the rewritten projection.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ public override async Task<QueryResponseCore> DrainAsync(int maxElements, Cancel
responseLengthBytes: cosmosQueryResponse.ResponseLengthBytes);
}

public override bool TryGetContinuationToken(out string state)
{
state = null;
return false;
}

/// <summary>
/// Efficiently casts a object to a JToken.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,7 @@ public void Stop()
{
this.Source.Stop();
}

public abstract bool TryGetContinuationToken(out string state);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ public override async Task<QueryResponseCore> DrainAsync(
return response;
}

public override bool TryGetContinuationToken(out string state)
{
state = default(string);
return false;
}

/// <summary>
/// When a group by query gets rewritten the projection looks like:
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ internal interface IDocumentQueryExecutionComponent : IDisposable
/// Stops this document query execution component.
/// </summary>
void Stop();

bool TryGetContinuationToken(out string state);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,9 @@ public override async Task<QueryResponseCore> DrainAsync(int maxElements, Cancel

if (sourcePage.DisallowContinuationTokenMessage == null)
{
if (!this.IsDone)
if (!this.TryGetContinuationToken(out updatedContinuationToken))
{
updatedContinuationToken = new OffsetContinuationToken(
this.skipCount,
sourcePage.ContinuationToken).ToString();
throw new InvalidOperationException($"Failed to get state for {nameof(SkipDocumentQueryExecutionComponent)}.");
}
}

Expand All @@ -93,6 +91,30 @@ public override async Task<QueryResponseCore> DrainAsync(int maxElements, Cancel
responseLengthBytes: sourcePage.ResponseLengthBytes);
}

public override bool TryGetContinuationToken(out string state)
{
if (!this.IsDone)
{
if (this.Source.TryGetContinuationToken(out string sourceState))
{
state = new OffsetContinuationToken(
this.skipCount,
sourceState).ToString();
return true;
}
else
{
state = null;
return false;
}
}
else
{
state = null;
return true;
}
}

/// <summary>
/// A OffsetContinuationToken is a composition of a source continuation token and how many items to skip from that source.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,46 +118,66 @@ public override async Task<QueryResponseCore> DrainAsync(int maxElements, Cancel
}

List<CosmosElement> takedDocuments = results.CosmosElements.Take(this.takeCount).ToList();

this.takeCount -= takedDocuments.Count;
string updatedContinuationToken = null;

string updatedContinuationToken = null;
if (results.DisallowContinuationTokenMessage == null)
{
if (!this.IsDone)
if (!this.TryGetContinuationToken(out updatedContinuationToken))
{
throw new InvalidOperationException($"Failed to get state for {nameof(TakeDocumentQueryExecutionComponent)}.");
}
}

return QueryResponseCore.CreateSuccess(
result: takedDocuments,
continuationToken: updatedContinuationToken,
disallowContinuationTokenMessage: results.DisallowContinuationTokenMessage,
activityId: results.ActivityId,
requestCharge: results.RequestCharge,
diagnostics: results.diagnostics,
responseLengthBytes: results.ResponseLengthBytes);
}

public override bool TryGetContinuationToken(out string state)
{
if (!this.IsDone)
{
if (this.Source.TryGetContinuationToken(out string sourceState))
{
string sourceContinuation = results.ContinuationToken;
TakeContinuationToken takeContinuationToken;
switch (this.takeEnum)
{
case TakeEnum.Limit:
takeContinuationToken = new LimitContinuationToken(
this.takeCount,
sourceContinuation);
sourceState);
break;

case TakeEnum.Top:
takeContinuationToken = new TopContinuationToken(
this.takeCount,
sourceContinuation);
sourceState);
break;

default:
throw new ArgumentException($"Unknown {nameof(TakeEnum)}: {this.takeEnum}");
}

updatedContinuationToken = takeContinuationToken.ToString();
state = takeContinuationToken.ToString();
return true;
}
else
{
state = default(string);
return false;
}
}

return QueryResponseCore.CreateSuccess(
result: takedDocuments,
continuationToken: updatedContinuationToken,
disallowContinuationTokenMessage: results.DisallowContinuationTokenMessage,
activityId: results.ActivityId,
requestCharge: results.RequestCharge,
diagnostics: results.diagnostics,
responseLengthBytes: results.ResponseLengthBytes);
else
{
state = default(string);
return true;
}
}

private enum TakeEnum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,26 +676,10 @@ private void OnItemProducerTreeCompleteFetching(
}
}

/// <summary>
/// Gets the formatting for a trace.
/// </summary>
/// <param name="message">The message to format</param>
/// <returns>The formatted message ready for a trace.</returns>
private string GetTrace(string message)
{
const string TracePrefixFormat = "{0}, CorrelatedActivityId: {1}, ActivityId: {2} | {3}";
return string.Format(
CultureInfo.InvariantCulture,
TracePrefixFormat,
DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture),
this.queryContext.CorrelatedActivityId,
this.itemProducerForest.Count != 0 ? this.CurrentItemProducerTree().ActivityId : Guid.Empty,
message);
}

private static bool IsMaxBufferedItemCountSet(int maxBufferedItemCount)
public bool TryGetContinuationToken(out string state)
{
return maxBufferedItemCount != default(int);
state = this.ContinuationToken;
return true;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,7 @@ public abstract bool IsDone
/// <param name="token">The cancellation token.</param>
/// <returns>A task to await on, which in return provides a DoucmentFeedResponse of documents.</returns>
public abstract Task<QueryResponseCore> ExecuteNextAsync(CancellationToken token);

public abstract bool TryGetContinuationToken(out string state);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ await this.CosmosQueryContext.QueryClient.ForceRefreshCollectionCacheAsync(
}
}

public override bool TryGetContinuationToken(out string state)
{
return this.innerExecutionContext.TryGetContinuationToken(out state);
}

private async Task<CosmosQueryExecutionContext> CreateItemQueryExecutionContextAsync(
CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ public override bool IsDone
}
}

public override bool TryGetContinuationToken(out string state)
{
return this.component.TryGetContinuationToken(out state);
}

/// <summary>
/// Creates a CosmosPipelinedItemQueryExecutionContext.
/// </summary>
Expand Down
9 changes: 9 additions & 0 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/FeedIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,14 @@ public abstract class FeedIterator
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>A query response from cosmos service</returns>
public abstract Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default(CancellationToken));

/// <summary>
/// Tries to get the continuation token for the feed iterator.
/// Useful to avoid exceptions.
/// Useful to avoid the cost serialization until needed.
/// </summary>
/// <param name="continuationToken">The continuation to resume from.</param>
/// <returns>Whether or not we can get the continuaiton token.</returns>
internal abstract bool TryGetContinuationToken(out string continuationToken);
}
}
11 changes: 11 additions & 0 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/FeedIteratorCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ internal FeedIteratorCore(
return response;
}

internal override bool TryGetContinuationToken(out string state)
{
state = this.continuationToken;
return true;
}

internal static string GetContinuationToken(ResponseMessage httpResponseMessage)
{
return httpResponseMessage.Headers.ContinuationToken;
Expand Down Expand Up @@ -145,5 +151,10 @@ internal FeedIteratorCore(
ResponseMessage response = await this.feedIterator.ReadNextAsync(cancellationToken);
return this.responseCreator(response);
}

internal override bool TryGetContinuationToken(out string state)
{
return this.feedIterator.TryGetContinuationToken(out state);
}
}
}
2 changes: 2 additions & 0 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/FeedIteratorOfT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@ public abstract class FeedIterator<T>
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>A query response from cosmos service</returns>
public abstract Task<FeedResponse<T>> ReadNextAsync(CancellationToken cancellationToken = default(CancellationToken));

internal abstract bool TryGetContinuationToken(out string state);
}
}
5 changes: 5 additions & 0 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,5 +143,10 @@ public static QueryIterator Create(

return response;
}

internal override bool TryGetContinuationToken(out string state)
{
return this.cosmosQueryExecutionContext.TryGetContinuationToken(out state);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ private Task<ResponseMessage> ProcessStreamAsync(
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
return ProcessResourceOperationStreamAsync(
return this.ProcessResourceOperationStreamAsync(
streamPayload: streamPayload,
operationType: operationType,
linkUri: this.LinkUri,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ internal ChangeFeedPartitionKeyResultSetIteratorCore(
}, cancellationToken);
}

internal override bool TryGetContinuationToken(out string state)
{
state = this.continuationToken;
return true;
}

private Task<ResponseMessage> NextResultSetDelegateAsync(
string continuationToken,
string partitionKeyRangeId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ internal ChangeFeedResultSetIteratorCore(
return response;
}

internal override bool TryGetContinuationToken(out string state)
{
state = this.continuationToken;
return true;
}

internal async Task<Tuple<string, ResponseMessage>> ReadNextInternalAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
Expand Down
Loading

0 comments on commit 41c836b

Please sign in to comment.