Skip to content

Commit

Permalink
Merge to latest
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake Willey committed Apr 23, 2021
2 parents ca3f227 + ecf90d6 commit f441ada
Show file tree
Hide file tree
Showing 37 changed files with 1,362 additions and 382 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,54 +69,58 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
ChangeFeedPage backendPage = crossFeedRangePage.Page;
if (backendPage is ChangeFeedNotModifiedPage)
{
using (ITrace drainNotModifedPages = changeFeedMoveNextTrace.StartChild("Drain NotModified Pages", TraceComponent.ChangeFeed, TraceLevel.Info))
// Keep draining the cross partition enumerator until
// We get a non 304 page or we loop back to the same range or run into an exception
FeedRangeInternal originalRange = this.crossPartitionEnumerator.CurrentRange;
// No point on draining when the state has 1 range
if (!IsNextRangeEqualToOriginal(this.crossPartitionEnumerator, originalRange))
{
// Keep draining the cross partition enumerator until
// We get a non 304 page or we loop back to the same range or run into an exception
FeedRangeInternal originalRange = this.crossPartitionEnumerator.CurrentRange;
double totalRequestCharge = backendPage.RequestCharge;
do
using (ITrace drainNotModifedPages = changeFeedMoveNextTrace.StartChild("Drain NotModified Pages", TraceComponent.ChangeFeed, TraceLevel.Info))
{
if (!await this.crossPartitionEnumerator.MoveNextAsync(drainNotModifedPages))
double totalRequestCharge = backendPage.RequestCharge;
do
{
throw new InvalidOperationException("ChangeFeed should always have a next page.");
if (!await this.crossPartitionEnumerator.MoveNextAsync(drainNotModifedPages))
{
throw new InvalidOperationException("ChangeFeed should always have a next page.");
}

monadicCrossPartitionPage = this.crossPartitionEnumerator.Current;
if (monadicCrossPartitionPage.Failed)
{
// Buffer the exception, since we need to return the request charge so far.
this.bufferedException = TryCatch<CrossFeedRangePage<ChangeFeedPage, ChangeFeedState>>.FromException(monadicCrossPartitionPage.Exception);
}
else
{
crossFeedRangePage = monadicCrossPartitionPage.Result;
backendPage = crossFeedRangePage.Page;
totalRequestCharge += backendPage.RequestCharge;
}
}
while (!(backendPage is ChangeFeedSuccessPage
|| IsNextRangeEqualToOriginal(this.crossPartitionEnumerator, originalRange)
|| this.bufferedException.HasValue));

monadicCrossPartitionPage = this.crossPartitionEnumerator.Current;
if (monadicCrossPartitionPage.Failed)
// Create a page with the aggregated request charge
if (backendPage is ChangeFeedSuccessPage changeFeedSuccessPage)
{
// Buffer the exception, since we need to return the request charge so far.
this.bufferedException = TryCatch<CrossFeedRangePage<ChangeFeedPage, ChangeFeedState>>.FromException(monadicCrossPartitionPage.Exception);
backendPage = new ChangeFeedSuccessPage(
changeFeedSuccessPage.Content,
totalRequestCharge,
changeFeedSuccessPage.ActivityId,
changeFeedSuccessPage.AdditionalHeaders,
changeFeedSuccessPage.State);
}
else
{
crossFeedRangePage = monadicCrossPartitionPage.Result;
backendPage = crossFeedRangePage.Page;
totalRequestCharge += backendPage.RequestCharge;
backendPage = new ChangeFeedNotModifiedPage(
totalRequestCharge,
backendPage.ActivityId,
backendPage.AdditionalHeaders,
backendPage.State);
}
}
while (!(backendPage is ChangeFeedSuccessPage
|| this.crossPartitionEnumerator.CurrentRange.Equals(originalRange)
|| this.bufferedException.HasValue));

// Create a page with the aggregated request charge
if (backendPage is ChangeFeedSuccessPage changeFeedSuccessPage)
{
backendPage = new ChangeFeedSuccessPage(
changeFeedSuccessPage.Content,
totalRequestCharge,
changeFeedSuccessPage.ActivityId,
changeFeedSuccessPage.AdditionalHeaders,
changeFeedSuccessPage.State);
}
else
{
backendPage = new ChangeFeedNotModifiedPage(
totalRequestCharge,
backendPage.ActivityId,
backendPage.AdditionalHeaders,
backendPage.State);
}
}
}

Expand Down Expand Up @@ -160,6 +164,14 @@ public static CrossPartitionChangeFeedAsyncEnumerator Create(
return enumerator;
}

