Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Feed: Fixes exceptions generating "Change Feed should always have a next page" #2474

Merged
merged 18 commits into from
May 19, 2021
15 changes: 12 additions & 3 deletions Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedIteratorCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ private async Task<ResponseMessage> ReadNextInternalAsync(ITrace trace, Cancella
trace,
out CosmosException cosmosException))
{
// Initialization issue, there are no enumerators to invoke
this.hasMoreResults = false;
ealsur marked this conversation as resolved.
Show resolved Hide resolved
throw createException;
}

Expand All @@ -247,9 +249,16 @@ private async Task<ResponseMessage> ReadNextInternalAsync(ITrace trace, Cancella
}

CrossPartitionChangeFeedAsyncEnumerator enumerator = monadicEnumerator.Result;
if (!await enumerator.MoveNextAsync(trace))
try
{
if (!await enumerator.MoveNextAsync(trace))
{
throw new InvalidOperationException("ChangeFeed enumerator should always have a next continuation");
}
}
catch (OperationCanceledException ex) when (!(ex is CosmosOperationCanceledException))
{
throw new InvalidOperationException("ChangeFeed enumerator should always have a next continuation");
throw new CosmosOperationCanceledException(ex, trace);
}

if (enumerator.Current.Failed)
Expand All @@ -259,7 +268,7 @@ private async Task<ResponseMessage> ReadNextInternalAsync(ITrace trace, Cancella
trace,
out CosmosException cosmosException))
{
throw enumerator.Current.Exception;
throw ExceptionWithStackTraceException.UnWrapMonadExcepion(enumerator.Current.Exception, trace);
}

if (!IsRetriableException(cosmosException))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,19 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)

PartitionRangePageAsyncEnumerator<TPage, TState> currentPaginator = enumerators.Dequeue();
currentPaginator.SetCancellationToken(this.cancellationToken);
if (!await currentPaginator.MoveNextAsync(childTrace))
bool moveNextResult = false;
try
{
moveNextResult = await currentPaginator.MoveNextAsync(childTrace);
}
catch
{
// Re-queue the enumerator to avoid emptying the queue
enumerators.Enqueue(currentPaginator);
throw;
}

