From 6cc83a54991e7179f0b25a3591a1c1016e81ada0 Mon Sep 17 00:00:00 2001 From: woksin Date: Tue, 11 Feb 2025 13:40:11 +0100 Subject: [PATCH 1/9] Split up Observer in partial classes --- .../Grains/Observation/Observer.Catchup.cs | 138 ++++++ .../Grains/Observation/Observer.Failing.cs | 111 +++++ .../Grains/Observation/Observer.Handling.cs | 201 ++++++++ .../Grains/Observation/Observer.Replay.cs | 64 +++ Source/Kernel/Grains/Observation/Observer.cs | 464 +----------------- 5 files changed, 515 insertions(+), 463 deletions(-) create mode 100644 Source/Kernel/Grains/Observation/Observer.Catchup.cs create mode 100644 Source/Kernel/Grains/Observation/Observer.Failing.cs create mode 100644 Source/Kernel/Grains/Observation/Observer.Handling.cs create mode 100644 Source/Kernel/Grains/Observation/Observer.Replay.cs diff --git a/Source/Kernel/Grains/Observation/Observer.Catchup.cs b/Source/Kernel/Grains/Observation/Observer.Catchup.cs new file mode 100644 index 000000000..e3537c89f --- /dev/null +++ b/Source/Kernel/Grains/Observation/Observer.Catchup.cs @@ -0,0 +1,138 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Concepts; +using Cratis.Chronicle.Concepts.Events; +using Cratis.Chronicle.Concepts.Jobs; +using Cratis.Chronicle.Concepts.Keys; +using Cratis.Chronicle.Grains.EventSequences; +using Cratis.Chronicle.Grains.Observation.Jobs; +using Cratis.Chronicle.Grains.Observation.States; +namespace Cratis.Chronicle.Grains.Observation; + +public partial class Observer +{ + /// + public async Task CatchUp() + { + _isPreparingCatchup = true; + using var scope = logger.BeginObserverScope(State.Id, _observerKey); + + var subscription = await GetSubscription(); + + var jobs = await _jobsManager.GetJobsOfType(); + if (jobs.Any(_ => _.Status == JobStatus.Running)) + { + logger.FinishingExistingCatchUpJob(); + return; + } + + var pausedJob = jobs.FirstOrDefault(_ => _.Status == JobStatus.Paused); + + if (pausedJob is not null) + { + logger.ResumingCatchUpJob(); + await _jobsManager.Resume(pausedJob.Id); + } + else + { + logger.StartCatchUpJob(State.NextEventSequenceNumber); + await _jobsManager.Start( + JobId.New(), + new( + _observerKey, + subscription, + State.NextEventSequenceNumber, + State.EventTypes)); + } + } + + /// + public async Task RegisterCatchingUpPartitions(IEnumerable partitions) + { + using var scope = logger.BeginObserverScope(State.Id, _observerKey); + logger.RegisteringCatchingUpPartitions(); + foreach (var partition in partitions) + { + State.CatchingUpPartitions.Add(partition); + } + + await WriteStateAsync(); + + _isPreparingCatchup = false; + } + + /// + public async Task CaughtUp(EventSequenceNumber lastHandledEventSequenceNumber) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + HandleNewLastHandledEvent(lastHandledEventSequenceNumber); + await WriteStateAsync(); + await TransitionTo(); + } + + /// + public async Task PartitionCaughtUp(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + logger.PartitionCaughtUp(partition, lastHandledEventSequenceNumber); + State.CatchingUpPartitions.Remove(partition); + HandleNewLastHandledEvent(lastHandledEventSequenceNumber); + await WriteStateAsync(); + await StartCatchupJobIfNeeded(partition, lastHandledEventSequenceNumber); + } + + async Task StartCatchupJobIfNeeded(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) + { + if (failures.State.IsFailed(partition)) + { + logger.PartitionToCatchUpIsFailing(partition); + return; + } + if (!lastHandledEventSequenceNumber.IsActualValue) + { + logger.LastHandledEventIsNotActualValue(); + return; + } + var needCatchupResult = await NeedsCatchup(partition, lastHandledEventSequenceNumber); + await needCatchupResult.Match( + needCatchup => needCatchup + ? StartCatchupJob(partition, lastHandledEventSequenceNumber) + : Task.CompletedTask, + error => + { + switch (error) + { + case GetSequenceNumberError.NotFound: + logger.LastHandledEventForPartitionUnavailable(partition); + return Task.CompletedTask; + default: + return PartitionFailed(partition, lastHandledEventSequenceNumber.Next(), ["Event Sequence storage error caused partition to try recover"], string.Empty); + } + }); + } + + async Task StartCatchupJob(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) + { + var nextEventSequenceNumber = lastHandledEventSequenceNumber.Next(); + logger.StartingCatchUpForPartition(partition, nextEventSequenceNumber); + State.CatchingUpPartitions.Add(partition); + await _jobsManager.Start( + JobId.New(), + new( + _observerKey, + _subscription, + partition, + nextEventSequenceNumber, + State.EventTypes)); + await WriteStateAsync(); + } + + async Task> NeedsCatchup(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) + { + var nextSequenceNumber = await _eventSequence.GetNextSequenceNumberGreaterOrEqualTo(lastHandledEventSequenceNumber, State.EventTypes, partition); + return nextSequenceNumber.Match>( + number => number != lastHandledEventSequenceNumber, + error => error); + } +} diff --git a/Source/Kernel/Grains/Observation/Observer.Failing.cs b/Source/Kernel/Grains/Observation/Observer.Failing.cs new file mode 100644 index 000000000..d8e69fbac --- /dev/null +++ b/Source/Kernel/Grains/Observation/Observer.Failing.cs @@ -0,0 +1,111 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Concepts.Configuration; +using Cratis.Chronicle.Concepts.Events; +using Cratis.Chronicle.Concepts.Jobs; +using Cratis.Chronicle.Concepts.Keys; +using Cratis.Chronicle.Concepts.Observation; +using Cratis.Chronicle.Grains.Observation.Jobs; +namespace Cratis.Chronicle.Grains.Observation; + +public partial class Observer +{ + /// + public async Task PartitionFailed( + Key partition, + EventSequenceNumber sequenceNumber, + IEnumerable exceptionMessages, + string exceptionStackTrace) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + _metrics?.PartitionFailed(partition); + logger.PartitionFailed(partition, sequenceNumber); + var failure = failures.State.RegisterAttempt(partition, sequenceNumber, exceptionMessages, exceptionStackTrace); + var config = await configurationProvider.GetFor(_observerKey); + if (config.MaxRetryAttempts == 0 || failure.Attempts.Count() <= config.MaxRetryAttempts) + { + await this.RegisterOrUpdateReminder(partition.ToString(), GetNextRetryDelay(failure, config), TimeSpan.FromHours(48)); + } + else + { + logger.GivingUpOnRecoveringFailedPartition(partition); + } + + await failures.WriteStateAsync(); + } + + /// + public async Task FailedPartitionRecovered(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + logger.FailingPartitionRecovered(partition); + failures.State.Remove(partition); + await failures.WriteStateAsync(); + HandleNewLastHandledEvent(lastHandledEventSequenceNumber); + await WriteStateAsync(); + await StartCatchupJobIfNeeded(partition, lastHandledEventSequenceNumber); + } + + /// + public async Task FailedPartitionPartiallyRecovered(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + logger.FailingPartitionPartiallyRecovered(partition, lastHandledEventSequenceNumber); + HandleNewLastHandledEvent(lastHandledEventSequenceNumber); + await WriteStateAsync(); + } + + /// + public async Task TryStartRecoverJobForFailedPartition(Key partition) + { + if (!Failures.TryGet(partition, out var failure)) + { + return; + } + + await StartRecoverJobForFailedPartition(failure); + } + + /// + public async Task TryRecoverAllFailedPartitions() + { + foreach (var partition in Failures.Partitions) + { + await StartRecoverJobForFailedPartition(partition); + } + } + + static TimeSpan GetNextRetryDelay(FailedPartition failure, Observers config) + { + var time = TimeSpan.FromSeconds(config.BackoffDelay * Math.Pow(config.ExponentialBackoffDelayFactor, failure.Attempts.Count())); + var maxTime = TimeSpan.FromSeconds(config.MaximumBackoffDelay); + + if (time > maxTime) + { + return maxTime; + } + + if (time == TimeSpan.Zero) + { + return TimeSpan.FromSeconds(config.BackoffDelay); + } + + return time; + } + + async Task StartRecoverJobForFailedPartition(FailedPartition failedPartition) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + logger.TryingToRecoverFailedPartition(failedPartition.Partition); + await RemoveReminder(failedPartition.Partition.ToString()); + await _jobsManager.Start( + JobId.New(), + new( + _observerKey, + _subscription, + failedPartition.Partition, + failedPartition.LastAttempt.SequenceNumber, + State.EventTypes)); + } +} diff --git a/Source/Kernel/Grains/Observation/Observer.Handling.cs b/Source/Kernel/Grains/Observation/Observer.Handling.cs new file mode 100644 index 000000000..fbb449e8c --- /dev/null +++ b/Source/Kernel/Grains/Observation/Observer.Handling.cs @@ -0,0 +1,201 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Concepts.Events; +using Cratis.Chronicle.Concepts.Keys; +using Cratis.Chronicle.Concepts.Observation; +namespace Cratis.Chronicle.Grains.Observation; + +public partial class Observer +{ + /// + public Task SetHandledStats(EventSequenceNumber lastHandledEventSequenceNumber) + { + State = State with + { + LastHandledEventSequenceNumber = lastHandledEventSequenceNumber + }; + + return WriteStateAsync(); + } + + /// + public async Task Handle(Key partition, IEnumerable events) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + + if (!events.Any()) + { + return; + } + + if (!ShouldHandleEvent(partition)) + { + return; + } + + if (!events.Any(_ => _subscription.EventTypes.Contains(_.Metadata.Type))) + { + State = State with + { + NextEventSequenceNumber = events.Last().Metadata.SequenceNumber.Next() + }; + await WriteStateAsync(); + return; + } + + var failed = false; + var exceptionMessages = Enumerable.Empty(); + var exceptionStackTrace = string.Empty; + var tailEventSequenceNumber = State.NextEventSequenceNumber; + + var eventsToHandle = events.Where(_ => _.Metadata.SequenceNumber >= tailEventSequenceNumber).ToArray(); + var numEventsSuccessfullyHandled = EventCount.Zero; + var stateChanged = false; + if (eventsToHandle.Length != 0) + { + using (new WriteSuspension(this)) + { + try + { + var key = new ObserverSubscriberKey( + _observerKey.ObserverId, + _observerKey.EventStore, + _observerKey.Namespace, + _observerKey.EventSequenceId, + partition, + _subscription.SiloAddress.ToParsableString()); + + var firstEvent = eventsToHandle[0]; + + var subscriber = (GrainFactory.GetGrain(_subscription.SubscriberType, key) as IObserverSubscriber)!; + tailEventSequenceNumber = firstEvent.Metadata.SequenceNumber; + var result = await subscriber.OnNext(partition, eventsToHandle, new(_subscription.Arguments)); + numEventsSuccessfullyHandled = result.HandledAnyEvents + ? eventsToHandle.Count(_ => _.Metadata.SequenceNumber <= result.LastSuccessfulObservation) + : EventCount.Zero; + + if (result.State == ObserverSubscriberState.Failed) + { + failed = true; + exceptionMessages = result.ExceptionMessages; + exceptionStackTrace = result.ExceptionStackTrace; + tailEventSequenceNumber = result.HandledAnyEvents + ? result.LastSuccessfulObservation + : firstEvent.Metadata.SequenceNumber; + } + else if (result.State == ObserverSubscriberState.Disconnected) + { + await Unsubscribe(); + stateChanged = true; + } + + if (numEventsSuccessfullyHandled > 0) + { + stateChanged = true; + State = State with + { + NextEventSequenceNumber = result.LastSuccessfulObservation.Next() + }; + var previousLastHandled = State.LastHandledEventSequenceNumber; + var shouldSetLastHandled = + previousLastHandled == EventSequenceNumber.Unavailable || + previousLastHandled < result.LastSuccessfulObservation; + State = State with + { + LastHandledEventSequenceNumber = shouldSetLastHandled + ? result.LastSuccessfulObservation + : previousLastHandled, + }; + } + } + catch (Exception ex) + { + failed = true; + exceptionMessages = ex.GetAllMessages().ToArray(); + exceptionStackTrace = ex.StackTrace ?? string.Empty; + } + } + + try + { + if (failed) + { + await PartitionFailed(partition, tailEventSequenceNumber, exceptionMessages, exceptionStackTrace); + } + else + { + _metrics?.SuccessfulObservation(); + } + + if (stateChanged) + { + await WriteStateAsync(); + } + } + catch (Exception ex) + { + logger.ObserverFailedForUnknownReasonsAfterHandlingEvents(ex); + } + } + } + + bool ShouldHandleEvent(Key partition) + { + if (!_subscription.IsSubscribed) + { + logger.ObserverIsNotSubscribed(); + return false; + } + + if (Failures.IsFailed(partition)) + { + logger.PartitionIsFailed(partition); + return false; + } + + if (State.RunningState != ObserverRunningState.Active) + { + logger.ObserverIsNotActive(); + return false; + } + + if (_isPreparingCatchup) + { + logger.ObserverIsPreparingCatchup(); + return false; + } + + if (State.ReplayingPartitions.Contains(partition)) + { + logger.PartitionReplayingCannotHandleNewEvents(partition); + return false; + } + + if (State.CatchingUpPartitions.Contains(partition)) + { + logger.PartitionCatchingUpCannotHandleNewEvents(partition); + return false; + } + + return true; + } + + void HandleNewLastHandledEvent(EventSequenceNumber lastHandledEvent) + { + if (!lastHandledEvent.IsActualValue) + { + logger.LastHandledEventIsNotActualValue(); + return; + } + + var newLastHandledEvent = State.LastHandledEventSequenceNumber == EventSequenceNumber.Unavailable || + State.LastHandledEventSequenceNumber < lastHandledEvent ? lastHandledEvent : State.LastHandledEventSequenceNumber; + var nextEventSequenceNumber = State.NextEventSequenceNumber <= lastHandledEvent ? lastHandledEvent.Next() : State.NextEventSequenceNumber; + State = State with + { + LastHandledEventSequenceNumber = newLastHandledEvent, + NextEventSequenceNumber = nextEventSequenceNumber + }; + } +} diff --git a/Source/Kernel/Grains/Observation/Observer.Replay.cs b/Source/Kernel/Grains/Observation/Observer.Replay.cs new file mode 100644 index 000000000..6d3be295d --- /dev/null +++ b/Source/Kernel/Grains/Observation/Observer.Replay.cs @@ -0,0 +1,64 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Concepts.Events; +using Cratis.Chronicle.Concepts.Jobs; +using Cratis.Chronicle.Concepts.Keys; +using Cratis.Chronicle.Concepts.Observation; +using Cratis.Chronicle.Grains.Observation.Jobs; +using Cratis.Chronicle.Grains.Observation.States; +namespace Cratis.Chronicle.Grains.Observation; + +public partial class Observer +{ + /// + public async Task Replay() + { + if (State.RunningState == ObserverRunningState.Active) + { + await TransitionTo(); + } + } + + /// + public Task ReplayPartition(Key partition) => ReplayPartitionTo(partition, EventSequenceNumber.Max); + + /// + public async Task ReplayPartitionTo(Key partition, EventSequenceNumber sequenceNumber) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + logger.AttemptReplayPartition(partition, sequenceNumber); + await _jobsManager.Start( + JobId.New(), + new( + _observerKey, + _subscription, + partition, + EventSequenceNumber.First, + sequenceNumber, + State.EventTypes)); + + State.ReplayingPartitions.Add(partition); + await WriteStateAsync(); + } + + /// + public async Task Replayed(EventSequenceNumber lastHandledEventSequenceNumber) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + HandleNewLastHandledEvent(lastHandledEventSequenceNumber); + await WriteStateAsync(); + await TransitionTo(); + } + + /// + public async Task PartitionReplayed(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + logger.FinishedReplayForPartition(partition); + State.ReplayingPartitions.Remove(partition); + HandleNewLastHandledEvent(lastHandledEventSequenceNumber); + await WriteStateAsync(); + await StartCatchupJobIfNeeded(partition, lastHandledEventSequenceNumber); + } +} diff --git a/Source/Kernel/Grains/Observation/Observer.cs b/Source/Kernel/Grains/Observation/Observer.cs index 82ec2f58e..684915c22 100644 --- a/Source/Kernel/Grains/Observation/Observer.cs +++ b/Source/Kernel/Grains/Observation/Observer.cs @@ -35,7 +35,7 @@ namespace Cratis.Chronicle.Grains.Observation; /// for the observer. /// for creating loggers. [StorageProvider(ProviderName = WellKnownGrainStorageProviders.Observers)] -public class Observer( +public partial class Observer( [PersistentState(nameof(FailedPartition), WellKnownGrainStorageProviders.FailedPartitions)] IPersistentState failures, IObserverServiceClient replayStateServiceClient, @@ -95,17 +95,6 @@ public override async Task OnDeactivateAsync(DeactivationReason reason, Cancella public Task GetState() => Task.FromResult(State); #pragma warning restore CA1721 // Property names should not match get methods - /// - public Task SetHandledStats(EventSequenceNumber lastHandledEventSequenceNumber) - { - State = State with - { - LastHandledEventSequenceNumber = lastHandledEventSequenceNumber - }; - - return WriteStateAsync(); - } - /// public Task GetSubscription() => Task.FromResult(_subscription); @@ -182,308 +171,6 @@ public async Task Unsubscribe() await TransitionTo(); } - /// - public async Task Replay() - { - if (State.RunningState == ObserverRunningState.Active) - { - await TransitionTo(); - } - } - - /// - public Task ReplayPartition(Key partition) => ReplayPartitionTo(partition, EventSequenceNumber.Max); - - /// - public async Task ReplayPartitionTo(Key partition, EventSequenceNumber sequenceNumber) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - logger.AttemptReplayPartition(partition, sequenceNumber); - await _jobsManager.Start( - JobId.New(), - new( - _observerKey, - _subscription, - partition, - EventSequenceNumber.First, - sequenceNumber, - State.EventTypes)); - - State.ReplayingPartitions.Add(partition); - await WriteStateAsync(); - } - - /// - public async Task Replayed(EventSequenceNumber lastHandledEventSequenceNumber) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - HandleNewLastHandledEvent(lastHandledEventSequenceNumber); - await WriteStateAsync(); - await TransitionTo(); - } - - /// - public async Task PartitionReplayed(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - logger.FinishedReplayForPartition(partition); - State.ReplayingPartitions.Remove(partition); - HandleNewLastHandledEvent(lastHandledEventSequenceNumber); - await WriteStateAsync(); - await StartCatchupJobIfNeeded(partition, lastHandledEventSequenceNumber); - } - - /// - public async Task PartitionFailed( - Key partition, - EventSequenceNumber sequenceNumber, - IEnumerable exceptionMessages, - string exceptionStackTrace) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - _metrics?.PartitionFailed(partition); - logger.PartitionFailed(partition, sequenceNumber); - var failure = failures.State.RegisterAttempt(partition, sequenceNumber, exceptionMessages, exceptionStackTrace); - var config = await configurationProvider.GetFor(_observerKey); - if (config.MaxRetryAttempts == 0 || failure.Attempts.Count() <= config.MaxRetryAttempts) - { - await this.RegisterOrUpdateReminder(partition.ToString(), GetNextRetryDelay(failure, config), TimeSpan.FromHours(48)); - } - else - { - logger.GivingUpOnRecoveringFailedPartition(partition); - } - - await failures.WriteStateAsync(); - } - - /// - public async Task FailedPartitionRecovered(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - logger.FailingPartitionRecovered(partition); - failures.State.Remove(partition); - await failures.WriteStateAsync(); - HandleNewLastHandledEvent(lastHandledEventSequenceNumber); - await WriteStateAsync(); - await StartCatchupJobIfNeeded(partition, lastHandledEventSequenceNumber); - } - - /// - public async Task FailedPartitionPartiallyRecovered(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - logger.FailingPartitionPartiallyRecovered(partition, lastHandledEventSequenceNumber); - HandleNewLastHandledEvent(lastHandledEventSequenceNumber); - await WriteStateAsync(); - } - - /// - public async Task CatchUp() - { - _isPreparingCatchup = true; - using var scope = logger.BeginObserverScope(State.Id, _observerKey); - - var subscription = await GetSubscription(); - - var jobs = await _jobsManager.GetJobsOfType(); - var jobsForThisObserver = jobs.Where(IsJobForThisObserver); - if (jobs.Any(_ => _.Status == JobStatus.Running)) - { - logger.FinishingExistingCatchUpJob(); - return; - } - - var pausedJob = jobs.FirstOrDefault(_ => _.Status == JobStatus.Paused); - - if (pausedJob is not null) - { - logger.ResumingCatchUpJob(); - await _jobsManager.Resume(pausedJob.Id); - } - else - { - logger.StartCatchUpJob(State.NextEventSequenceNumber); - await _jobsManager.Start( - JobId.New(), - new( - _observerKey, - subscription, - State.NextEventSequenceNumber, - State.EventTypes)); - } - } - - /// - public async Task RegisterCatchingUpPartitions(IEnumerable partitions) - { - using var scope = logger.BeginObserverScope(State.Id, _observerKey); - logger.RegisteringCatchingUpPartitions(); - foreach (var partition in partitions) - { - State.CatchingUpPartitions.Add(partition); - } - - await WriteStateAsync(); - - _isPreparingCatchup = false; - } - - /// - public async Task CaughtUp(EventSequenceNumber lastHandledEventSequenceNumber) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - HandleNewLastHandledEvent(lastHandledEventSequenceNumber); - await WriteStateAsync(); - await TransitionTo(); - } - - /// - public async Task PartitionCaughtUp(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - logger.PartitionCaughtUp(partition, lastHandledEventSequenceNumber); - State.CatchingUpPartitions.Remove(partition); - HandleNewLastHandledEvent(lastHandledEventSequenceNumber); - await WriteStateAsync(); - await StartCatchupJobIfNeeded(partition, lastHandledEventSequenceNumber); - } - - /// - public async Task TryStartRecoverJobForFailedPartition(Key partition) - { - if (!Failures.TryGet(partition, out var failure)) - { - return; - } - - await StartRecoverJobForFailedPartition(failure); - } - - /// - public async Task TryRecoverAllFailedPartitions() - { - foreach (var partition in Failures.Partitions) - { - await StartRecoverJobForFailedPartition(partition); - } - } - - /// - public async Task Handle(Key partition, IEnumerable events) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - - if (!events.Any()) - { - return; - } - - if (!ShouldHandleEvent(partition)) - { - return; - } - - if (!events.Any(_ => _subscription.EventTypes.Contains(_.Metadata.Type))) - { - State = State with { NextEventSequenceNumber = events.Last().Metadata.SequenceNumber.Next() }; - await WriteStateAsync(); - return; - } - - var failed = false; - var exceptionMessages = Enumerable.Empty(); - var exceptionStackTrace = string.Empty; - var tailEventSequenceNumber = State.NextEventSequenceNumber; - - var eventsToHandle = events.Where(_ => _.Metadata.SequenceNumber >= tailEventSequenceNumber).ToArray(); - var numEventsSuccessfullyHandled = EventCount.Zero; - var stateChanged = false; - if (eventsToHandle.Length != 0) - { - using (new WriteSuspension(this)) - { - try - { - var key = new ObserverSubscriberKey( - _observerKey.ObserverId, - _observerKey.EventStore, - _observerKey.Namespace, - _observerKey.EventSequenceId, - partition, - _subscription.SiloAddress.ToParsableString()); - - var firstEvent = eventsToHandle[0]; - - var subscriber = (GrainFactory.GetGrain(_subscription.SubscriberType, key) as IObserverSubscriber)!; - tailEventSequenceNumber = firstEvent.Metadata.SequenceNumber; - var result = await subscriber.OnNext(partition, eventsToHandle, new(_subscription.Arguments)); - numEventsSuccessfullyHandled = result.HandledAnyEvents - ? eventsToHandle.Count(_ => _.Metadata.SequenceNumber <= result.LastSuccessfulObservation) - : EventCount.Zero; - - if (result.State == ObserverSubscriberState.Failed) - { - failed = true; - exceptionMessages = result.ExceptionMessages; - exceptionStackTrace = result.ExceptionStackTrace; - tailEventSequenceNumber = result.HandledAnyEvents - ? result.LastSuccessfulObservation - : firstEvent.Metadata.SequenceNumber; - } - else if (result.State == ObserverSubscriberState.Disconnected) - { - await Unsubscribe(); - stateChanged = true; - } - - if (numEventsSuccessfullyHandled > 0) - { - stateChanged = true; - State = State with { NextEventSequenceNumber = result.LastSuccessfulObservation.Next() }; - var previousLastHandled = State.LastHandledEventSequenceNumber; - var shouldSetLastHandled = - previousLastHandled == EventSequenceNumber.Unavailable || - previousLastHandled < result.LastSuccessfulObservation; - State = State with - { - LastHandledEventSequenceNumber = shouldSetLastHandled - ? result.LastSuccessfulObservation - : previousLastHandled, - }; - } - } - catch (Exception ex) - { - failed = true; - exceptionMessages = ex.GetAllMessages().ToArray(); - exceptionStackTrace = ex.StackTrace ?? string.Empty; - } - } - - try - { - if (failed) - { - await PartitionFailed(partition, tailEventSequenceNumber, exceptionMessages, exceptionStackTrace); - } - else - { - _metrics?.SuccessfulObservation(); - } - - if (stateChanged) - { - await WriteStateAsync(); - } - } - catch (Exception ex) - { - logger.ObserverFailedForUnknownReasonsAfterHandlingEvents(ex); - } - } - } - /// public async Task ReceiveReminder(string reminderName, TickStatus status) { @@ -527,65 +214,6 @@ protected override async Task WriteStateAsync() await base.WriteStateAsync(); } - static TimeSpan GetNextRetryDelay(FailedPartition failure, Observers config) - { - var time = TimeSpan.FromSeconds(config.BackoffDelay * Math.Pow(config.ExponentialBackoffDelayFactor, failure.Attempts.Count())); - var maxTime = TimeSpan.FromSeconds(config.MaximumBackoffDelay); - - if (time > maxTime) - { - return maxTime; - } - - if (time == TimeSpan.Zero) - { - return TimeSpan.FromSeconds(config.BackoffDelay); - } - - return time; - } - - bool ShouldHandleEvent(Key partition) - { - if (!_subscription.IsSubscribed) - { - logger.ObserverIsNotSubscribed(); - return false; - } - - if (Failures.IsFailed(partition)) - { - logger.PartitionIsFailed(partition); - return false; - } - - if (State.RunningState != ObserverRunningState.Active) - { - logger.ObserverIsNotActive(); - return false; - } - - if (_isPreparingCatchup) - { - logger.ObserverIsPreparingCatchup(); - return false; - } - - if (State.ReplayingPartitions.Contains(partition)) - { - logger.PartitionReplayingCannotHandleNewEvents(partition); - return false; - } - - if (State.CatchingUpPartitions.Contains(partition)) - { - logger.PartitionCatchingUpCannotHandleNewEvents(partition); - return false; - } - - return true; - } - async Task ResumeJobs() { var unfilteredJobs = await _jobsManager.GetAllJobs(); @@ -598,39 +226,6 @@ _.Request is IObserverJobRequest && } } - void HandleNewLastHandledEvent(EventSequenceNumber lastHandledEvent) - { - if (!lastHandledEvent.IsActualValue) - { - logger.LastHandledEventIsNotActualValue(); - return; - } - - var newLastHandledEvent = State.LastHandledEventSequenceNumber == EventSequenceNumber.Unavailable || - State.LastHandledEventSequenceNumber < lastHandledEvent ? lastHandledEvent : State.LastHandledEventSequenceNumber; - var nextEventSequenceNumber = State.NextEventSequenceNumber <= lastHandledEvent ? lastHandledEvent.Next() : State.NextEventSequenceNumber; - State = State with - { - LastHandledEventSequenceNumber = newLastHandledEvent, - NextEventSequenceNumber = nextEventSequenceNumber - }; - } - - async Task StartRecoverJobForFailedPartition(FailedPartition failedPartition) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - logger.TryingToRecoverFailedPartition(failedPartition.Partition); - await RemoveReminder(failedPartition.Partition.ToString()); - await _jobsManager.Start( - JobId.New(), - new( - _observerKey, - _subscription, - failedPartition.Partition, - failedPartition.LastAttempt.SequenceNumber, - State.EventTypes)); - } - async Task RemoveReminder(Key partition) { var reminder = await this.GetReminder(partition.ToString()); @@ -640,63 +235,6 @@ async Task RemoveReminder(Key partition) } } - async Task StartCatchupJobIfNeeded(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) - { - if (failures.State.IsFailed(partition)) - { - logger.PartitionToCatchUpIsFailing(partition); - return; - } - if (!lastHandledEventSequenceNumber.IsActualValue) - { - logger.LastHandledEventIsNotActualValue(); - return; - } - var needCatchupResult = await NeedsCatchup(partition, lastHandledEventSequenceNumber); - await needCatchupResult.Match( - needCatchup => needCatchup - ? StartCatchupJob(partition, lastHandledEventSequenceNumber) - : Task.CompletedTask, - error => - { - switch (error) - { - case GetSequenceNumberError.NotFound: - logger.LastHandledEventForPartitionUnavailable(partition); - return Task.CompletedTask; - default: - return PartitionFailed(partition, lastHandledEventSequenceNumber.Next(), ["Event Sequence storage error caused partition to try recover"], string.Empty); - } - }); - } - - async Task StartCatchupJob(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) - { - var nextEventSequenceNumber = lastHandledEventSequenceNumber.Next(); - logger.StartingCatchUpForPartition(partition, nextEventSequenceNumber); - State.CatchingUpPartitions.Add(partition); - await _jobsManager.Start( - JobId.New(), - new( - _observerKey, - _subscription, - partition, - nextEventSequenceNumber, - State.EventTypes)); - await WriteStateAsync(); - } - - async Task> NeedsCatchup(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) - { - var nextSequenceNumber = await _eventSequence.GetNextSequenceNumberGreaterOrEqualTo(lastHandledEventSequenceNumber, State.EventTypes, partition); - return nextSequenceNumber.Match>( - number => number != lastHandledEventSequenceNumber, - error => error); - } - - bool IsJobForThisObserver(JobState jobState) => - ((ReplayObserverRequest)jobState.Request).ObserverKey == _observerKey; - class WriteSuspension : IDisposable { readonly Observer _observer; From 09b696f5cdb38792338d5be5173b797b713ffd4b Mon Sep 17 00:00:00 2001 From: woksin Date: Tue, 11 Feb 2025 13:41:35 +0100 Subject: [PATCH 2/9] Make JobsManager Reentrant because it should be safe --- Source/Kernel/Grains/Jobs/JobsManager.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Source/Kernel/Grains/Jobs/JobsManager.cs b/Source/Kernel/Grains/Jobs/JobsManager.cs index 401a7a33a..9f3b06bc0 100644 --- a/Source/Kernel/Grains/Jobs/JobsManager.cs +++ b/Source/Kernel/Grains/Jobs/JobsManager.cs @@ -9,6 +9,7 @@ using Microsoft.Extensions.Logging; using OneOf; using OneOf.Types; +using Orleans.Concurrency; namespace Cratis.Chronicle.Grains.Jobs; @@ -21,6 +22,7 @@ namespace Cratis.Chronicle.Grains.Jobs; /// for working with underlying storage. /// that knows about job type associations. /// Logger for logging. +[Reentrant] public class JobsManager( IStorage storage, IJobTypes jobTypes, From cf1193881543e0c9225ca27f8703230f7dfd9e44 Mon Sep 17 00:00:00 2001 From: woksin Date: Wed, 12 Feb 2025 12:44:46 +0100 Subject: [PATCH 3/9] Add StartJobError type and use it in IJob.Start --- Source/Kernel/Grains.Interfaces/Jobs/IJob.cs | 2 +- .../Grains.Interfaces/Jobs/StartJobError.cs | 35 +++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 Source/Kernel/Grains.Interfaces/Jobs/StartJobError.cs diff --git a/Source/Kernel/Grains.Interfaces/Jobs/IJob.cs b/Source/Kernel/Grains.Interfaces/Jobs/IJob.cs index 381ab6ba2..34d38d5f0 100644 --- a/Source/Kernel/Grains.Interfaces/Jobs/IJob.cs +++ b/Source/Kernel/Grains.Interfaces/Jobs/IJob.cs @@ -114,5 +114,5 @@ public interface IJob : IJob /// /// The request object for the job. /// Awaitable task. - Task> Start(TRequest request); + Task> Start(TRequest request); } \ No newline at end of file diff --git a/Source/Kernel/Grains.Interfaces/Jobs/StartJobError.cs b/Source/Kernel/Grains.Interfaces/Jobs/StartJobError.cs new file mode 100644 index 000000000..1dd9a63e1 --- /dev/null +++ b/Source/Kernel/Grains.Interfaces/Jobs/StartJobError.cs @@ -0,0 +1,35 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Cratis.Chronicle.Grains.Jobs; + +/// +/// The type of error that occurred while starting . +/// +public enum StartJobError +{ + /// + /// Unknown error occurred. + /// + Unknown = 0, + + /// + /// Some job steps failed to prepare. + /// + CouldNotPrepareJobSteps = 1, + + /// + /// Some jobs failed to start. + /// + FailedStartingSomeJobSteps = 2, + + /// + /// None of the jobs was started. + /// + AllJobStepsFailedStarting = 3, + + /// + /// There were no prepared job steps. + /// + NoJobStepsToStart = 4, +} From d91bb61189ff5a83f87a3b7c08644bf2703576e1 Mon Sep 17 00:00:00 2001 From: woksin Date: Wed, 12 Feb 2025 12:45:36 +0100 Subject: [PATCH 4/9] Add Job logging for failed startign job step --- Source/Kernel/Grains/Jobs/JobLogging.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Source/Kernel/Grains/Jobs/JobLogging.cs b/Source/Kernel/Grains/Jobs/JobLogging.cs index 2e68a0de9..4bdf6fc39 100644 --- a/Source/Kernel/Grains/Jobs/JobLogging.cs +++ b/Source/Kernel/Grains/Jobs/JobLogging.cs @@ -99,6 +99,9 @@ internal static partial class JobLogMessages [LoggerMessage(LogLevel.Warning, "Job failed starting job step {JobStepId}. Error: {Error}")] internal static partial void FailedStartingJobStep(this ILogger logger, JobStepId jobStepId, JobStepPrepareStartError error); + [LoggerMessage(LogLevel.Warning, "Job failed starting job step {JobStepId}")] + internal static partial void FailedStartingJobStep(this ILogger logger, Exception ex, JobStepId jobStepId); + [LoggerMessage(LogLevel.Debug, "Not all steps was completed successfully")] internal static partial void AllStepsNotCompletedSuccessfully(this ILogger logger); } From 7ffcf219329bb6d5d39d3d2702ca0b1069fba99a Mon Sep 17 00:00:00 2001 From: woksin Date: Wed, 12 Feb 2025 13:41:14 +0100 Subject: [PATCH 5/9] Code cleanup --- .../Kernel/Storage.MongoDB/Keys/ObserverKeysAsyncEnumerator.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/Kernel/Storage.MongoDB/Keys/ObserverKeysAsyncEnumerator.cs b/Source/Kernel/Storage.MongoDB/Keys/ObserverKeysAsyncEnumerator.cs index f53a12166..f074df061 100644 --- a/Source/Kernel/Storage.MongoDB/Keys/ObserverKeysAsyncEnumerator.cs +++ b/Source/Kernel/Storage.MongoDB/Keys/ObserverKeysAsyncEnumerator.cs @@ -42,7 +42,7 @@ public async ValueTask MoveNextAsync() return false; } - _queue = new Queue(cursor.Current.Select(_ => new Key(_.Value, ArrayIndexers.NoIndexers))); + _queue = new(cursor.Current.Select(_ => new Key(_.Value, ArrayIndexers.NoIndexers))); } if (_queue.Count == 0) From 165363662d55d753df64d67bc776b4d8f7f530a6 Mon Sep 17 00:00:00 2001 From: woksin Date: Wed, 12 Feb 2025 13:43:54 +0100 Subject: [PATCH 6/9] Write EventTypes to ObserverState when Subscribing instead of OnLeave of state machine state. We were just lucky that when starting jobs that we had stored the correct state on the Observer most of the time. --- Source/Kernel/Grains/Observation/Observer.cs | 10 ++++------ Source/Kernel/Grains/Observation/States/Routing.cs | 9 --------- 2 files changed, 4 insertions(+), 15 deletions(-) diff --git a/Source/Kernel/Grains/Observation/Observer.cs b/Source/Kernel/Grains/Observation/Observer.cs index 684915c22..48066ef36 100644 --- a/Source/Kernel/Grains/Observation/Observer.cs +++ b/Source/Kernel/Grains/Observation/Observer.cs @@ -116,7 +116,7 @@ public async Task Subscribe( logger.Subscribing(); - State = State with { Type = type }; + State = State with { Type = type, EventTypes = eventTypes }; _subscription = new ObserverSubscription( _observerId, @@ -125,7 +125,7 @@ public async Task Subscribe( typeof(TObserverSubscriber), siloAddress, subscriberArgs); - + await WriteStateAsync(); await ResumeJobs(); await TryRecoverAllFailedPartitions(); await TransitionTo(); @@ -216,11 +216,9 @@ protected override async Task WriteStateAsync() async Task ResumeJobs() { + // TODO: Do this in a Task.WhenAll because JobsManager is reentrant now var unfilteredJobs = await _jobsManager.GetAllJobs(); - var jobs = unfilteredJobs.Where(_ => - _.Request is IObserverJobRequest && - _.IsResumable).ToArray(); - foreach (var job in jobs) + foreach (var job in unfilteredJobs.Where(job => job is { Request: IObserverJobRequest, IsResumable: true }).ToArray()) { await _jobsManager.Resume(job.Id); } diff --git a/Source/Kernel/Grains/Observation/States/Routing.cs b/Source/Kernel/Grains/Observation/States/Routing.cs index 43789b1ca..389bc6cc9 100644 --- a/Source/Kernel/Grains/Observation/States/Routing.cs +++ b/Source/Kernel/Grains/Observation/States/Routing.cs @@ -56,15 +56,6 @@ public override async Task OnEnter(ObserverState state) return await EvaluateState(state); } - /// - public override Task OnLeave(ObserverState state) - { - return Task.FromResult(state with - { - EventTypes = _subscription.IsSubscribed ? _subscription.EventTypes : state.EventTypes - }); - } - async Task EvaluateState(ObserverState state) { if (!_subscription.IsSubscribed) From c809bb9aa881272c7abaa336f832189f7bd159d8 Mon Sep 17 00:00:00 2001 From: woksin Date: Wed, 12 Feb 2025 13:50:31 +0100 Subject: [PATCH 7/9] Wait some time for the observers to reach the desired handled event in integration test because it can occationally fail because GetState is interleaved method --- .../and_waiting_for_observer_to_be_active.cs | 2 ++ .../and_waiting_for_observer_to_be_active.cs | 2 ++ 2 files changed, 4 insertions(+) diff --git a/Integration/Orleans.InProcess/for_Reactors/when_appending_event/and_waiting_for_observer_to_be_active.cs b/Integration/Orleans.InProcess/for_Reactors/when_appending_event/and_waiting_for_observer_to_be_active.cs index 81e7d4fe9..f3672b90c 100644 --- a/Integration/Orleans.InProcess/for_Reactors/when_appending_event/and_waiting_for_observer_to_be_active.cs +++ b/Integration/Orleans.InProcess/for_Reactors/when_appending_event/and_waiting_for_observer_to_be_active.cs @@ -8,6 +8,7 @@ using Cratis.Chronicle.Storage.MongoDB; using Cratis.Chronicle.Storage.MongoDB.Observation; using Cratis.Chronicle.Storage.Observation; +using Humanizer; using context = Cratis.Chronicle.Integration.Orleans.InProcess.for_Reactors.when_appending_event.and_waiting_for_observer_to_be_active.context; using ObserverRunningState = Cratis.Chronicle.Concepts.Observation.ObserverRunningState; @@ -49,6 +50,7 @@ async Task Because() await EventStore.EventLog.Append(EventSourceId, Event); await Tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); WaitingForObserverStateError = await Catch.Exception(async () => await ReactorObserver.WaitForState(ObserverRunningState.Active, TimeSpan.FromSeconds(5))); + await ReactorObserver.WaitTillReachesEventSequenceNumber(EventSequenceNumber.First, 5.Seconds()); ReactorObserverState = await ReactorObserver.GetState(); FailedPartitions = await EventStore.Connection.Services.FailedPartitions.GetFailedPartitions(new() diff --git a/Integration/Orleans.InProcess/for_Reducers/when_appending_event/and_waiting_for_observer_to_be_active.cs b/Integration/Orleans.InProcess/for_Reducers/when_appending_event/and_waiting_for_observer_to_be_active.cs index b8b362af8..fc4f1073a 100644 --- a/Integration/Orleans.InProcess/for_Reducers/when_appending_event/and_waiting_for_observer_to_be_active.cs +++ b/Integration/Orleans.InProcess/for_Reducers/when_appending_event/and_waiting_for_observer_to_be_active.cs @@ -8,6 +8,7 @@ using Cratis.Chronicle.Storage.MongoDB; using Cratis.Chronicle.Storage.MongoDB.Observation; using Cratis.Chronicle.Storage.Observation; +using Humanizer; using context = Cratis.Chronicle.Integration.Orleans.InProcess.for_Reducers.when_appending_event.and_waiting_for_observer_to_be_active.context; using ObserverRunningState = Cratis.Chronicle.Concepts.Observation.ObserverRunningState; @@ -49,6 +50,7 @@ async Task Because() await EventStore.EventLog.Append(EventSourceId, Event); await Tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); WaitingForObserverStateError = await Catch.Exception(async () => await ReducerObserver.WaitForState(ObserverRunningState.Active, TimeSpan.FromSeconds(5))); + await ReducerObserver.WaitTillReachesEventSequenceNumber(EventSequenceNumber.First, 5.Seconds()); ReducerObserverState = await ReducerObserver.GetState(); FailedPartitions = await EventStore.Connection.Services.FailedPartitions.GetFailedPartitions(new() From 18ce3315eddc844274bc7b901c9d8c31b65346a0 Mon Sep 17 00:00:00 2001 From: woksin Date: Wed, 12 Feb 2025 13:53:22 +0100 Subject: [PATCH 8/9] Separate job step related methods in Job in its own partial class. Also rework Start so that it is waits for the whole process of starting all jobsteps to be finished before returning. This makes the code more simple and safe because we don't have to think about being in multiple grain contexts in a single logical method execution and we don't need to use task completion sources and task continue with manually anymore. We can do this now because I made JobsManager reentrant so that we can perform multiple actions on separate jobs at the same time, which in my opinion makes more sense --- Source/Kernel/Grains/Jobs/Job.Steps.cs | 300 +++++++++++++++++++++++++ Source/Kernel/Grains/Jobs/Job.cs | 284 ++--------------------- 2 files changed, 314 insertions(+), 270 deletions(-) create mode 100644 Source/Kernel/Grains/Jobs/Job.Steps.cs diff --git a/Source/Kernel/Grains/Jobs/Job.Steps.cs b/Source/Kernel/Grains/Jobs/Job.Steps.cs new file mode 100644 index 000000000..d5fca822e --- /dev/null +++ b/Source/Kernel/Grains/Jobs/Job.Steps.cs @@ -0,0 +1,300 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Collections.Immutable; +using Cratis.Chronicle.Concepts; +using Cratis.Chronicle.Concepts.Jobs; +using Cratis.Reflection; +namespace Cratis.Chronicle.Grains.Jobs; + +public abstract partial class Job +{ + /// + public async Task> OnStepSucceeded(JobStepId stepId, JobStepResult jobStepResult) + { + using var scope = _logger.BeginJobScope(JobId, JobKey); + try + { + _logger.StepSuccessfullyCompleted(stepId); + State.Progress.SuccessfulSteps++; + + if ((await WriteState()).TryGetException(out var writeStateError)) + { + _logger.FailedUpdatingSuccessfulSteps(writeStateError, State.Progress.SuccessfulSteps); + return JobError.PersistStateError; + } + + var handleCompletedStepResult = await HandleJobStepCompleted(stepId, jobStepResult); + return handleCompletedStepResult.Match( + _ => Result.Success(), + error => + { + _logger.FailedHandlingCompletedJobStep(stepId, error); + return Result.Failed(error); + }); + } + catch (Exception ex) + { + _logger.Failed(ex); + return JobError.UnknownError; + } + } + + /// + public Task> OnStepStopped(JobStepId stepId, JobStepResult jobStepResult) => Task.FromResult(Result.Success()); + + /// + public async Task> OnStepFailed(JobStepId stepId, JobStepResult jobStepResult) + { + using var scope = _logger.BeginJobScope(JobId, JobKey); + try + { + _logger.StepFailed(stepId); + State.Progress.FailedSteps++; + if ((await WriteState()).TryGetException(out var writeStateError)) + { + _logger.FailedUpdatingFailedSteps(writeStateError, State.Progress.SuccessfulSteps); + return JobError.PersistStateError; + } + var handleCompletedStepResult = await HandleJobStepCompleted(stepId, jobStepResult); + return handleCompletedStepResult.Match( + _ => Result.Success(), + error => + { + _logger.FailedHandlingCompletedJobStep(stepId, error); + return Result.Failed(error); + }); + } + catch (Exception ex) + { + _logger.Failed(ex); + return JobError.UnknownError; + } + } + + /// + /// Called when a step has completed. + /// + /// for the completed step. + /// for the completed step. + /// Awaitable task. + protected virtual Task OnStepCompleted(JobStepId jobStepId, JobStepResult result) => Task.CompletedTask; + + /// + /// Create a new . + /// + /// The request associated with the step. + /// The type of job step to create. + /// A new instance of the job step. + protected JobStepDetails CreateStep(object request) + where TJobStep : IJobStep + { + var jobStepId = JobStepId.New(); + var jobId = this.GetPrimaryKey(out var keyExtension); + var jobKey = (JobKey)keyExtension!; + var jobStepType = typeof(TJobStep) + .AllBaseAndImplementingTypes() + .First( + _ => _.IsGenericType && _.GetGenericTypeDefinition() == typeof(IJobStep<,>)); + var resultType = jobStepType.GetGenericArguments()[1]; + return new( + typeof(TJobStep), + jobStepId, + new(jobId, jobKey.EventStore, jobKey.Namespace), + request, + resultType); + } + + /// + /// Called before preparing steps. + /// + /// The request associated with the job. + /// Awaitable task. + protected virtual Task OnBeforePrepareSteps(TRequest request) => Task.CompletedTask; + + /// + /// Start the job. + /// + /// The request associated with the job. + /// Collection of . + protected abstract Task> PrepareSteps(TRequest request); + + async Task> HandleJobStepCompleted(JobStepId stepId, JobStepResult result) + { + try + { + await OnStepCompleted(stepId, result); + var handleCompletionResult = await HandleCompletion(); + if (handleCompletionResult.TryGetError(out var handleCompletionError)) + { + return handleCompletionError; + } + var needsToWriteState = handleCompletionResult.AsT0 switch + { + HandleCompletionSuccess.NotClearedState => true, + _ => false + }; + if (!needsToWriteState) + { + return Result.Success(); + } + var writeStateResult = await WriteState(); + return writeStateResult.Match( + _ => Result.Success(), + ex => + { + _logger.FailedUpdatingStateAfterHandlingJobStepCompletion(ex, stepId); + return JobError.PersistStateError; + }); + } + catch (Exception ex) + { + _logger.Failed(ex); + return JobError.UnknownError; + } + finally + { + await Unsubscribe(_jobStepGrains[stepId].Grain.AsReference()); + _jobStepGrains.Remove(stepId); + } + } + + Dictionary CreateGrainsFromJobSteps(IImmutableList jobSteps) => + jobSteps.ToDictionary( + details => details.Id, + details => new JobStepGrainAndRequest( + (GrainFactory.GetGrain(details.Type, details.Id, keyExtension: details.Key) as IJobStep)!, + details.Request, + details.ResultType)); + + async Task> PrepareAndStartRunningAllSteps(TRequest request) + { + try + { + var grainId = this.GetGrainId(); + await OnBeforePrepareSteps(request); + var steps = await PrepareSteps(request); + await SetTotalSteps(steps.Count); + if (steps.Count == 0) + { + _logger.NoJobStepsToStart(); + + // TODO: Not sure if Complete should be called or HandleCompletion + var onCompletedResult = await Complete(); + if (onCompletedResult.TryGetError(out var onCompletedError)) + { + _logger.FailedOnCompletedWhileNoJobSteps(onCompletedError); + _ = await WriteStatusChanged(JobStatus.CompletedWithFailures); + } + else + { + await WriteStatusChanged(JobStatus.CompletedSuccessfully); + if (!KeepAfterCompleted) + { + await ClearStateAsync(); + } + } + return StartJobError.NoJobStepsToStart; + } + + _logger.PreparingJobSteps(steps.Count); + _jobStepGrains = CreateGrainsFromJobSteps(steps); + return await PrepareAndStartAllJobSteps(grainId); + } + catch (Exception ex) + { + _logger.ErrorPreparingJobSteps(ex); + _ = await WriteStatusChanged(JobStatus.Failed, ex); + return StartJobError.Unknown; + } + } + + async Task> PrepareAndStartAllJobSteps(GrainId grainId) + { + using var scope = _logger.BeginJobScope(JobId, JobKey); + _logger.PrepareJobStepsForRunning(); + _ = await WriteStatusChanged(JobStatus.PreparingStepsForRunning); + + var preparedAllJobSteps = await TryPrepareAllJobSteps(); + if (!preparedAllJobSteps) + { + _ = await WriteStatusChanged(JobStatus.Failed); + return StartJobError.CouldNotPrepareJobSteps; + } + + var startJobStepsResult = await StartAndSubscribeToAllJobSteps(grainId); + if (startJobStepsResult.TryGetError(out var startJobStepsError)) + { + if (startJobStepsError == StartJobError.AllJobStepsFailedStarting) + { + _ = await HandleCompletion(); + _ = await WriteState(); + } + return startJobStepsError; + } + + _ = await WriteStatusChanged(JobStatus.Running); + return Result.Success(); + } + + async Task TryPrepareAllJobSteps() + { + var prepareAllSteps = _jobStepGrains.Select(async idAndGrain => + { + var (id, jobStep) = idAndGrain; + try + { + if (!(await jobStep.Grain.Prepare(jobStep.Request)).TryGetError(out var prepareError)) + { + return (JobStepId: id, Result: Result.Success()); + } + _logger.FailedPreparingJobStep(id, prepareError); + return (JobStepId: id, Result: prepareError); + } + catch (Exception ex) + { + _logger.ErrorPreparingJobStep(ex, id); + return (JobStepId: id, Result: Result.Failed(JobStepPrepareStartError.Unknown)); + } + }); + var results = await Task.WhenAll(prepareAllSteps); + return results.All(_ => _.Result.IsSuccess); + } + + async Task> StartAndSubscribeToAllJobSteps(GrainId grainId) + { + var prepareAllSteps = _jobStepGrains.Select(async idAndGrain => + { + var (id, jobStep) = idAndGrain; + try + { + if (!(await jobStep.Grain.Start(grainId, jobStep.Request)).TryGetError(out var startError)) + { + return (JobStepId: id, Result: Result.Success()); + } + _logger.FailedStartingJobStep(id, startError); + return (JobStepId: id, Result: startError); + } + catch (Exception ex) + { + _logger.FailedStartingJobStep(ex, id); + return (JobStepId: id, Result: Result.Failed(JobStepPrepareStartError.Unknown)); + } + }); + var results = await Task.WhenAll(prepareAllSteps); + var numFailedJobSteps = results.Count(finishedTask => !finishedTask.Result.IsSuccess); + foreach (var idAndJobStep in results.Where(_ => _.Result.IsSuccess)) + { + var jobStepGrain = _jobStepGrains[idAndJobStep.JobStepId].Grain; + await Subscribe(jobStepGrain.AsReference()); + } + if (numFailedJobSteps == 0) + { + return Result.Success(); + } + State.Progress.FailedSteps += numFailedJobSteps; + return numFailedJobSteps == results.Length + ? StartJobError.AllJobStepsFailedStarting + : StartJobError.FailedStartingSomeJobSteps; + } +} diff --git a/Source/Kernel/Grains/Jobs/Job.cs b/Source/Kernel/Grains/Jobs/Job.cs index c7b520e73..d66d9f3e3 100644 --- a/Source/Kernel/Grains/Jobs/Job.cs +++ b/Source/Kernel/Grains/Jobs/Job.cs @@ -1,12 +1,10 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System.Collections.Immutable; using Cratis.Chronicle.Concepts; using Cratis.Chronicle.Concepts.Jobs; using Cratis.Chronicle.Storage; using Cratis.Chronicle.Storage.Jobs; -using Cratis.Reflection; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -21,7 +19,7 @@ namespace Cratis.Chronicle.Grains.Jobs; /// Type of request object that gets passed to job. /// Type of state for the job. [StorageProvider(ProviderName = WellKnownGrainStorageProviders.Jobs)] -public abstract class Job : Grain, IJob +public abstract partial class Job : Grain, IJob where TRequest : class, IJobRequest where TJobState : JobState { @@ -106,60 +104,18 @@ public override Task OnActivateAsync(CancellationToken cancellationToken) } /// - public async Task> Start(TRequest request) + public async Task> Start(TRequest request) { using var scope = _logger.BeginJobScope(JobId, JobKey); - try - { - _logger.Starting(); - _isRunning = true; - State.Created = DateTimeOffset.UtcNow; - State.Request = request!; - State.Details = GetJobDetails(); - await WriteStatusChanged(JobStatus.PreparingSteps); - var grainId = this.GetGrainId(); - var tcs = new TaskCompletionSource>(TaskCreationOptions.RunContinuationsAsynchronously); - - await PrepareAllSteps(request, tcs); -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - tcs.Task.ContinueWith( - async getPreparedJobSteps => - { - try - { - var jobSteps = await getPreparedJobSteps; - if (jobSteps.Count == 0) - { - _logger.NoJobStepsToStart(); - var onCompletedResult = await Complete(); - if (onCompletedResult.TryGetError(out var onCompletedError)) - { - _logger.FailedOnCompletedWhileNoJobSteps(onCompletedError); - } - StatusChanged(JobStatus.CompletedSuccessfully); // This is effectively just a noop since state is cleared after this line - await ClearStateAsync(); - return; - } - - _logger.PreparingJobSteps(jobSteps.Count); - _jobStepGrains = CreateGrainsFromJobSteps(jobSteps); - _ = await WriteStatusChanged(JobStatus.PreparingStepsForRunning); - PrepareAndStartAllJobSteps(grainId); - } - catch (Exception ex) - { - _logger.ErrorPreparingJobSteps(ex); - } - }, - TaskScheduler.Current); - return Result.Success(); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - } - catch (Exception ex) - { - _logger.Failed(ex); - return JobError.UnknownError; - } + _logger.Starting(); + _isRunning = true; + + State.Created = DateTimeOffset.UtcNow; + State.Request = request!; + State.Details = GetJobDetails(); + + _ = await WriteStatusChanged(JobStatus.PreparingSteps); + return await PrepareAndStartRunningAllSteps(request); } /// @@ -179,6 +135,7 @@ public async Task> Resume() } _logger.Resuming(); + _isRunning = true; var getStepsResult = await Storage.JobSteps.GetForJob(JobId, JobStepStatus.Scheduled, JobStepStatus.Running, JobStepStatus.Paused); return await getStepsResult.Match( @@ -278,7 +235,7 @@ public async Task> Stop() await _observers!.Notify(o => o.OnJobStopped()); var stepStorage = ServiceProvider.GetRequiredService(); - + // TODO: We probably don't want to do this. And we need to nail down the semantics of Stop vs Pause and how it works with Resume var removeAllStepsResult = await stepStorage.RemoveAllForJob(JobId); return await removeAllStepsResult.Match( async _ => @@ -304,69 +261,6 @@ public async Task> Stop() /// public virtual Task OnCompleted() => Task.FromResult(Result.Success()); - /// - public async Task> OnStepSucceeded(JobStepId stepId, JobStepResult jobStepResult) - { - using var scope = _logger.BeginJobScope(JobId, JobKey); - try - { - _logger.StepSuccessfullyCompleted(stepId); - State.Progress.SuccessfulSteps++; - - if ((await WriteState()).TryGetException(out var writeStateError)) - { - _logger.FailedUpdatingSuccessfulSteps(writeStateError, State.Progress.SuccessfulSteps); - return JobError.PersistStateError; - } - - var handleCompletedStepResult = await HandleJobStepCompleted(stepId, jobStepResult); - return handleCompletedStepResult.Match( - _ => Result.Success(), - error => - { - _logger.FailedHandlingCompletedJobStep(stepId, error); - return Result.Failed(error); - }); - } - catch (Exception ex) - { - _logger.Failed(ex); - return JobError.UnknownError; - } - } - - /// - public Task> OnStepStopped(JobStepId stepId, JobStepResult jobStepResult) => Task.FromResult(Result.Success()); - - /// - public async Task> OnStepFailed(JobStepId stepId, JobStepResult jobStepResult) - { - using var scope = _logger.BeginJobScope(JobId, JobKey); - try - { - _logger.StepFailed(stepId); - State.Progress.FailedSteps++; - if ((await WriteState()).TryGetException(out var writeStateError)) - { - _logger.FailedUpdatingFailedSteps(writeStateError, State.Progress.SuccessfulSteps); - return JobError.PersistStateError; - } - var handleCompletedStepResult = await HandleJobStepCompleted(stepId, jobStepResult); - return handleCompletedStepResult.Match( - _ => Result.Success(), - error => - { - _logger.FailedHandlingCompletedJobStep(stepId, error); - return Result.Failed(error); - }); - } - catch (Exception ex) - { - _logger.Failed(ex); - return JobError.UnknownError; - } - } - /// public async Task> SetTotalSteps(int totalSteps) { @@ -450,53 +344,6 @@ protected async Task WriteState() protected bool IsInStoppedOrCompletedState() => State.Status is JobStatus.Stopped or JobStatus.CompletedSuccessfully or JobStatus.CompletedWithFailures; - /// - /// Called when a step has completed. - /// - /// for the completed step. - /// for the completed step. - /// Awaitable task. - protected virtual Task OnStepCompleted(JobStepId jobStepId, JobStepResult result) => Task.CompletedTask; - - /// - /// Create a new . - /// - /// The request associated with the step. - /// The type of job step to create. - /// A new instance of the job step. - protected JobStepDetails CreateStep(object request) - where TJobStep : IJobStep - { - var jobStepId = JobStepId.New(); - var jobId = this.GetPrimaryKey(out var keyExtension); - var jobKey = (JobKey)keyExtension!; - var jobStepType = typeof(TJobStep) - .AllBaseAndImplementingTypes() - .First( - _ => _.IsGenericType && _.GetGenericTypeDefinition() == typeof(IJobStep<,>)); - var resultType = jobStepType.GetGenericArguments()[1]; - return new( - typeof(TJobStep), - jobStepId, - new(jobId, jobKey.EventStore, jobKey.Namespace), - request, - resultType); - } - - /// - /// Called before preparing steps. - /// - /// The request associated with the job. - /// Awaitable task. - protected virtual Task OnBeforePrepareSteps(TRequest request) => Task.CompletedTask; - - /// - /// Start the job. - /// - /// The request associated with the job. - /// Collection of . - protected abstract Task> PrepareSteps(TRequest request); - /// /// Check if the job can be resumed. /// @@ -509,43 +356,6 @@ protected JobStepDetails CreateStep(object request) /// The . protected virtual JobDetails GetJobDetails() => JobDetails.NotSet; - async Task> HandleJobStepCompleted(JobStepId stepId, JobStepResult result) - { - try - { - await OnStepCompleted(stepId, result); - var handleCompletionResult = await HandleCompletion(); - if (handleCompletionResult.TryGetError(out var handleCompletionError)) - { - return handleCompletionError; - } - var needsToWriteState = handleCompletionResult.AsT0 switch - { - HandleCompletionSuccess.NotClearedState => true, - _ => false - }; - await Unsubscribe(_jobStepGrains[stepId].Grain.AsReference()); - _jobStepGrains.Remove(stepId); - if (!needsToWriteState) - { - return Result.Success(); - } - var writeStateResult = await WriteState(); - return writeStateResult.Match( - _ => Result.Success(), - ex => - { - _logger.FailedUpdatingStateAfterHandlingJobStepCompletion(ex, stepId); - return JobError.PersistStateError; - }); - } - catch (Exception ex) - { - _logger.Failed(ex); - return JobError.UnknownError; - } - } - bool JobIsRunning() => State.Status is JobStatus.Running or JobStatus.PreparingSteps or JobStatus.PreparingStepsForRunning; void StatusChanged(JobStatus status, Exception? exception = null) @@ -560,72 +370,6 @@ void StatusChanged(JobStatus status, Exception? exception = null) State.Status = status; } - Dictionary CreateGrainsFromJobSteps(IImmutableList jobSteps) => - jobSteps.ToDictionary( - details => details.Id, - details => new JobStepGrainAndRequest( - (GrainFactory.GetGrain(details.Type, details.Id, keyExtension: details.Key) as IJobStep)!, - details.Request, - details.ResultType)); - - async Task PrepareAllSteps(TRequest request, TaskCompletionSource> tcs) - { - await OnBeforePrepareSteps(request); - _ = Task.Run(async () => - { - var steps = await PrepareSteps(request); - await ThisJob.SetTotalSteps(steps.Count); - tcs.SetResult(steps); - }); - } - - void PrepareAndStartAllJobSteps(GrainId grainId) => _ = Task.Run(async () => - { - using var scope = _logger.BeginJobScope(JobId, JobKey); - _logger.PrepareJobStepsForRunning(); - - _ = await ThisJob.WriteStatusChanged(JobStatus.PreparingStepsForRunning); - - try - { - if ((await PrepareAndStartJobSteps(grainId)).TryGetError(out var prepareStartError)) - { - _ = await ThisJob.WriteStatusChanged(JobStatus.Failed); - _ = await ThisJob.Stop(); - } - else - { - _ = await ThisJob.WriteStatusChanged(JobStatus.Running); // What to do if this fails? - } - } - catch (Exception ex) - { - _logger.Failed(ex); - - _ = await ThisJob.WriteStatusChanged(JobStatus.Failed, ex); - _ = await ThisJob.Stop(); - } - }); - - async Task> PrepareAndStartJobSteps(GrainId grainId) - { - foreach (var (id, jobStep) in _jobStepGrains) - { - await ThisJob.Subscribe(jobStep.Grain.AsReference()); - if ((await jobStep.Grain.Prepare(jobStep.Request)).TryGetError(out var prepareError)) - { - _logger.FailedPreparingJobStep(id, prepareError); - return prepareError; - } - if ((await jobStep.Grain.Start(grainId, jobStep.Request)).TryGetError(out var startError)) - { - _logger.FailedStartingJobStep(id, startError); - return startError; - } - } - return Result.Success(); - } - async Task> HandleCompletion() { try @@ -642,7 +386,7 @@ async Task> HandleCompletion() { return onCompletedError; } - if (!KeepAfterCompleted) + if (!KeepAfterCompleted) // TODO: Should we always keep state if some or all steps, or job, failed? { await ClearStateAsync(); cleared = true; From 457bfd6e4797efce194bfb57c0b04f7c8d22d129 Mon Sep 17 00:00:00 2001 From: woksin Date: Wed, 12 Feb 2025 14:32:14 +0100 Subject: [PATCH 9/9] Make IObserver GetState not Interleaved because it was only used in tests and then it is nice to wait for the Observer to be finished processing messages --- Source/Kernel/Grains.Interfaces/Observation/IObserver.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/Source/Kernel/Grains.Interfaces/Observation/IObserver.cs b/Source/Kernel/Grains.Interfaces/Observation/IObserver.cs index d292daf40..05ff2a692 100644 --- a/Source/Kernel/Grains.Interfaces/Observation/IObserver.cs +++ b/Source/Kernel/Grains.Interfaces/Observation/IObserver.cs @@ -25,7 +25,6 @@ public interface IObserver : IStateMachine, IGrainWithStringKey /// Get the state from the observer. /// /// The . - [AlwaysInterleave] Task GetState(); ///