Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Preview] Change Feed Processor: Adds notification APIs #2613

Merged
merged 28 commits into from
Aug 25, 2021
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c1dc7ff
Initial types and implementation
ealsur Jun 29, 2021
d424b70
Wiring through known places
ealsur Jun 29, 2021
08f4d95
Refactor monitoring
ealsur Jul 9, 2021
212f88c
New tests
ealsur Jul 9, 2021
e92da8b
public exception
ealsur Jul 9, 2021
0107f65
tests
ealsur Jul 9, 2021
66bc67f
Refactoring error messaging so users get diagnostics
ealsur Jul 9, 2021
0bbcfca
Wiring through estimator
ealsur Jul 9, 2021
05ba82a
tests
ealsur Jul 9, 2021
10e66c9
Merge branch 'master' into users/ealsur/healthmonitor
ealsur Jul 9, 2021
fd3760f
undoing and cleaning up
ealsur Jul 9, 2021
a1cd9ce
Merge branch 'master' into users/ealsur/healthmonitor
ealsur Jul 13, 2021
a30b969
Rename of base implementation and refactor of public API
ealsur Jul 13, 2021
7bc0107
more unit tests
ealsur Jul 13, 2021
bc023ef
emulator tests
ealsur Jul 13, 2021
bece93f
docs
ealsur Jul 13, 2021
b3893bb
adding context to exception
ealsur Jul 14, 2021
0b88d94
contract
ealsur Jul 14, 2021
919ba04
undo file
ealsur Jul 14, 2021
1e487b5
undo more
ealsur Jul 14, 2021
8489437
Merge branch 'master' into users/ealsur/healthmonitor
ealsur Aug 2, 2021
2260bc1
logging always
ealsur Aug 6, 2021
212d573
Merge branch 'master' into users/ealsur/healthmonitor
ealsur Aug 6, 2021
81d9f9c
Addressing comments
ealsur Aug 10, 2021
aa6662f
Merge branch 'master' into users/ealsur/healthmonitor
ealsur Aug 10, 2021
7adb425
contract
ealsur Aug 10, 2021
acbc664
merge with master
ealsur Aug 20, 2021
95ef92f
preview update after merge
ealsur Aug 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ internal sealed class ChangeFeedEstimatorRunner : ChangeFeedProcessor
private const string EstimatorDefaultHostName = "Estimator";
private readonly ChangesEstimationHandler initialEstimateDelegate;
private readonly TimeSpan? estimatorPeriod;
private ChangeFeedProcessorHealthMonitor healthMonitor;
private CancellationTokenSource shutdownCts;
private ContainerInternal leaseContainer;
private ContainerInternal monitoredContainer;
Expand Down Expand Up @@ -75,6 +76,7 @@ public void ApplyBuildConfiguration(
this.monitoredContainer = monitoredContainer ?? throw new ArgumentNullException(nameof(monitoredContainer));
this.changeFeedLeaseOptions = changeFeedLeaseOptions;
this.documentServiceLeaseContainer = customDocumentServiceLeaseStoreManager?.LeaseContainer;
this.healthMonitor = changeFeedProcessorOptions.HealthMonitor;
}

public override async Task StartAsync()
Expand Down Expand Up @@ -128,7 +130,7 @@ private FeedEstimatorRunner BuildFeedEstimatorRunner()
this.documentServiceLeaseContainer);
}

return new FeedEstimatorRunner(this.initialEstimateDelegate, this.remainingWorkEstimator, this.estimatorPeriod);
return new FeedEstimatorRunner(this.initialEstimateDelegate, this.remainingWorkEstimator, this.healthMonitor, this.estimatorPeriod);
}

private async Task InitializeLeaseStoreAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ namespace Microsoft.Azure.Cosmos
using System;
using Microsoft.Azure.Cosmos.ChangeFeed.Configuration;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
#if PREVIEW
using static Microsoft.Azure.Cosmos.Container;
#else
using static Microsoft.Azure.Cosmos.ContainerInternal;
#endif

/// <summary>
/// Provides a flexible way to create an instance of <see cref="ChangeFeedProcessor"/> with custom set of parameters.
Expand All @@ -25,7 +30,7 @@ public class ChangeFeedProcessorBuilder
ChangeFeedProcessorOptions,
ContainerInternal> applyBuilderConfiguration;

private ChangeFeedProcessorOptions changeFeedProcessorOptions;
private readonly ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();

private ContainerInternal leaseContainer;
private string InstanceName;
Expand Down Expand Up @@ -96,7 +101,6 @@ public ChangeFeedProcessorBuilder WithPollInterval(TimeSpan pollInterval)
throw new ArgumentNullException(nameof(pollInterval));
}