if (!moveNextResult)
{
// Current enumerator is empty,
// so recursively retry on the next enumerator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Monads
{
using System;
using System.Diagnostics;
using Microsoft.Azure.Cosmos.Tracing;

internal sealed class ExceptionWithStackTraceException : Exception
{
Expand Down Expand Up @@ -75,5 +76,23 @@ private string GetClassName()
{
return this.GetType().ToString();
}

public static Exception UnWrapMonadExcepion(
Exception exception,
ITrace trace)
{
if (exception is ExceptionWithStackTraceException exceptionWithStackTrace)
{
return ExceptionWithStackTraceException.UnWrapMonadExcepion(exceptionWithStackTrace.InnerException, trace);
}

if (!(exception is CosmosOperationCanceledException)
&& exception is OperationCanceledException operationCanceledException)
{
return new CosmosOperationCanceledException(operationCanceledException, trace);
}

return exception;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,52 @@ public async Task ShouldSkipNotModifiedAndReturnResults()
It.IsAny<CancellationToken>()), Times.Never);
}

[TestMethod]
public async Task ShouldReturnTryCatchOnException()
{
ReadOnlyMemory<FeedRangeState<ChangeFeedState>> rangeStates = new FeedRangeState<ChangeFeedState>[]{
new FeedRangeState<ChangeFeedState>(FeedRangeEpk.FullRange, ChangeFeedState.Now())
};

CrossFeedRangeState<ChangeFeedState> state = new CrossFeedRangeState<ChangeFeedState>(rangeStates);
Mock<IDocumentContainer> documentContainer = new Mock<IDocumentContainer>();

Exception exception = new Exception("oh no");

// Throws unhandled exception
documentContainer.Setup(c => c.MonadicChangeFeedAsync(
It.IsAny<FeedRangeState<ChangeFeedState>>(),
It.IsAny<ChangeFeedPaginationOptions>(),
It.IsAny<ITrace>(),
It.IsAny<CancellationToken>())).ThrowsAsync(exception);
CrossPartitionChangeFeedAsyncEnumerator enumerator = CrossPartitionChangeFeedAsyncEnumerator.Create(
documentContainer.Object,
state,
ChangeFeedPaginationOptions.Default,
cancellationToken: default);

try
{
await enumerator.MoveNextAsync(NoOpTrace.Singleton);
Assert.Fail("Should have thrown");
}
catch (Exception caughtException)
{
Assert.AreEqual(exception, caughtException);
}

// Should be able to read MoveNextAsync again
try
{
await enumerator.MoveNextAsync(NoOpTrace.Singleton);
Assert.Fail("Should have thrown");
}
catch (Exception caughtException)
{
Assert.AreEqual(exception, caughtException);
}
}

private static async Task<(int, double)> DrainUntilNotModifedAsync(CrossPartitionChangeFeedAsyncEnumerator enumerator)
{
int globalCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace Microsoft.Azure.Cosmos.Tests.FeedRange
using Microsoft.Azure.Cosmos.Json.Interop;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
using Microsoft.Azure.Cosmos.Tests.Pagination;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
Expand Down Expand Up @@ -208,6 +209,163 @@ public async Task ChangeFeedIteratorCore_HandlesSplitsThroughPipeline()
Assert.AreEqual(numItems, count, seed);
}

[TestMethod]
public async Task ChangeFeedIteratorCore_OnCosmosException_HasMoreResults()
{
CosmosException exception = CosmosExceptionFactory.CreateInternalServerErrorException("something's broken", new Headers());
IDocumentContainer documentContainer = await CreateDocumentContainerAsync(
numItems:0,
failureConfigs: new FlakyDocumentContainer.FailureConfigs(
inject429s: false,
injectEmptyPages: false,
returnFailure: exception));

ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(
documentContainer,
ChangeFeedMode.Incremental,
new ChangeFeedRequestOptions(),
ChangeFeedStartFrom.Now(),
this.MockClientContext());

ResponseMessage responseMessage = await changeFeedIteratorCore.ReadNextAsync();
Assert.AreEqual(HttpStatusCode.InternalServerError, responseMessage.StatusCode);
Assert.IsFalse(changeFeedIteratorCore.HasMoreResults);
}

[TestMethod]
public async Task ChangeFeedIteratorCore_OnRetriableCosmosException_HasMoreResults()
{
CosmosException exception = CosmosExceptionFactory.CreateThrottledException("retry", new Headers());
IDocumentContainer documentContainer = await CreateDocumentContainerAsync(
numItems: 0,
failureConfigs: new FlakyDocumentContainer.FailureConfigs(
inject429s: false,
injectEmptyPages: false,
returnFailure: exception));

ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(
documentContainer,
ChangeFeedMode.Incremental,
new ChangeFeedRequestOptions(),
ChangeFeedStartFrom.Beginning(),
this.MockClientContext());

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Cancel();
ResponseMessage responseMessage = await changeFeedIteratorCore.ReadNextAsync();
Assert.AreEqual(HttpStatusCode.TooManyRequests, responseMessage.StatusCode);
Assert.IsTrue(changeFeedIteratorCore.HasMoreResults);
}

[TestMethod]
public async Task ChangeFeedIteratorCore_OnNonCosmosExceptions_HasMoreResults()
{
Exception exception = new NotImplementedException();
IDocumentContainer documentContainer = await CreateDocumentContainerAsync(
numItems: 0,
failureConfigs: new FlakyDocumentContainer.FailureConfigs(
inject429s: false,
injectEmptyPages: false,
returnFailure: exception));

ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(
documentContainer,
ChangeFeedMode.Incremental,
new ChangeFeedRequestOptions(),
ChangeFeedStartFrom.Beginning(),
this.MockClientContext());

try
{
ResponseMessage responseMessage = await changeFeedIteratorCore.ReadNextAsync();
Assert.Fail("Should have thrown");
}
catch (Exception ex)
{
Assert.AreEqual(exception, ex);
Assert.IsTrue(changeFeedIteratorCore.HasMoreResults);
}
}

[TestMethod]
public async Task ChangeFeedIteratorCore_OnTaskCanceledException_HasMoreResultsAndDiagnostics()
{
Exception exception = new TaskCanceledException();
IDocumentContainer documentContainer = await CreateDocumentContainerAsync(
numItems: 0,
failureConfigs: new FlakyDocumentContainer.FailureConfigs(
inject429s: false,
injectEmptyPages: false,
throwException: exception));

ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(
documentContainer,
ChangeFeedMode.Incremental,
new ChangeFeedRequestOptions(),
ChangeFeedStartFrom.Beginning(),
this.MockClientContext());

try
{
ResponseMessage responseMessage = await changeFeedIteratorCore.ReadNextAsync();
Assert.Fail("Should have thrown");
}
catch (OperationCanceledException ex)
{
Assert.IsTrue(ex is CosmosOperationCanceledException);
Assert.IsNotNull(((CosmosOperationCanceledException)ex).Diagnostics);
Assert.IsTrue(changeFeedIteratorCore.HasMoreResults);
}
}

/// <summary>
/// If an unhandled exception occurs within the NetworkAttachedDocumentContainer, the exception is transmitted but it does not break the enumerators
/// </summary>
[TestMethod]
public async Task ChangeFeedIteratorCore_OnUnhandledException_HasMoreResults()
{
Exception exception = new Exception("oh no");
IDocumentContainer documentContainer = await CreateDocumentContainerAsync(
numItems: 0,
failureConfigs: new FlakyDocumentContainer.FailureConfigs(
inject429s: false,
injectEmptyPages: false,
throwException: exception));

ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(
documentContainer,
ChangeFeedMode.Incremental,
new ChangeFeedRequestOptions(),
ChangeFeedStartFrom.Beginning(),
this.MockClientContext());

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Cancel();
try
{
ResponseMessage responseMessage = await changeFeedIteratorCore.ReadNextAsync();
Assert.Fail("Should have thrown");
}
catch (Exception ex)
{
Assert.AreEqual(exception, ex);
Assert.IsTrue(changeFeedIteratorCore.HasMoreResults);
}

// If read a second time, it should not throw any missing page errors related to enumerators
try
{
ResponseMessage responseMessage = await changeFeedIteratorCore.ReadNextAsync();
Assert.Fail("Should have thrown");
}
catch (Exception ex)
{
// TryCatch wraps any exception
Assert.AreEqual(exception, ex);
Assert.IsTrue(changeFeedIteratorCore.HasMoreResults);
}
}

private static CosmosArray GetChanges(Stream stream)
{
using (MemoryStream memoryStream = new MemoryStream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,16 @@ public Task<TryCatch<ChangeFeedPage>> MonadicChangeFeedAsync(
state: feedRangeState.State)));
}

