Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Preview] Change Feed Processor: Adds notification APIs #2613

Merged
merged 28 commits into from
Aug 25, 2021

Conversation

ealsur
Copy link
Member

@ealsur ealsur commented Jul 14, 2021

Pull Request Template

Description

This PR enables hooking to events that are happening inside the Change Feed Processor and cover scenarios where there might be internal errors processing the feed or unhandled user errors on the delegate.

It also exposes an exception type to reflect user-related processing errors:

public class ChangeFeedProcessorUserException : Exception
{
    // This contains the Headers and Diagnostics of the payload that generated the exception
    public ChangeFeedProcessorContext ExceptionContext { get; }
}
string instanceName = "my-machine-name";

var notifyErrorAsync = (string LeaseToken, Exception exception) =>
    {
        if (exception is ChangeFeedProcessorUserException userException)
        {
             Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception {userException.InnerException}");
             Console.WriteLine($"Diagnostics {userException.ExceptionContext.Diagnostics}");
             Console.WriteLine($"Headers {userException.ExceptionContext.Headers}");
        }
       else
       {
           // This could be matched for CosmosException to decide importance based in our guidelines
            Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
       }
     } ;

var acquireAsync = (string LeaseToken) =>
	{
		Console.WriteLine($"Lease {LeaseToken} has been acquired by {instanceName}");
                // Initialize any buffering required if needed
	};

var releaseAsync = (string LeaseToken) =>
	{
		Console.WriteLine($"Lease {LeaseToken} has been released on {instanceName} ");
                // Flush any buffering required if needed
	};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
	.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", HandleChangesAsync)
		.WithInstanceName(instanceName)
		.WithErrorNotification(notifyErrorAsync)
                .WithLeaseAcquireNotification(acquireAsync)
                .WithLeaseReleaseNotification(releaseAsync)
		.WithLeaseContainer(leaseContainer)
		.Build();

This way, users can optionally subscribe and be notified on unhandled errors happening inside their delegate code or errors that might be happening during the internal Change Feed consumption.

Some errors are recoverable by the Change Feed Processor (see https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed-processor#error-handling), like timeouts, or transient processing errors (an error that occurred within the delegate that might be transient) and the Processor will retry the batch of changes again.

Lease acquire and release reflect events associated with the Change Feed Processor life cycle https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed-processor#processing-life-cycle that some users might be able to leverage in certain scenarios.

Notes on the changes

The code already contained an archaic version of the monitor based on CFP V2 files, part of this PR includes the removal of those unused files and repurposing of others, and making sure that Diagnostics are correctly wired in cases where there are CosmosExceptions tied to the consumption of the change feed.

Type of change

  • New feature (non-breaking change which adds functionality)

Closing issues

Closes #2501
Closes #1780

milismsft
milismsft previously approved these changes Aug 6, 2021
@kirankumarkolli
Copy link
Member

From API perspective, one more alternative is to have a single unified notification API and the context will be used to dis-ambiguate/reason on causes. The API surface, discoverability are simple, @ealsur thoughts?

@ealsur
Copy link
Member Author

ealsur commented Aug 9, 2021

From API perspective, one more alternative is to have a single unified notification API and the context will be used to dis-ambiguate/reason on causes. The API surface, discoverability are simple, @ealsur thoughts?

@kirankumarkolli Could you expand a bit on this? There are 3 events we want to expose:

  1. Lease Acquire
  2. Lease Release
  3. Any errors (happening inside user code/serialization or as part of the CFP internal operations)

How do we unify those 3 into a single API? The context for each seem a bit different? #2501 (comment) contains 2 alternatives, either a delegate approach (one delegate per event type and users only hook to what they want, normally just errors), or an abstract class that exposes the events as overridable methods. What other alternatives do you believe could be?

Copy link
Member

@FabianMeiswinkel FabianMeiswinkel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM (except for the error handling comment around custom delegates) - will be very helpful - would be nice to have in Java in similar way as well.

@kirankumarkolli
Copy link
Member

From API perspective, one more alternative is to have a single unified notification API and the context will be used to dis-ambiguate/reason on causes. The API surface, discoverability are simple, @ealsur thoughts?

@kirankumarkolli Could you expand a bit on this? There are 3 events we want to expose:

  1. Lease Acquire
  2. Lease Release
  3. Any errors (happening inside user code/serialization or as part of the CFP internal operations)

How do we unify those 3 into a single API? The context for each seem a bit different? #2501 (comment) contains 2 alternatives, either a delegate approach (one delegate per event type and users only hook to what they want, normally just errors), or an abstract class that exposes the events as overridable methods. What other alternatives do you believe could be?

The delegates are scattered contracts of each of such aggregated contract.
Some possibilities of Aggregated contract are abstract-class or abstract notify method with payload dynamically casted. Between the two personally I am for the abstract-class contract.

@ealsur ealsur merged commit f4bd8b6 into master Aug 25, 2021
@ealsur ealsur deleted the users/ealsur/healthmonitor branch August 25, 2021 18:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants