Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into users/jawilley/quer…
Browse files Browse the repository at this point in the history
…y/retryaftertest
  • Loading branch information
Jake Willey committed Mar 16, 2020
2 parents d21afbf + 1e6051d commit b60c643
Show file tree
Hide file tree
Showing 61 changed files with 8,728 additions and 6,901 deletions.
18 changes: 18 additions & 0 deletions Microsoft.Azure.Cosmos/src/ClientResources.Designer.cs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Microsoft.Azure.Cosmos/src/ClientResources.resx
Original file line number Diff line number Diff line change
Expand Up @@ -318,4 +318,10 @@
<data name="FeedToken_CannotParse" xml:space="preserve">
<value>Cannot parse '{0}' as a valid FeedToken.</value>
</data>
<data name="FeedToken_EffectivePartitionKeyRouting" xml:space="preserve">
<value>Cannot define EffectivePartitionKeyRouting and FeedToken simultaneously.</value>
</data>
<data name="FeedToken_InvalidImplementation" xml:space="preserve">
<value>Expected FeedTokenInternal instance.</value>
</data>
</root>
14 changes: 7 additions & 7 deletions Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -864,13 +864,13 @@ private FeedIteratorInternal GetDatabaseQueryStreamIteratorHelper(
QueryRequestOptions requestOptions = null)
{
this.ThrowIfDisposed();
return new FeedIteratorCore(
this.ClientContext,
this.DatabaseRootUri,
ResourceType.Database,
queryDefinition,
continuationToken,
requestOptions);
return FeedIteratorCore.CreateForNonPartitionedResource(
clientContext: this.ClientContext,
resourceLink: this.DatabaseRootUri,
resourceType: ResourceType.Database,
queryDefinition: queryDefinition,
continuationToken: continuationToken,
options: requestOptions);
}

/// <summary>
Expand Down
11 changes: 11 additions & 0 deletions Microsoft.Azure.Cosmos/src/CosmosElements/CosmosElement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,17 @@ public static CosmosElement Parse(string json)

return cosmosElement;
}