private static bool IsNextRangeEqualToOriginal(
CrossPartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState> crossPartitionEnumerator,
FeedRangeInternal originalRange)
{
return crossPartitionEnumerator.TryPeekNext(out FeedRangeState<ChangeFeedState> nextState)
&& originalRange.Equals(nextState.FeedRange);
}

private static CreatePartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState> MakeCreateFunction(
IChangeFeedDataSource changeFeedDataSource,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
Expand Down
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6347,7 +6347,8 @@ await this.cosmosAuthorization.AddAuthorizationHeaderAsync(
return request;
}

AccountProperties databaseAccount = await gatewayModel.GetDatabaseAccountAsync(CreateRequestMessage);
AccountProperties databaseAccount = await gatewayModel.GetDatabaseAccountAsync(CreateRequestMessage,
clientSideRequestStatistics: null);

this.UseMultipleWriteLocations = this.ConnectionPolicy.UseMultipleWriteLocations && databaseAccount.EnableMultipleWriteLocations;
return databaseAccount;
Expand Down
9 changes: 9 additions & 0 deletions Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,15 @@ internal CosmosClientBuilder WithCpuMonitorDisabled()
return this;
}

/// <summary>
/// Enabled partition level failover in the SDK
/// </summary>
internal CosmosClientBuilder WithPartitionLevelFailoverEnabled()
{
this.clientOptions.EnablePartitionLevelFailover = true;
return this;
}

