Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query: Fixes TaskCanceledException being converted to InternalServerError and losing diagnostics #2424

Merged
merged 9 commits into from
Apr 23, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using System;
using System.Data;
using Microsoft.Azure.Cosmos.Tracing;

internal abstract class ChangeFeedException : Exception
{
Expand All @@ -24,6 +25,6 @@ protected ChangeFeedException(string message, Exception innerException)
{
}

public abstract TResult Accept<TResult>(ChangeFeedExceptionVisitor<TResult> visitor);
public abstract TResult Accept<TResult>(ChangeFeedExceptionVisitor<TResult> visitor, ITrace trace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using Microsoft.Azure.Cosmos.Tracing;

internal abstract class ChangeFeedExceptionVisitor<TResult>
{
internal abstract TResult Visit(MalformedChangeFeedContinuationTokenException malformedChangeFeedContinuationTokenException);
internal abstract TResult Visit(MalformedChangeFeedContinuationTokenException malformedChangeFeedContinuationTokenException, ITrace trace);
}
}
36 changes: 30 additions & 6 deletions Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedIteratorCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,26 +201,43 @@ public ChangeFeedIteratorCore(
public override async Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default)
{
return await this.clientContext.OperationHelperAsync("Change Feed Iterator Read Next Async",
requestOptions: null,
task: (trace) => this.ReadNextAsync(trace, cancellationToken),
requestOptions: this.changeFeedRequestOptions,
j82w marked this conversation as resolved.
Show resolved Hide resolved
task: (trace) => this.ReadNextInternalAsync(trace, cancellationToken),
traceComponent: TraceComponent.ChangeFeed,
traceLevel: TraceLevel.Info);
}

public override async Task<ResponseMessage> ReadNextAsync(ITrace trace, CancellationToken cancellationToken = default)
{
try
{
return await this.ReadNextInternalAsync(trace, cancellationToken);
}
catch (OperationCanceledException ex) when (!(ex is CosmosOperationCanceledException))
{
throw new CosmosOperationCanceledException(ex, trace);
}
}

private async Task<ResponseMessage> ReadNextInternalAsync(ITrace trace, CancellationToken cancellationToken = default)
{
if (trace == null)
{
throw new ArgumentNullException(nameof(trace));
}

cancellationToken.ThrowIfCancellationRequested();

TryCatch<CrossPartitionChangeFeedAsyncEnumerator> monadicEnumerator = await this.lazyMonadicEnumerator.GetValueAsync(trace, cancellationToken);
if (monadicEnumerator.Failed)
{
Exception createException = monadicEnumerator.Exception;
CosmosException cosmosException = ExceptionToCosmosException.CreateFromException(createException);
if (!ExceptionToCosmosException.TryCreateFromException(
createException,
trace,
out CosmosException cosmosException))
{
throw createException;
}

return new ResponseMessage(
cosmosException.StatusCode,
requestMessage: null,
Expand All @@ -237,7 +254,14 @@ public override async Task<ResponseMessage> ReadNextAsync(ITrace trace, Cancella

if (enumerator.Current.Failed)
{
CosmosException cosmosException = ExceptionToCosmosException.CreateFromException(enumerator.Current.Exception);
if (!ExceptionToCosmosException.TryCreateFromException(
enumerator.Current.Exception,
trace,
out CosmosException cosmosException))
{
throw enumerator.Current.Exception;
}

if (!IsRetriableException(cosmosException))
{
this.hasMoreResults = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using System;
using Microsoft.Azure.Cosmos.Tracing;

internal sealed class MalformedChangeFeedContinuationTokenException : ChangeFeedException
{
Expand All @@ -23,9 +24,9 @@ public MalformedChangeFeedContinuationTokenException(string message, Exception i
{
}

public override TResult Accept<TResult>(ChangeFeedExceptionVisitor<TResult> visitor)
public override TResult Accept<TResult>(ChangeFeedExceptionVisitor<TResult> visitor, ITrace trace)
{
return visitor.Visit(this);
return visitor.Visit(this, trace);
}
}
}
23 changes: 18 additions & 5 deletions Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace Microsoft.Azure.Cosmos.Handlers
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
Expand Down Expand Up @@ -189,13 +190,25 @@ public virtual async Task<ResponseMessage> SendAsync(
}
else if (feedRange is FeedRangeEpk feedRangeEpk)
{
DocumentServiceRequest serviceRequest = request.ToDocumentServiceRequest();
ContainerProperties collectionFromCache;
j82w marked this conversation as resolved.
Show resolved Hide resolved
try
{
if (cosmosContainerCore == null)
{
throw new ArgumentException($"The container core can not be null for FeedRangeEpk");
}

PartitionKeyRangeCache routingMapProvider = await this.client.DocumentClient.GetPartitionKeyRangeCacheAsync(childTrace);
CollectionCache collectionCache = await this.client.DocumentClient.GetCollectionCacheAsync(childTrace);
ContainerProperties collectionFromCache =
await collectionCache.ResolveCollectionAsync(serviceRequest, cancellationToken, childTrace);
collectionFromCache = await cosmosContainerCore.GetCachedContainerPropertiesAsync(
forceRefresh: false,
childTrace,
cancellationToken);
}
catch (CosmosException ex)
{
return ex.ToCosmosResponseMessage(request);
}

PartitionKeyRangeCache routingMapProvider = await this.client.DocumentClient.GetPartitionKeyRangeCacheAsync(childTrace);
IReadOnlyList<PartitionKeyRange> overlappingRanges = await routingMapProvider.TryGetOverlappingRangesAsync(
collectionFromCache.ResourceId,
feedRangeEpk.Range,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ public BufferedPartitionRangePageAsyncEnumerator(PartitionRangePageAsyncEnumerat

protected override async Task<TryCatch<TPage>> GetNextPageAsync(ITrace trace, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

if (trace == null)
{
throw new ArgumentNullException(nameof(trace));
Expand All @@ -44,8 +42,6 @@ protected override async Task<TryCatch<TPage>> GetNextPageAsync(ITrace trace, Ca

public async ValueTask PrefetchAsync(ITrace trace, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

if (trace == null)
{
throw new ArgumentNullException(nameof(trace));
Expand All @@ -62,5 +58,11 @@ public async ValueTask PrefetchAsync(ITrace trace, CancellationToken cancellatio
this.bufferedPage = this.enumerator.Current;
}
}

public override void SetCancellationToken(CancellationToken cancellationToken)
{
base.SetCancellationToken(cancellationToken);
this.enumerator.SetCancellationToken(cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
throw new ArgumentNullException(nameof(trace));
}

this.cancellationToken.ThrowIfCancellationRequested();

using (ITrace childTrace = trace.StartChild(name: nameof(MoveNextAsync), component: TraceComponent.Pagination, level: TraceLevel.Info))
{
IQueue<PartitionRangePageAsyncEnumerator<TPage, TState>> enumerators = await this.lazyEnumerators.GetValueAsync(
Expand All @@ -126,6 +124,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
}

PartitionRangePageAsyncEnumerator<TPage, TState> currentPaginator = enumerators.Dequeue();
currentPaginator.SetCancellationToken(this.cancellationToken);
if (!await currentPaginator.MoveNextAsync(childTrace))
{
// Current enumerator is empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,6 @@ public async Task<TryCatch<ReadFeedPage>> MonadicReadFeedAsync(
ITrace trace,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

readFeedPaginationOptions ??= ReadFeedPaginationOptions.Default;

ResponseMessage responseMessage = await this.container.ClientContext.ProcessResourceOperationStreamAsync(
Expand Down Expand Up @@ -239,8 +237,6 @@ public async Task<TryCatch<QueryPage>> MonadicQueryAsync(
ITrace trace,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

if (sqlQuerySpec == null)
{
throw new ArgumentNullException(nameof(sqlQuerySpec));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)

public abstract ValueTask DisposeAsync();

public void SetCancellationToken(CancellationToken cancellationToken)
public virtual void SetCancellationToken(CancellationToken cancellationToken)
{
this.cancellationToken = cancellationToken;
}
Expand Down
1 change: 0 additions & 1 deletion Microsoft.Azure.Cosmos/src/Query/Core/AsyncLazy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public async Task<T> GetValueAsync(ITrace trace, CancellationToken cancellationT
{
// Note that this class is not thread safe.
// if the valueFactory has side effects than this will have issues.
cancellationToken.ThrowIfCancellationRequested();
if (!this.ValueInitialized)
{
this.value = await this.valueFactory(trace, cancellationToken);
Expand Down
Loading