public static TCosmosElement Parse<TCosmosElement>(string json)
where TCosmosElement : CosmosElement
{
if (!CosmosElement.TryParse(json, out TCosmosElement cosmosElement))
{
throw new ArgumentException($"Failed to parse json: {json}.");
}

return cosmosElement;
}
}
#if INTERNAL
#pragma warning restore SA1600 // Elements should be documented
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace Microsoft.Azure.Cosmos.CosmosElements
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Azure.Cosmos.Json;
Expand All @@ -21,7 +22,7 @@ private class LazyCosmosObject : CosmosObject
{
private readonly IJsonNavigator jsonNavigator;
private readonly IJsonNavigatorNode jsonNavigatorNode;
private readonly Dictionary<string, CosmosElement> cachedElements;
private readonly ConcurrentDictionary<string, CosmosElement> cachedElements;
private readonly Lazy<int> lazyCount;

public LazyCosmosObject(IJsonNavigator jsonNavigator, IJsonNavigatorNode jsonNavigatorNode)
Expand All @@ -44,7 +45,7 @@ public LazyCosmosObject(IJsonNavigator jsonNavigator, IJsonNavigatorNode jsonNav

this.jsonNavigator = jsonNavigator;
this.jsonNavigatorNode = jsonNavigatorNode;
this.cachedElements = new Dictionary<string, CosmosElement>();
this.cachedElements = new ConcurrentDictionary<string, CosmosElement>();
this.lazyCount = new Lazy<int>(() => this.jsonNavigator.GetObjectPropertyCount(this.jsonNavigatorNode));
}

Expand Down Expand Up @@ -92,31 +93,27 @@ public override IEnumerator<KeyValuePair<string, CosmosElement>> GetEnumerator()

public override bool TryGetValue(string key, out CosmosElement value)
{
value = default;
bool gotValue;
if (this.cachedElements.TryGetValue(
key,
out CosmosElement cosmosElemet))
{
value = cosmosElemet;
gotValue = true;
return true;
}
else if (this.jsonNavigator.TryGetObjectProperty(

if (this.jsonNavigator.TryGetObjectProperty(
this.jsonNavigatorNode,
key,
out ObjectProperty objectProperty))
{
value = CosmosElement.Dispatch(this.jsonNavigator, objectProperty.ValueNode);
gotValue = true;
this.cachedElements[key] = value;
}
else
{
value = null;
gotValue = false;

return true;
}

return gotValue;
value = default;
return false;
}

public override void WriteTo(IJsonWriter jsonWriter)
Expand Down
155 changes: 137 additions & 18 deletions Microsoft.Azure.Cosmos/src/FeedIteratorCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ namespace Microsoft.Azure.Cosmos
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
using Microsoft.Azure.Documents;
using static Microsoft.Azure.Documents.RuntimeConstants;

Expand All @@ -20,25 +22,75 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal sealed class FeedIteratorCore : FeedIteratorInternal
{
private readonly ContainerCore containerCore;
private readonly CosmosClientContext clientContext;
private readonly Uri resourceLink;
private readonly ResourceType resourceType;
private readonly SqlQuerySpec querySpec;
private bool hasMoreResultsInternal;
private FeedTokenInternal feedTokenInternal;

public FeedIteratorCore(
internal static FeedIteratorCore CreateForNonPartitionedResource(
CosmosClientContext clientContext,
Uri resourceLink,
ResourceType resourceType,
QueryDefinition queryDefinition,
string continuationToken,
QueryRequestOptions options)
{
return new FeedIteratorCore(
clientContext: clientContext,
containerCore: null,
resourceLink: resourceLink,
resourceType: resourceType,
queryDefinition: queryDefinition,
continuationToken: continuationToken,
feedTokenInternal: null,
options: options);
}

internal static FeedIteratorCore CreateForPartitionedResource(
ContainerCore containerCore,
Uri resourceLink,
ResourceType resourceType,
QueryDefinition queryDefinition,
string continuationToken,
FeedTokenInternal feedTokenInternal,
QueryRequestOptions options)
{
if (containerCore == null)
{
throw new ArgumentNullException(nameof(containerCore));
}

return new FeedIteratorCore(
containerCore: containerCore,
clientContext: containerCore.ClientContext,
resourceLink: resourceLink,
resourceType: resourceType,
queryDefinition: queryDefinition,
continuationToken: continuationToken,
feedTokenInternal: feedTokenInternal,
options: options);
}

private FeedIteratorCore(
ContainerCore containerCore,
CosmosClientContext clientContext,
Uri resourceLink,
ResourceType resourceType,
QueryDefinition queryDefinition,
string continuationToken,
FeedTokenInternal feedTokenInternal,
QueryRequestOptions options)
{
this.resourceLink = resourceLink;
this.containerCore = containerCore;
this.clientContext = clientContext;
this.resourceType = resourceType;
this.querySpec = queryDefinition?.ToSqlQuerySpec();
this.ContinuationToken = continuationToken;
this.feedTokenInternal = feedTokenInternal;
this.ContinuationToken = continuationToken ?? this.feedTokenInternal?.GetContinuation();
this.requestOptions = options;
this.hasMoreResultsInternal = true;
}
Expand All @@ -50,7 +102,7 @@ public override
#else
internal
#endif
FeedToken FeedToken => throw new NotImplementedException();
FeedToken FeedToken => this.feedTokenInternal;

/// <summary>
/// The query options for the result set
Expand All @@ -67,18 +119,48 @@ public override
/// </summary>
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>A query response from cosmos service</returns>
public override async Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default)
public override Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default)
{
CosmosDiagnosticsContext diagnostics = CosmosDiagnosticsContext.Create(this.requestOptions);
using (diagnostics.CreateOverallScope("FeedReadNextAsync"))
{
return this.ReadNextInternalAsync(diagnostics, cancellationToken);
}
}

private async Task<ResponseMessage> ReadNextInternalAsync(
CosmosDiagnosticsContext diagnostics,
CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

Stream stream = null;
OperationType operation = OperationType.ReadFeed;
if (this.querySpec != null)
{
stream = this.clientContext.SerializerCore.ToStreamSqlQuerySpec(this.querySpec, this.resourceType);
stream = this.clientContext.SerializerCore.ToStreamSqlQuerySpec(this.querySpec, this.resourceType);
operation = OperationType.Query;
}

if (this.feedTokenInternal == null)
{
TryCatch<FeedTokenInternal> tryCatchFeedTokeninternal = await this.TryInitializeFeedTokenAsync(cancellationToken);
if (!tryCatchFeedTokeninternal.Succeeded)
{
if (tryCatchFeedTokeninternal.Exception.InnerException is CosmosException cosmosException)
{
return cosmosException.ToCosmosResponseMessage(new RequestMessage(method: null, requestUri: null, diagnosticsContext: diagnostics));
}

return CosmosExceptionFactory.CreateInternalServerErrorException(
message: tryCatchFeedTokeninternal.Exception.InnerException.Message,
innerException: tryCatchFeedTokeninternal.Exception.InnerException,
diagnosticsContext: diagnostics).ToCosmosResponseMessage(new RequestMessage(method: null, requestUri: null, diagnosticsContext: diagnostics));
}

this.feedTokenInternal = tryCatchFeedTokeninternal.Result;
}

ResponseMessage response = await this.clientContext.ProcessResourceOperationStreamAsync(
resourceUri: this.resourceLink,
resourceType: this.resourceType,
Expand All @@ -95,26 +177,63 @@ public override async Task<ResponseMessage> ReadNextAsync(CancellationToken canc
request.Headers.Add(HttpConstants.HttpHeaders.ContentType, MediaTypes.QueryJson);
request.Headers.Add(HttpConstants.HttpHeaders.IsQuery, bool.TrueString);
}
this.feedTokenInternal?.EnrichRequest(request);
},
diagnosticsScope: null,
diagnosticsScope: diagnostics,
cancellationToken: cancellationToken);

this.ContinuationToken = response.Headers.ContinuationToken;
this.hasMoreResultsInternal = GetHasMoreResults(this.ContinuationToken, response.StatusCode);
// Retry in case of splits or other scenarios only on partitioned resources
if (this.containerCore != null
&& await this.feedTokenInternal.ShouldRetryAsync(this.containerCore, response, cancellationToken))
{
return await this.ReadNextInternalAsync(diagnostics, cancellationToken);
}

if (response.IsSuccessStatusCode)
{
this.feedTokenInternal.UpdateContinuation(response.Headers.ContinuationToken);
this.ContinuationToken = this.feedTokenInternal.GetContinuation();
this.hasMoreResultsInternal = !this.feedTokenInternal.IsDone;
}
else
{
this.hasMoreResultsInternal = false;
}

return response;
}

internal static string GetContinuationToken(ResponseMessage httpResponseMessage)
private async Task<TryCatch<FeedTokenInternal>> TryInitializeFeedTokenAsync(CancellationToken cancellationToken)
{
return httpResponseMessage.Headers.ContinuationToken;
}
string containerRId = string.Empty;
if (this.containerCore != null)
{
try
{
containerRId = await this.containerCore.GetRIDAsync(cancellationToken);
}
catch (Exception cosmosException)
{
return TryCatch<FeedTokenInternal>.FromException(cosmosException);
}
}

internal static bool GetHasMoreResults(string continuationToken, HttpStatusCode statusCode)
{
// this logic might not be sufficient composite continuation token https://msdata.visualstudio.com/CosmosDB/SDK/_workitems/edit/269099
// in the case where this is a result set iterator for a change feed, not modified indicates that
// the enumeration is done for now.
return continuationToken != null && statusCode != HttpStatusCode.NotModified;
// Create FeedToken for the full Range
FeedTokenEPKRange feedTokenInternal = new FeedTokenEPKRange(
containerRId,
new PartitionKeyRange()
{
MinInclusive = Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey,
MaxExclusive = Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey
});
// Initialize with the ContinuationToken that the user passed, if any
if (this.ContinuationToken != null)
{
feedTokenInternal.UpdateContinuation(this.ContinuationToken);
}

return TryCatch<FeedTokenInternal>.FromResult(feedTokenInternal);
}

public override CosmosElement GetCosmsoElementContinuationToken()
Expand Down Expand Up @@ -164,4 +283,4 @@ public override async Task<FeedResponse<T>> ReadNextAsync(CancellationToken canc
return this.responseCreator(response);
}
}
}
}
Loading

0 comments on commit b60c643

Please sign in to comment.