this.changeFeedProcessorOptions = this.changeFeedProcessorOptions ?? new ChangeFeedProcessorOptions();
this.changeFeedProcessorOptions.FeedPollDelay = pollInterval;
return this;
}
Expand All @@ -114,7 +118,6 @@ public ChangeFeedProcessorBuilder WithPollInterval(TimeSpan pollInterval)
/// <returns>The instance of <see cref="ChangeFeedProcessorBuilder"/> to use.</returns>
internal virtual ChangeFeedProcessorBuilder WithStartFromBeginning()
{
this.changeFeedProcessorOptions = this.changeFeedProcessorOptions ?? new ChangeFeedProcessorOptions();
this.changeFeedProcessorOptions.StartFromBeginning = true;
return this;
}
Expand All @@ -137,7 +140,6 @@ public ChangeFeedProcessorBuilder WithStartTime(DateTime startTime)
throw new ArgumentNullException(nameof(startTime));
}

this.changeFeedProcessorOptions = this.changeFeedProcessorOptions ?? new ChangeFeedProcessorOptions();
this.changeFeedProcessorOptions.StartTime = startTime;
return this;
}
Expand All @@ -154,7 +156,6 @@ public ChangeFeedProcessorBuilder WithMaxItems(int maxItemCount)
throw new ArgumentOutOfRangeException(nameof(maxItemCount));
}

this.changeFeedProcessorOptions = this.changeFeedProcessorOptions ?? new ChangeFeedProcessorOptions();
this.changeFeedProcessorOptions.MaxItemCount = maxItemCount;
return this;
}
Expand Down Expand Up @@ -213,6 +214,69 @@ internal virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer()
return this;
}

/// <summary>
/// Defines a delegate to receive notifications on errors that occur during change feed processor execution.
/// </summary>
/// <param name="errorDelegate">A delegate to receive notifications for change feed processor related errors.</param>
/// <returns>The instance of <see cref="ChangeFeedProcessorBuilder"/> to use.</returns>
#if PREVIEW
public
#else
internal
#endif
ChangeFeedProcessorBuilder WithErrorNotification(ChangeFeedMonitorErrorDelegate errorDelegate)
{
if (errorDelegate == null)
{
throw new ArgumentNullException(nameof(errorDelegate));
}

this.changeFeedProcessorOptions.HealthMonitor.SetErrorDelegate(errorDelegate);
return this;
}

/// <summary>
/// Defines a delegate to receive notifications on lease acquires that occur during change feed processor execution.
/// </summary>
/// <param name="acquireDelegate">A delegate to receive notifications when a change feed processor acquires a lease.</param>
/// <returns>The instance of <see cref="ChangeFeedProcessorBuilder"/> to use.</returns>
#if PREVIEW
public
#else
internal
#endif
ChangeFeedProcessorBuilder WithLeaseAcquireNotification(ChangeFeedMonitorLeaseAcquireDelegate acquireDelegate)
{
if (acquireDelegate == null)
{
throw new ArgumentNullException(nameof(acquireDelegate));
}

this.changeFeedProcessorOptions.HealthMonitor.SetLeaseAcquireDelegate(acquireDelegate);
return this;
}

/// <summary>
/// Defines a delegate to receive notifications on lease releases that occur during change feed processor execution.
/// </summary>
/// <param name="releaseDelegate">A delegate to receive notifications when a change feed processor releases a lease.</param>
/// <returns>The instance of <see cref="ChangeFeedProcessorBuilder"/> to use.</returns>
#if PREVIEW
public
#else
internal
#endif
ChangeFeedProcessorBuilder WithLeaseReleaseNotification(ChangeFeedMonitorLeaseReleaseDelegate releaseDelegate)
{
if (releaseDelegate == null)
{
throw new ArgumentNullException(nameof(releaseDelegate));
}

this.changeFeedProcessorOptions.HealthMonitor.SetLeaseReleaseDelegate(releaseDelegate);
return this;
}

/// <summary>
/// Builds a new instance of the <see cref="ChangeFeedProcessor"/> with the specified configuration.
/// </summary>
Expand All @@ -239,16 +303,10 @@ public ChangeFeedProcessor Build()
throw new InvalidOperationException("Processor name not specified during creation.");
}

this.InitializeDefaultOptions();
this.applyBuilderConfiguration(this.LeaseStoreManager, this.leaseContainer, this.InstanceName, this.changeFeedLeaseOptions, this.changeFeedProcessorOptions, this.monitoredContainer);

this.isBuilt = true;
return this.changeFeedProcessor;
}