if (this.ShouldThrowException(out Exception exception))
{
throw exception;
}

if (this.ShouldReturnFailure(out Exception failure))
{
return Task.FromResult(TryCatch<ChangeFeedPage>.FromException(failure));
}

return this.documentContainer.MonadicChangeFeedAsync(
feedRangeState,
changeFeedPaginationOptions,
Expand Down Expand Up @@ -253,17 +263,39 @@ private bool ShouldReturnEmptyPage() => (this.failureConfigs != null)
&& this.failureConfigs.InjectEmptyPages
&& ((this.random.Next() % 2) == 0);

private bool ShouldThrowException(out Exception exception)
{
exception = this.failureConfigs.ThrowException;
return this.failureConfigs != null && this.failureConfigs.ThrowException != null;
}

private bool ShouldReturnFailure(out Exception exception)
{
exception = this.failureConfigs.ReturnFailure;
return this.failureConfigs != null && this.failureConfigs.ReturnFailure != null;
}

public sealed class FailureConfigs
{
public FailureConfigs(bool inject429s, bool injectEmptyPages)
public FailureConfigs(
bool inject429s,
bool injectEmptyPages,
Exception throwException = null,
Exception returnFailure = null)
{
this.Inject429s = inject429s;
this.InjectEmptyPages = injectEmptyPages;
this.ThrowException = throwException;
this.ReturnFailure = returnFailure;
}

public bool Inject429s { get; }

public bool InjectEmptyPages { get; }

public Exception ThrowException { get; }

public Exception ReturnFailure { get; }
}
}
}