Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Preview] Change Feed Processor: Refactors manual checkpoint API #2488

Merged
merged 21 commits into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,12 @@ public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManu
async (
ChangeFeedProcessorContext context,
IReadOnlyCollection<JObject> documents,
#if SDKPROJECTREF
Func<Task> tryCheckpointAsync,
#else
// Remove on next release
Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
#endif
CancellationToken cancellationToken) =>
{
List<T> decryptItems = await this.DecryptChangeFeedDocumentsAsync<T>(
Expand Down Expand Up @@ -973,7 +978,12 @@ public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManu
async (
ChangeFeedProcessorContext context,
Stream changes,
#if SDKPROJECTREF
Func<Task> tryCheckpointAsync,
#else
// Remove on next release
Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
#endif
CancellationToken cancellationToken) =>
{
Stream decryptedChanges = await EncryptionProcessor.DeserializeAndDecryptResponseAsync(
Expand Down
10 changes: 10 additions & 0 deletions Microsoft.Azure.Cosmos.Encryption/src/EncryptionContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,12 @@ public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManu
async (
ChangeFeedProcessorContext context,
IReadOnlyCollection<JObject> documents,
#if SDKPROJECTREF
Func<Task> tryCheckpointAsync,
#else
// Remove on next release
Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
#endif
CancellationToken cancellationToken) =>
{
List<T> decryptedItems = await this.DecryptChangeFeedDocumentsAsync<T>(
Expand Down Expand Up @@ -835,7 +840,12 @@ public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManu
async (
ChangeFeedProcessorContext context,
Stream changes,
#if SDKPROJECTREF
Func<Task> tryCheckpointAsync,
#else
// Remove on next release
Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
#endif
CancellationToken cancellationToken) =>
{
EncryptionSettings encryptionSettings = await this.GetOrUpdateEncryptionSettingsFromCacheAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,7 @@
<AssemblyOriginatorKeyFile>..\..\35MSSharedLib1024.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>

<PropertyGroup Condition=" '$(SdkProjectRef)' == 'True' ">
<DefineConstants>$(DefineConstants);SDKPROJECTREF</DefineConstants>
</PropertyGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,12 @@ private async Task ValidateChangeFeedProcessorWithManualCheckpointResponse(
(
ChangeFeedProcessorContext context,
IReadOnlyCollection<TestDoc> changes,
#if SDKPROJECTREF
Func<Task> tryCheckpointAsync,
#else
// Remove on next release
Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
#endif
CancellationToken cancellationToken) =>
{
changeFeedReturnedDocs.AddRange(changes);
Expand Down Expand Up @@ -1375,7 +1380,12 @@ private async Task ValidateChangeFeedProcessorStreamWithManualCheckpointResponse
(
ChangeFeedProcessorContext context,
Stream changes,
#if SDKPROJECTREF
Func<Task> tryCheckpointAsync,
#else
// Remove on next release
Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
#endif
CancellationToken cancellationToken) =>
{
string changeFeed = string.Empty;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2042,7 +2042,12 @@ private async Task ValidateChangeFeedProcessorWithManualCheckpointResponse(
(
ChangeFeedProcessorContext context,
IReadOnlyCollection<TestDoc> changes,
#if SDKPROJECTREF
Func<Task> tryCheckpointAsync,
#else
// Remove on next release
Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
#endif
CancellationToken cancellationToken) =>
{
changeFeedReturnedDocs.AddRange(changes);
Expand Down Expand Up @@ -2146,7 +2151,12 @@ private async Task ValidateChangeFeedProcessorStreamWithManualCheckpointResponse
(
ChangeFeedProcessorContext context,
Stream changes,
#if SDKPROJECTREF
Func<Task> tryCheckpointAsync,
#else
// Remove on next release
Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
#endif
CancellationToken cancellationToken) =>
{
string changeFeed = string.Empty;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,8 @@
<PlatformTarget>x64</PlatformTarget>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<PropertyGroup Condition=" '$(SdkProjectRef)' == 'True' ">
<DefineConstants>$(DefineConstants);SDKPROJECTREF</DefineConstants>
</PropertyGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,7 @@ public override async Task ProcessChangesAsync(
{
await this.observer.ProcessChangesAsync(context, stream, cancellationToken).ConfigureAwait(false);

(bool isSuccess, Exception exception) = await context.TryCheckpointAsync();
if (!isSuccess)
{
throw exception;
}
await context.CheckpointAsync();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,11 @@ internal ChangeFeedObserverContextCore(

public Headers Headers => this.responseMessage.Headers;

public async Task<(bool isSuccess, Exception error)> TryCheckpointAsync()
public async Task CheckpointAsync()
{
try
{
await this.checkpointer.CheckpointPartitionAsync(this.responseMessage.Headers.ContinuationToken);
return (isSuccess: true, error: null);
ealsur marked this conversation as resolved.
Show resolved Hide resolved
}
catch (LeaseLostException leaseLostException)
{
Expand All @@ -55,11 +54,7 @@ internal ChangeFeedObserverContextCore(
trace: NoOpTrace.Singleton,
error: null,
innerException: leaseLostException);
return (isSuccess: false, error: cosmosException);
}
catch (Exception exception)
{
return (isSuccess: false, error: exception);
throw cosmosException;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private Task ChangesStreamHandlerAsync(
return this.onChanges(context, stream, cancellationToken);
}

return this.onChangesWithManualCheckpoint(context, stream, context.TryCheckpointAsync, cancellationToken);
return this.onChangesWithManualCheckpoint(context, stream, context.CheckpointAsync, cancellationToken);
}
}

Expand Down Expand Up @@ -108,7 +108,7 @@ private Task ChangesStreamHandlerAsync(
return this.onChanges(context, changes, cancellationToken);
}

return this.onChangesWithManualCheckpoint(context, changes, context.TryCheckpointAsync, cancellationToken);
return this.onChangesWithManualCheckpoint(context, changes, context.CheckpointAsync, cancellationToken);
}

private IReadOnlyCollection<T> AsIReadOnlyCollection(Stream stream)
Expand Down
26 changes: 8 additions & 18 deletions Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1482,31 +1482,26 @@ public delegate Task ChangeFeedHandler<T>(
/// </summary>
/// <param name="context">The context related to the changes.</param>
/// <param name="changes">The changes that happened.</param>
/// <param name="tryCheckpointAsync">A task representing an asynchronous checkpoint on the progress of a lease.</param>
/// <param name="checkpointAsync">A task representing an asynchronous checkpoint on the progress of a lease.</param>
/// <param name="cancellationToken">A cancellation token representing the current cancellation status of the <see cref="ChangeFeedProcessor"/> instance.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation that is going to be done with the changes.</returns>
/// <example>
/// <code language="c#">
/// <![CDATA[
/// (ChangeFeedProcessorContext context, IReadOnlyCollection<T> changes, Func<Task<(bool isSuccess, CosmosException error)>> tryCheckpointAsync, CancellationToken cancellationToken) =>
/// (ChangeFeedProcessorContext context, IReadOnlyCollection<T> changes, Func<Task> checkpointAsync, CancellationToken cancellationToken) =>
/// {
/// // consume changes
///
/// // On certain condition, we can checkpoint
/// (bool isSuccess, Exception error) checkpointResult = await tryCheckpointAsync();
/// if (!isSuccess)
/// {
/// // log error, could not checkpoint
/// throw error;
/// }
/// await checkpointAsync();
/// }
/// ]]>
/// </code>
/// </example>
public delegate Task ChangeFeedHandlerWithManualCheckpoint<T>(
ChangeFeedProcessorContext context,
IReadOnlyCollection<T> changes,
Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
Func<Task> checkpointAsync,
CancellationToken cancellationToken);

/// <summary>
Expand All @@ -1526,31 +1521,26 @@ public delegate Task ChangeFeedStreamHandler(
/// </summary>
/// <param name="context">The context related to the changes.</param>
/// <param name="changes">The changes that happened.</param>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be something a little more obvious like changesPayload?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, in this case, it is actually the list (IReadOnlyCollection<T>) of changes that happened. Those changes in the collection could be inserts or updates, but they are still changes. Not sure how the word payload makes it more clear?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the current API that is GA uses changes as name, so I rather keep the same as we have no feedback that indicates the name brings confusion?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then it's fine. It just seemed a little off that a stream would be called changes.

/// <param name="tryCheckpointAsync">A task representing an asynchronous checkpoint on the progress of a lease.</param>
/// <param name="checkpointAsync">A task representing an asynchronous checkpoint on the progress of a lease.</param>
/// <param name="cancellationToken">A cancellation token representing the current cancellation status of the <see cref="ChangeFeedProcessor"/> instance.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation that is going to be done with the changes.</returns>
/// <example>
/// <code language="c#">
/// <![CDATA[
/// (ChangeFeedProcessorContext context, Stream stream, Func<Task<(bool isSuccess, CosmosException error)>> tryCheckpointAsync, CancellationToken cancellationToken) =>
/// (ChangeFeedProcessorContext context, Stream stream, Func<Task> checkpointAsync, CancellationToken cancellationToken) =>
/// {
/// // consume stream
///
/// // On certain condition, we can checkpoint
/// (bool isSuccess, Exception error) checkpointResult = await tryCheckpointAsync();
/// if (!isSuccess)
/// {
/// // log error, could not checkpoint
/// throw error;
/// }
/// await checkpointAsync();
/// }
/// ]]>
/// </code>
/// </example>
public delegate Task ChangeFeedStreamHandlerWithManualCheckpoint(
ChangeFeedProcessorContext context,
Stream changes,
Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
ealsur marked this conversation as resolved.
Show resolved Hide resolved
Func<Task> checkpointAsync,
CancellationToken cancellationToken);

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public delegate Task ChangeFeedHandler<T>(
public delegate Task ChangeFeedHandlerWithManualCheckpoint<T>(
ChangeFeedProcessorContext context,
IReadOnlyCollection<T> changes,
Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
Func<Task> checkpointAsync,
CancellationToken cancellationToken);

public delegate Task ChangeFeedStreamHandler(
Expand All @@ -194,7 +194,7 @@ public delegate Task ChangeFeedStreamHandler(
public delegate Task ChangeFeedStreamHandlerWithManualCheckpoint(
ChangeFeedProcessorContext context,
Stream changes,
Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
Func<Task> checkpointAsync,
CancellationToken cancellationToken);

public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder<T>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public async Task TestWithRunningProcessor_WithManualCheckpoint()
int processedDocCount = 0;
string accumulator = string.Empty;
ChangeFeedProcessor processor = this.Container
.GetChangeFeedProcessorBuilderWithManualCheckpoint("test", async (ChangeFeedProcessorContext context, Stream stream, Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync, CancellationToken token) =>
.GetChangeFeedProcessorBuilderWithManualCheckpoint("test", async (ChangeFeedProcessorContext context, Stream stream, Func<Task> checkpointAsync, CancellationToken token) =>
{
this.ValidateContext(context);

Expand All @@ -113,9 +113,7 @@ public async Task TestWithRunningProcessor_WithManualCheckpoint()
if (processedDocCount == 1)
{
// Checkpointing on the first document to be able to have a point to rollback to
(bool isSuccess, Exception exception) = await tryCheckpointAsync();
Assert.IsTrue(isSuccess);
Assert.IsNull(exception);
await checkpointAsync();
}

if (processedDocCount == 12)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public async Task TestWithRunningProcessor_WithManualCheckpoint()
int processedDocCount = 0;
string accumulator = string.Empty;
ChangeFeedProcessor processor = this.Container
.GetChangeFeedProcessorBuilderWithManualCheckpoint("test", async (ChangeFeedProcessorContext context, IReadOnlyCollection<dynamic> docs, Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync, CancellationToken token) =>
.GetChangeFeedProcessorBuilderWithManualCheckpoint("test", async (ChangeFeedProcessorContext context, IReadOnlyCollection<dynamic> docs, Func<Task> checkpointAsync, CancellationToken token) =>
{
this.ValidateContext(context);
processedDocCount += docs.Count();
Expand All @@ -106,9 +106,7 @@ public async Task TestWithRunningProcessor_WithManualCheckpoint()

if (processedDocCount == 1) {
// Checkpointing on the first document to be able to have a point to rollback to
(bool isSuccess, Exception exception) = await tryCheckpointAsync();
Assert.IsTrue(isSuccess);
Assert.IsNull(exception);
await checkpointAsync();
}

if (processedDocCount == 12)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ public async Task TryCheckpoint_OnSuccess()
checkpointer.Setup(c => c.CheckpointPartitionAsync(It.Is<string>(s => s == continuation))).Returns(Task.CompletedTask);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object);

(bool isSuccess, Exception exception) = await changeFeedObserverContextCore.TryCheckpointAsync();
Assert.IsTrue(isSuccess);
Assert.IsNull(exception);
await changeFeedObserverContextCore.CheckpointAsync();
}

[TestMethod]
Expand All @@ -58,10 +56,8 @@ public async Task TryCheckpoint_OnLeaseLost()
checkpointer.Setup(c => c.CheckpointPartitionAsync(It.Is<string>(s => s == continuation))).ThrowsAsync(new LeaseLostException());
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object);

(bool isSuccess, Exception exception) = await changeFeedObserverContextCore.TryCheckpointAsync();
Assert.IsFalse(isSuccess);
Assert.IsNotNull(exception);
Assert.AreEqual(HttpStatusCode.PreconditionFailed, (exception as CosmosException).StatusCode);
CosmosException exception = await Assert.ThrowsExceptionAsync<CosmosException>(() => changeFeedObserverContextCore.CheckpointAsync());
Assert.AreEqual(HttpStatusCode.PreconditionFailed, exception.StatusCode);
}

[TestMethod]
Expand All @@ -76,9 +72,7 @@ public async Task TryCheckpoint_OnCosmosException()
checkpointer.Setup(c => c.CheckpointPartitionAsync(It.Is<string>(s => s == continuation))).ThrowsAsync(cosmosException);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object);

(bool isSuccess, Exception exception) = await changeFeedObserverContextCore.TryCheckpointAsync();
Assert.IsFalse(isSuccess);
Assert.IsNotNull(exception);
CosmosException exception = await Assert.ThrowsExceptionAsync<CosmosException>(() => changeFeedObserverContextCore.CheckpointAsync());
Assert.ReferenceEquals(cosmosException, exception);
}

Expand All @@ -94,9 +88,7 @@ public async Task TryCheckpoint_OnUnknownException()
checkpointer.Setup(c => c.CheckpointPartitionAsync(It.Is<string>(s => s == continuation))).ThrowsAsync(cosmosException);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object);

(bool isSuccess, Exception exception) = await changeFeedObserverContextCore.TryCheckpointAsync();
Assert.IsFalse(isSuccess);
Assert.IsNotNull(exception);
NotImplementedException exception = await Assert.ThrowsExceptionAsync<NotImplementedException>(() => changeFeedObserverContextCore.CheckpointAsync());
Assert.ReferenceEquals(cosmosException, exception);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ Task changesHandler(ChangeFeedProcessorContext context, IReadOnlyCollection<dyna
public async Task WhenDelegateIsTyped_Manual()
{
bool executed = false;
Task changesHandler(ChangeFeedProcessorContext context, IReadOnlyCollection<dynamic> docs, Func<Task<(bool isSuccess, Exception exception)>> tryCheckpointAsync, CancellationToken token)
Task changesHandler(ChangeFeedProcessorContext context, IReadOnlyCollection<dynamic> docs, Func<Task> checkpointAsync, CancellationToken token)
{
Assert.AreEqual(1, docs.Count);
Assert.AreEqual("Test", docs.First().id.ToString());
Expand Down Expand Up @@ -128,7 +128,7 @@ public async Task WhenDelegateIsStream_Manual()
{
ResponseMessage responseMessage = this.BuildResponseMessage();
bool executed = false;
Task changesHandler(ChangeFeedProcessorContext context, Stream stream, Func<Task<(bool isSuccess, Exception exception)>> tryCheckpointAsync, CancellationToken token)
Task changesHandler(ChangeFeedProcessorContext context, Stream stream, Func<Task> checkpointAsync, CancellationToken token)
{
Assert.ReferenceEquals(responseMessage.Content, stream);
executed = true;
Expand Down
Loading