internal CosmosClientBuilder WithRetryWithOptions(
int? initialRetryForRetryWithMilliseconds,
int? maximumRetryForRetryWithMilliseconds,
Expand Down
40 changes: 31 additions & 9 deletions Microsoft.Azure.Cosmos/src/GatewayAccountReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ namespace Microsoft.Azure.Cosmos
using System.Globalization;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Cosmos.Tracing.TraceData;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Collections;

Expand Down Expand Up @@ -41,17 +43,37 @@ await this.cosmosAuthorization.AddAuthorizationHeaderAsync(
HttpConstants.HttpMethods.Get,
AuthorizationTokenType.PrimaryMasterKey);

using (HttpResponseMessage responseMessage = await this.httpClient.GetAsync(
uri: serviceEndpoint,
additionalHeaders: headers,
resourceType: ResourceType.DatabaseAccount,
timeoutPolicy: HttpTimeoutPolicyControlPlaneRead.Instance,
trace: NoOpTrace.Singleton,
cancellationToken: default))
IClientSideRequestStatistics stats = new ClientSideRequestStatisticsTraceDatum(DateTime.UtcNow);
try
{
using (DocumentServiceResponse documentServiceResponse = await ClientExtensions.ParseResponseAsync(responseMessage))
using (HttpResponseMessage responseMessage = await this.httpClient.GetAsync(
uri: serviceEndpoint,
additionalHeaders: headers,
resourceType: ResourceType.DatabaseAccount,
timeoutPolicy: HttpTimeoutPolicyControlPlaneRead.Instance,
clientSideRequestStatistics: stats,
cancellationToken: default))
{
return CosmosResource.FromStream<AccountProperties>(documentServiceResponse);
using (DocumentServiceResponse documentServiceResponse = await ClientExtensions.ParseResponseAsync(responseMessage))
{
return CosmosResource.FromStream<AccountProperties>(documentServiceResponse);
}
}
}
catch (OperationCanceledException ex)
{
// Catch Operation Cancelled Exception and convert to Timeout 408 if the user did not cancel it.
using (ITrace trace = Trace.GetRootTrace("Account Read Exception", TraceComponent.Transport, TraceLevel.Info))
{
trace.AddDatum("Client Side Request Stats", stats);
throw CosmosExceptionFactory.CreateRequestTimeoutException(
message: ex.Data?["Message"].ToString(),
headers: new Headers()
{
ActivityId = System.Diagnostics.Trace.CorrelationManager.ActivityId.ToString()
},
innerException: ex,
trace: trace);
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions Microsoft.Azure.Cosmos/src/GatewayStoreClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,14 @@ internal Task<HttpResponseMessage> SendHttpAsync(
Func<ValueTask<HttpRequestMessage>> requestMessage,
ResourceType resourceType,
HttpTimeoutPolicy timeoutPolicy,
IClientSideRequestStatistics clientSideRequestStatistics,
CancellationToken cancellationToken = default)
{
return this.httpClient.SendHttpAsync(
createRequestMessageAsync: requestMessage,
resourceType: resourceType,
timeoutPolicy: timeoutPolicy,
trace: NoOpTrace.Singleton,
clientSideRequestStatistics: clientSideRequestStatistics,
cancellationToken: cancellationToken);
}

Expand Down Expand Up @@ -332,7 +333,7 @@ private Task<HttpResponseMessage> InvokeClientAsync(
() => this.PrepareRequestMessageAsync(request, physicalAddress),
resourceType,
HttpTimeoutPolicy.GetTimeoutPolicy(request),
NoOpTrace.Singleton,
request.RequestContext.ClientRequestStatistics,
cancellationToken);
}
}
Expand Down
5 changes: 4 additions & 1 deletion Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ await GatewayStoreModel.ApplySessionTokenAsync(
return response;
}

public virtual async Task<AccountProperties> GetDatabaseAccountAsync(Func<ValueTask<HttpRequestMessage>> requestMessage, CancellationToken cancellationToken = default)
public virtual async Task<AccountProperties> GetDatabaseAccountAsync(Func<ValueTask<HttpRequestMessage>> requestMessage,
IClientSideRequestStatistics clientSideRequestStatistics,
CancellationToken cancellationToken = default)
{
AccountProperties databaseAccount = null;

Expand All @@ -94,6 +96,7 @@ public virtual async Task<AccountProperties> GetDatabaseAccountAsync(Func<ValueT
requestMessage,
ResourceType.DatabaseAccount,
HttpTimeoutPolicyControlPlaneRead.Instance,
clientSideRequestStatistics,
cancellationToken))
{
using (DocumentServiceResponse documentServiceResponse = await ClientExtensions.ParseResponseAsync(responseMessage))
Expand Down
20 changes: 11 additions & 9 deletions Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,17 +193,19 @@ public virtual async Task<ResponseMessage> SendAsync(
ContainerProperties collectionFromCache;
try
{
ClientCollectionCache collectionCache = await this.client.DocumentClient.GetCollectionCacheAsync(childTrace);
collectionFromCache = await collectionCache.ResolveByNameAsync(
HttpConstants.Versions.CurrentVersion,
cosmosContainerCore.LinkUri,
forceRefesh: false,
cancellationToken,
childTrace);
if (cosmosContainerCore == null)
{
throw new ArgumentException($"The container core can not be null for FeedRangeEpk");
}

collectionFromCache = await cosmosContainerCore.GetCachedContainerPropertiesAsync(
forceRefresh: false,
childTrace,
cancellationToken);
}
catch (DocumentClientException ex)
catch (CosmosException ex)
{
return CosmosExceptionFactory.Create(ex, childTrace).ToCosmosResponseMessage(request);
return ex.ToCosmosResponseMessage(request);
}

PartitionKeyRangeCache routingMapProvider = await this.client.DocumentClient.GetPartitionKeyRangeCacheAsync(childTrace);
Expand Down
21 changes: 21 additions & 0 deletions Microsoft.Azure.Cosmos/src/Handler/TransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos.Handlers
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Cosmos.Tracing.TraceData;
using Microsoft.Azure.Documents;
Expand Down Expand Up @@ -44,6 +45,26 @@ public override async Task<ResponseMessage> SendAsync(
Debug.Assert(System.Diagnostics.Trace.CorrelationManager.ActivityId != Guid.Empty, "Trace activity id is missing");
return ce.ToCosmosResponseMessage(request);
}
catch (OperationCanceledException ex)
{
// Catch Operation Cancelled Exception and convert to Timeout 408 if the user did not cancel it.
// Throw the exception if the user cancelled.
if (cancellationToken.IsCancellationRequested)
{
throw;
}

Debug.Assert(System.Diagnostics.Trace.CorrelationManager.ActivityId != Guid.Empty, "Trace activity id is missing");
CosmosException cosmosException = CosmosExceptionFactory.CreateRequestTimeoutException(
message: ex.Data?["Message"].ToString(),
headers: new Headers()
{
ActivityId = System.Diagnostics.Trace.CorrelationManager.ActivityId.ToString()
},
innerException: ex,
trace: request.Trace);
return cosmosException.ToCosmosResponseMessage(request);
}
catch (AggregateException ex)
{
Debug.Assert(System.Diagnostics.Trace.CorrelationManager.ActivityId != Guid.Empty, "Trace activity id is missing");
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/HttpClient/CosmosHttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ public abstract Task<HttpResponseMessage> GetAsync(
INameValueCollection additionalHeaders,
ResourceType resourceType,
HttpTimeoutPolicy timeoutPolicy,
ITrace trace,
IClientSideRequestStatistics clientSideRequestStatistics,
CancellationToken cancellationToken);

public abstract Task<HttpResponseMessage> SendHttpAsync(
Func<ValueTask<HttpRequestMessage>> createRequestMessageAsync,
ResourceType resourceType,
HttpTimeoutPolicy timeoutPolicy,
ITrace trace,
IClientSideRequestStatistics clientSideRequestStatistics,
CancellationToken cancellationToken);

protected abstract void Dispose(bool disposing);
Expand Down
Loading

0 comments on commit f441ada

Please sign in to comment.