Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Feed Processor: Adds detailed delegate context , stream, and manual checkpointing support #2650

Merged
merged 4 commits into from
Aug 18, 2021

Conversation

ealsur
Copy link
Member

@ealsur ealsur commented Aug 4, 2021

After 4 months on preview (#2331) this PR moves to GA the new delegates and stream support for Change Feed Processor.

New delegates

We previously had a single delegate to process changes:

public delegate Task ChangesHandler<T>(
	IReadOnlyCollection<T> changes,
	CancellationToken cancellationToken);

In order to address the requests in the linked incidents, we are introducing 4 new delegates:

// Adds the context, similar to the existing handler
public delegate Task ChangeFeedHandler<T>(
	ChangeFeedProcessorContext context,
	IReadOnlyCollection<T> changes,
	CancellationToken cancellationToken);

// Adds support for manual checkpoint on the typed API
public delegate Task ChangeFeedHandlerWithManualCheckpoint<T>(
	ChangeFeedProcessorContext context,
	IReadOnlyCollection<T> changes,
	Func<Task> checkpointAsync,
	CancellationToken cancellationToken);

// Adds support for stream handling
public delegate Task ChangeFeedStreamHandler(
	ChangeFeedProcessorContext context,
	Stream changes,
	CancellationToken cancellationToken);

// Adds supports for manual checkpointing with stream handling
public delegate Task ChangeFeedStreamHandlerWithManualCheckpoint(
	ChangeFeedProcessorContext context,
	Stream changes,
	Func<Task> checkpointAsync,
	CancellationToken cancellationToken);

Container API changes

public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder<T>(
	string processorName,
	ChangeFeedHandler<T> onChangesDelegate);

public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManualCheckpoint<T>(
	string processorName,
	ChangeFeedHandlerWithManualCheckpoint<T> onChangesDelegate);

public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder(
	string processorName,
	ChangeFeedStreamHandler onChangesDelegate);

public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManualCheckpoint(
	string processorName,
	ChangeFeedStreamHandlerWithManualCheckpoint onChangesDelegate);

Stream support

Stream APIs allow customers to send the data to other components without serializing it on the Change Feed Processor, this is possible through 2 of the new delegates, the most common one that will handle checkpointing for the user:

public delegate Task ChangeFeedStreamHandler(
	ChangeFeedProcessorContext context,
	Stream changes,
	CancellationToken cancellationToken);

And one where the use can manage checkpoint themselves:

public delegate Task ChangeFeedStreamHandlerWithManualCheckpoint(
	ChangeFeedProcessorContext context,
	Stream changes,
	Func<Task> checkpointAsync,
	CancellationToken cancellationToken);

How to leverage manual checkpoint?

There isn't any extra configuration required on the ChangeFeedProcessorBuilder other than using the delegate that exposes the checkpoint logic. Choosing this delegate is an implicit opt-in on the logic.

This enables users to decide when to checkpoint the progress of a lease. Some users decide to do buffering of changes and once the buffer commits (processes all the changes), then they decide to checkpoint.

Any of the 2 delegates that expose the checkpoint mechanism allows for this, via the call to checkpointAsync, like so:

ChangeFeedProcessor processor = container
	.GetChangeFeedProcessorBuilderWithManualCheckpoint("myProcessorName", 
		async (ChangeFeedProcessorContext context, IReadOnlyCollection<T> changes, Func<Task> checkpointAsync, CancellationToken token) =>
	{
		// manage and process the changes

		// on some condition, trigger the checkpoint
               if (someConditionIsMet)
               {
		    await checkpointAsync();
               }
	})
	.WithInstanceName("<instance name>")
	.WithLeaseContainer(leaseContainer).Build();

What is there in the context?

    public abstract class ChangeFeedProcessorContext
    {
        /// <summary>
        /// Gets the token representative of the current lease from which the changes come from.
        /// </summary>
        public abstract string LeaseToken { get; }

        /// <summary>
        /// Gets the diagnostics related to the service response.
        /// </summary>
        public abstract CosmosDiagnostics Diagnostics { get; }

        /// <summary>
        /// Gets the headers related to the service response that provided the changes.
        /// </summary>
        public abstract Headers Headers { get; }
    }

The context includes information related to changes delivered on the delegate, it includes the identifier of which lease they belong to (this is useful paired with the Container.ChangeFeedEstimator for monitoring). the Headers (that expose RU consumption and SessionToken for extending the session to other components), and the Diagnostics.

Type of change

  • New feature (non-breaking change which adds functionality)

cc @bartelink

/// <param name="changes">The changes that happened.</param>
/// <param name="cancellationToken">A cancellation token representing the current cancellation status of the <see cref="ChangeFeedProcessor"/> instance.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation that is going to be done with the changes.</returns>
public delegate Task ChangeFeedHandler<T>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In v4 SDK changefeed should be put under it's own object like Scripts to keep all the change feed APIs together while reducing the container object bloating.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsure, scripts namespace is meant for server side scripting (SPROC, Trigger, CRUD etc...).
Sure lets track it for discussion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm meant have a container.ChangeFeed.GetProcessorBuilderWithManualCheckpoint. Then have all the methods and delegates off of the container.ChangeFeed object. That way all the changefeed methods are grouped together.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, grouping could allow them to live in a ChangeFeed namespace maybe

@ealsur ealsur merged commit ad418d5 into master Aug 18, 2021
@ealsur ealsur deleted the users/ealsur/manualga branch August 18, 2021 22:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants