Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CosmosQueryExecutionContextFactory Refactory #988

Merged
merged 37 commits into from
Nov 18, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c5bbc0e
drafted out refactor
bchong95 Nov 12, 2019
9cc75f9
removed null check
bchong95 Nov 12, 2019
b14bc0f
resolved some iteration comments
bchong95 Nov 12, 2019
e2d3563
reproduced failure
bchong95 Nov 13, 2019
39b50af
got exceptionless to work for non order by queries
bchong95 Nov 13, 2019
72b1198
got exceptionless working for order by
bchong95 Nov 13, 2019
69e0df2
got parallel basic working
bchong95 Nov 14, 2019
f5b8ab5
got non failure case working again
bchong95 Nov 14, 2019
bdbf605
got continuation token story working
bchong95 Nov 14, 2019
c1c5b2a
fixed miscellaneous tests
bchong95 Nov 15, 2019
dac7d8d
got all the combinations working for exceptionless
bchong95 Nov 15, 2019
f9d1715
added test for empty pages
bchong95 Nov 15, 2019
78130a2
uncommented
bchong95 Nov 15, 2019
ae8523f
wip
bchong95 Nov 15, 2019
25eefc4
resolved iteration comments
bchong95 Nov 15, 2019
58d4540
Merge branch 'master' into users/brchon/ImmutableQueryFactory
bchong95 Nov 15, 2019
17aed1e
fixed changelog
bchong95 Nov 15, 2019
fe870db
build
bchong95 Nov 15, 2019
4cc5159
build
bchong95 Nov 15, 2019
8e4dcf2
resolved iteration comments
bchong95 Nov 16, 2019
f422555
update test
bchong95 Nov 16, 2019
1fa6ae1
automatically advancing page
bchong95 Nov 16, 2019
2ec83e2
fixed some split failures
bchong95 Nov 16, 2019
ede9232
merged
bchong95 Nov 16, 2019
f346aaa
merged
bchong95 Nov 16, 2019
f3fce62
fixed split bugs
bchong95 Nov 17, 2019
816d3de
fixed continuation token exception
bchong95 Nov 17, 2019
d3e9280
merged
bchong95 Nov 17, 2019
3aace4a
added a catch all query excetuion context
bchong95 Nov 17, 2019
e2ebfc9
ammend
bchong95 Nov 17, 2019
65130fd
removed is done check for try get continuation token
bchong95 Nov 17, 2019
3f293e1
updated baseline
bchong95 Nov 17, 2019
511b991
fixed test since code doesn't just throw exceptions anymore
bchong95 Nov 17, 2019
d3771ec
removed stack trace from exception
bchong95 Nov 18, 2019
d07126b
updated error message one more time?
bchong95 Nov 18, 2019
4c73633
updated baselines
bchong95 Nov 18, 2019
be49066
merged
bchong95 Nov 18, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public static TryCatch<IAggregator> TryCreate(string continuationToken)
{
if (!AverageInfo.TryParse(continuationToken, out averageInfo))
{
return TryCatch<IAggregator>.FromException(new ArgumentException($"Invalid continuation token: {continuationToken}"));
return TryCatch<IAggregator>.FromException(
new ArgumentException($"Invalid continuation token: {continuationToken}"));
}
}
else
Expand Down
53 changes: 53 additions & 0 deletions Microsoft.Azure.Cosmos/src/Query/Core/AsyncLazy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Query.Core
{
using System;
using System.Threading;
using System.Threading.Tasks;

internal sealed class AsyncLazy<T>
{
private readonly Func<CancellationToken, Task<T>> valueFactory;
private T value;

public AsyncLazy(Func<CancellationToken, Task<T>> valueFactory)
{
if (valueFactory == null)
{
throw new ArgumentNullException(nameof(valueFactory));
}

this.valueFactory = valueFactory;
}

public bool ValueInitialized { get; private set; }

public async Task<T> GetValueAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
if (!this.ValueInitialized)
{
this.value = await this.valueFactory(cancellationToken);
this.ValueInitialized = true;
}

return this.value;
}

public T Result
{
get
{
if (!this.ValueInitialized)
{
throw new InvalidOperationException("Can not retrieve value before initialization.");
}

return this.value;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ public abstract bool IsDone
/// <summary>
/// Executes the context to feed the next page of results.
/// </summary>
/// <param name="token">The cancellation token.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task to await on, which in return provides a DoucmentFeedResponse of documents.</returns>
public abstract Task<QueryResponseCore> ExecuteNextAsync(CancellationToken token);
public abstract Task<QueryResponseCore> ExecuteNextAsync(CancellationToken cancellationToken);

public abstract bool TryGetContinuationToken(out string state);
public abstract bool TryGetContinuationToken(out string continuationToken);
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext
{
using System;
using System.Threading;
using System.Threading.Tasks;

internal sealed class CosmosQueryExecutionContextWithNameCacheStaleRetry : CosmosQueryExecutionContext
{
private readonly CosmosQueryContext cosmosQueryContext;
private readonly Func<CosmosQueryExecutionContext> cosmosQueryExecutionContextFactory;
private CosmosQueryExecutionContext currentCosmosQueryExecutionContext;
private bool alreadyRetried;

public CosmosQueryExecutionContextWithNameCacheStaleRetry(
CosmosQueryContext cosmosQueryContext,
Func<CosmosQueryExecutionContext> cosmosQueryExecutionContextFactory)
{
this.cosmosQueryContext = cosmosQueryContext;
this.cosmosQueryExecutionContextFactory = cosmosQueryExecutionContextFactory;
this.currentCosmosQueryExecutionContext = cosmosQueryExecutionContextFactory();
this.alreadyRetried = false;
}

public override bool IsDone => this.currentCosmosQueryExecutionContext.IsDone;

public override void Dispose()
{
this.currentCosmosQueryExecutionContext.Dispose();
}

public override async Task<QueryResponseCore> ExecuteNextAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

// If the cache is stale the entire execute context has incorrect values and should be recreated.
// This should only be done for the first execution.
// If results have already been pulled,
// then an error should be returned to the user,
// since it's not possible to combine query results from multiple containers.
QueryResponseCore queryResponse = await this.currentCosmosQueryExecutionContext.ExecuteNextAsync(cancellationToken);
if (
(queryResponse.StatusCode == System.Net.HttpStatusCode.Gone) &&
(queryResponse.SubStatusCode == Documents.SubStatusCodes.NameCacheIsStale) &&
!this.alreadyRetried)
{
this.currentCosmosQueryExecutionContext.Dispose();
await this.cosmosQueryContext.QueryClient.ForceRefreshCollectionCacheAsync(
this.cosmosQueryContext.ResourceLink.OriginalString,
cancellationToken);
this.currentCosmosQueryExecutionContext = this.cosmosQueryExecutionContextFactory();
this.alreadyRetried = true;
return await this.ExecuteNextAsync(cancellationToken);
}

return queryResponse;
}

public override bool TryGetContinuationToken(out string continuationToken)
{
return this.currentCosmosQueryExecutionContext.TryGetContinuationToken(out continuationToken);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Query.Core.Monads;

/// <summary>
/// Implementation of <see cref="CosmosQueryExecutionContext"/> that composes another context and defers it's initialization until the first read.
/// </summary>
internal sealed class LazyCosmosQueryExecutionContext : CosmosQueryExecutionContext
{
private readonly AsyncLazy<TryCatch<CosmosQueryExecutionContext>> lazyTryCreateCosmosQueryExecutionContext;

public LazyCosmosQueryExecutionContext(AsyncLazy<TryCatch<CosmosQueryExecutionContext>> lazyTryCreateCosmosQueryExecutionContext)
{
if (lazyTryCreateCosmosQueryExecutionContext == null)
{
throw new ArgumentNullException(nameof(lazyTryCreateCosmosQueryExecutionContext));
}

this.lazyTryCreateCosmosQueryExecutionContext = lazyTryCreateCosmosQueryExecutionContext;
}

public override bool IsDone
{
get
{
bool isDone;
if (this.lazyTryCreateCosmosQueryExecutionContext.ValueInitialized)
{
TryCatch<CosmosQueryExecutionContext> tryCreateCosmosQueryExecutionContext = this.lazyTryCreateCosmosQueryExecutionContext.Result;
if (tryCreateCosmosQueryExecutionContext.Succeeded)
{
isDone = tryCreateCosmosQueryExecutionContext.Result.IsDone;
}
else
{
isDone = true;
}
}
else
{
isDone = false;
}

return isDone;
}
}

public override void Dispose()
{
if (this.lazyTryCreateCosmosQueryExecutionContext.ValueInitialized)
{
TryCatch<CosmosQueryExecutionContext> tryCreateCosmosQueryExecutionContext = this.lazyTryCreateCosmosQueryExecutionContext.Result;
if (tryCreateCosmosQueryExecutionContext.Succeeded)
{
tryCreateCosmosQueryExecutionContext.Result.Dispose();
}
}
}

public override async Task<QueryResponseCore> ExecuteNextAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

TryCatch<CosmosQueryExecutionContext> tryCreateCosmosQueryExecutionContext = await this.lazyTryCreateCosmosQueryExecutionContext.GetValueAsync(cancellationToken);
if (!tryCreateCosmosQueryExecutionContext.Succeeded)
{
return QueryResponseFactory.CreateFromException(tryCreateCosmosQueryExecutionContext.Exception);
}

CosmosQueryExecutionContext cosmosQueryExecutionContext = tryCreateCosmosQueryExecutionContext.Result;
QueryResponseCore queryResponseCore = await cosmosQueryExecutionContext.ExecuteNextAsync(cancellationToken);
return queryResponseCore;
}

public override bool TryGetContinuationToken(out string state)
{
if (!this.lazyTryCreateCosmosQueryExecutionContext.ValueInitialized)
{
state = null;
return false;
}

TryCatch<CosmosQueryExecutionContext> tryCreateCosmosQueryExecutionContext = this.lazyTryCreateCosmosQueryExecutionContext.Result;
if (!tryCreateCosmosQueryExecutionContext.Succeeded)
{
state = null;
return false;
}

return tryCreateCosmosQueryExecutionContext.Result.TryGetContinuationToken(out state);
}
}
}
18 changes: 2 additions & 16 deletions Microsoft.Azure.Cosmos/src/Query/Core/Monads/TryCatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,7 @@ public TryCatch<T> Try<T>(
TryCatch<T> matchResult;
if (this.Succeeded)
{
try
{
matchResult = TryCatch<T>.FromResult(onSuccess(this.either.FromRight(default)));
}
catch (Exception ex)
{
matchResult = TryCatch<T>.FromException(ex);
}
matchResult = TryCatch<T>.FromResult(onSuccess(this.either.FromRight(default)));
}
else
{
Expand All @@ -101,14 +94,7 @@ public async Task<TryCatch<T>> TryAsync<T>(
TryCatch<T> matchResult;
if (this.Succeeded)
{
try
{
matchResult = TryCatch<T>.FromResult(await onSuccess(this.either.FromRight(default)));
}
catch (Exception ex)
{
matchResult = TryCatch<T>.FromException(ex);
}
matchResult = TryCatch<T>.FromResult(await onSuccess(this.either.FromRight(default)));
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace Microsoft.Azure.Cosmos.Query
internal struct QueryResponseCore
{
private static readonly IReadOnlyList<CosmosElement> EmptyList = new List<CosmosElement>().AsReadOnly();
internal static readonly string EmptyGuidString = Guid.Empty.ToString();
internal static readonly IReadOnlyCollection<QueryPageDiagnostics> EmptyDiagnostics = new List<QueryPageDiagnostics>();

private QueryResponseCore(
Expand Down
65 changes: 65 additions & 0 deletions Microsoft.Azure.Cosmos/src/Query/Core/QueryResponseFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Query.Core
{
using System;

internal static class QueryResponseFactory
{
private static readonly string EmptyGuidString = Guid.Empty.ToString();

public static QueryResponseCore CreateFromException(Exception exception)
{
QueryResponseCore queryResponseCore;
if (exception is CosmosException cosmosException)
{
queryResponseCore = CreateFromCosmosException(cosmosException);
}
else if (exception is Microsoft.Azure.Documents.DocumentClientException documentClientException)
{
queryResponseCore = CreateFromDocumentClientException(documentClientException);
}
else
{
// Unknown exception type should become a 500
queryResponseCore = QueryResponseCore.CreateFailure(
statusCode: System.Net.HttpStatusCode.InternalServerError,
subStatusCodes: null,
errorMessage: exception.ToString(),
requestCharge: 0,
activityId: QueryResponseCore.EmptyGuidString,
diagnostics: QueryResponseCore.EmptyDiagnostics);
}

return queryResponseCore;
}

private static QueryResponseCore CreateFromCosmosException(CosmosException cosmosException)
{
QueryResponseCore queryResponseCore = QueryResponseCore.CreateFailure(
statusCode: cosmosException.StatusCode,
subStatusCodes: (Microsoft.Azure.Documents.SubStatusCodes)cosmosException.SubStatusCode,
errorMessage: cosmosException.Message,
requestCharge: 0,
activityId: cosmosException.ActivityId,
diagnostics: QueryResponseCore.EmptyDiagnostics);

return queryResponseCore;
}

private static QueryResponseCore CreateFromDocumentClientException(Microsoft.Azure.Documents.DocumentClientException documentClientException)
{
QueryResponseCore queryResponseCore = QueryResponseCore.CreateFailure(
statusCode: documentClientException.StatusCode.GetValueOrDefault(System.Net.HttpStatusCode.InternalServerError),
subStatusCodes: null,
errorMessage: documentClientException.Message,
requestCharge: 0,
activityId: documentClientException.ActivityId,
diagnostics: QueryResponseCore.EmptyDiagnostics);

return queryResponseCore;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace Microsoft.Azure.Cosmos.Query
/// </summary>
internal sealed class DefaultDocumentQueryExecutionContext : DocumentQueryExecutionContextBase
{
private const string InternalPartitionKeyDefinitionProperty = "x-ms-query-partitionkey-definition";

/// <summary>
/// Whether or not a continuation is expected.
/// </summary>
Expand Down Expand Up @@ -280,8 +282,9 @@ private static bool ServiceInteropAvailable()
{
FeedOptions feedOptions = this.GetFeedOptions(null);
PartitionKeyDefinition partitionKeyDefinition;
object partitionKeyDefinitionObject;
if (feedOptions.Properties != null && feedOptions.Properties.TryGetValue(CosmosQueryExecutionContextFactory.InternalPartitionKeyDefinitionProperty, out partitionKeyDefinitionObject))
if ((feedOptions.Properties != null) && feedOptions.Properties.TryGetValue(
DefaultDocumentQueryExecutionContext.InternalPartitionKeyDefinitionProperty,
out object partitionKeyDefinitionObject))
{
if (partitionKeyDefinitionObject is PartitionKeyDefinition definition)
{
Expand All @@ -299,7 +302,6 @@ private static bool ServiceInteropAvailable()
partitionKeyDefinition = collection.PartitionKey;
}

QueryInfo queryInfo;
providedRanges = PartitionRoutingHelper.GetProvidedPartitionKeyRanges(
(errorMessage) => new BadRequestException(errorMessage),
this.QuerySpec,
Expand All @@ -310,7 +312,7 @@ private static bool ServiceInteropAvailable()
partitionKeyDefinition,
queryPartitionProvider,
version,
out queryInfo);
out QueryInfo queryInfo);
}
else if (request.Properties != null && request.Properties.TryGetValue(
WFConstants.BackendHeaders.EffectivePartitionKeyString,
Expand Down Expand Up @@ -367,7 +369,7 @@ private async Task<DocumentServiceRequest> CreateRequestAsync()
INameValueCollection requestHeaders = await this.CreateCommonHeadersAsync(
this.GetFeedOptions(this.ContinuationToken));

requestHeaders[HttpConstants.HttpHeaders.IsContinuationExpected] = isContinuationExpected.ToString();
requestHeaders[HttpConstants.HttpHeaders.IsContinuationExpected] = this.isContinuationExpected.ToString();

return this.CreateDocumentServiceRequest(
requestHeaders,
Expand Down
Loading