private void InitializeDefaultOptions()
{
this.changeFeedProcessorOptions = this.changeFeedProcessorOptions ?? new ChangeFeedProcessorOptions();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
using Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement;
using Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
using Microsoft.Azure.Cosmos.ChangeFeed.Monitoring;
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Tracing;
Expand Down Expand Up @@ -117,9 +116,13 @@ private PartitionManager BuildPartitionManager(
EqualPartitionsBalancingStrategy.DefaultMaxLeaseCount,
this.changeFeedLeaseOptions.LeaseExpirationInterval);

PartitionController partitionController = new PartitionControllerCore(this.documentServiceLeaseStoreManager.LeaseContainer, this.documentServiceLeaseStoreManager.LeaseManager, partitionSuperviserFactory, synchronizer);
PartitionController partitionController = new PartitionControllerCore(
this.documentServiceLeaseStoreManager.LeaseContainer,
this.documentServiceLeaseStoreManager.LeaseManager,
partitionSuperviserFactory,
synchronizer,
this.changeFeedProcessorOptions.HealthMonitor);

partitionController = new HealthMonitoringPartitionControllerDecorator(partitionController, new TraceHealthMonitor());
PartitionLoadBalancerCore partitionLoadBalancer = new PartitionLoadBalancerCore(
partitionController,
this.documentServiceLeaseStoreManager.LeaseContainer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Configuration
{
using System;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.ChangeFeed.Monitoring;

/// <summary>
/// Options to control various aspects of partition distribution happening within <see cref="ChangeFeedProcessorCore"/> instance.
Expand Down Expand Up @@ -81,5 +82,7 @@ public DateTime? StartTime
/// </remarks>
/// <seealso cref="ChangeFeedOptions.StartFromBeginning"/>
public bool StartFromBeginning { get; set; }

public ChangeFeedProcessorHealthMonitorCore HealthMonitor { get; set; } = new ChangeFeedProcessorHealthMonitorCore();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.DocDBErrors
internal enum DocDbError
{
Undefined,
PartitionSplit,
PartitionNotFound,
ReadSessionNotAvailable
PartitionSplit
ealsur marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ public static DocDbError ClassifyStatusCodes(
return DocDbError.PartitionSplit;
}

if (statusCode == HttpStatusCode.NotFound)
{
return subStatusCode == (int)SubStatusCodes.ReadSessionNotAvailable ? DocDbError.ReadSessionNotAvailable : DocDbError.PartitionNotFound;
}

return DocDbError.Undefined;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.ChangeFeed.Exceptions
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Runtime.Serialization;
Expand All @@ -11,29 +11,44 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Exceptions
/// Exception occurred when an operation in an IChangeFeedObserver is running and throws by user code
/// </summary>
[Serializable]
internal class ObserverException : Exception

#if PREVIEW
public
#else
internal
#endif
class ChangeFeedProcessorUserException : Exception
{
private static readonly string DefaultMessage = "Exception has been thrown by the Observer.";
private static readonly string DefaultMessage = "Exception has been thrown by the change feed processor delegate.";

/// <summary>
/// Initializes a new instance of the <see cref="ObserverException" /> class using the specified internal exception.
/// Initializes a new instance of the <see cref="ChangeFeedProcessorUserException " /> class using the specified internal exception.
/// </summary>
/// <param name="originalException"><see cref="Exception"/> thrown by the user code.</param>
public ObserverException(Exception originalException)
: base(ObserverException.DefaultMessage, originalException)
/// <param name="context">Context under which the exception occurred.</param>
public ChangeFeedProcessorUserException(
Exception originalException,
ChangeFeedProcessorContext context)
: base(ChangeFeedProcessorUserException.DefaultMessage, originalException)
{
this.ExceptionContext = context;
}

/// <summary>
/// Initializes a new instance of the <see cref="ObserverException" /> class using default values.
/// Initializes a new instance of the <see cref="ChangeFeedProcessorUserException " /> for serialization purposes.
/// </summary>
/// <param name="info">The SerializationInfo object that holds serialized object data for the exception being thrown.</param>
/// <param name="context">The StreamingContext that contains contextual information about the source or destination.</param>
protected ObserverException(SerializationInfo info, StreamingContext context)
: this((Exception)info.GetValue("InnerException", typeof(Exception)))
protected ChangeFeedProcessorUserException(SerializationInfo info, StreamingContext context)
: this((Exception)info.GetValue("InnerException", typeof(Exception)), null)
{
}

/// <summary>
/// Contextual information that identifies which was the payload that was delivered to the delegate when this error occurred.
/// </summary>
public ChangeFeedProcessorContext ExceptionContext { get; private set; }
ealsur marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Sets the System.Runtime.Serialization.SerializationInfo with information about the exception.
/// </summary>
Expand Down

This file was deleted.

This file was deleted.

Loading