diff --git a/Integration/Orleans.InProcess/for_Reactors/when_appending_event/and_waiting_for_observer_to_be_active.cs b/Integration/Orleans.InProcess/for_Reactors/when_appending_event/and_waiting_for_observer_to_be_active.cs index 81e7d4fe9..f3672b90c 100644 --- a/Integration/Orleans.InProcess/for_Reactors/when_appending_event/and_waiting_for_observer_to_be_active.cs +++ b/Integration/Orleans.InProcess/for_Reactors/when_appending_event/and_waiting_for_observer_to_be_active.cs @@ -8,6 +8,7 @@ using Cratis.Chronicle.Storage.MongoDB; using Cratis.Chronicle.Storage.MongoDB.Observation; using Cratis.Chronicle.Storage.Observation; +using Humanizer; using context = Cratis.Chronicle.Integration.Orleans.InProcess.for_Reactors.when_appending_event.and_waiting_for_observer_to_be_active.context; using ObserverRunningState = Cratis.Chronicle.Concepts.Observation.ObserverRunningState; @@ -49,6 +50,7 @@ async Task Because() await EventStore.EventLog.Append(EventSourceId, Event); await Tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); WaitingForObserverStateError = await Catch.Exception(async () => await ReactorObserver.WaitForState(ObserverRunningState.Active, TimeSpan.FromSeconds(5))); + await ReactorObserver.WaitTillReachesEventSequenceNumber(EventSequenceNumber.First, 5.Seconds()); ReactorObserverState = await ReactorObserver.GetState(); FailedPartitions = await EventStore.Connection.Services.FailedPartitions.GetFailedPartitions(new() diff --git a/Integration/Orleans.InProcess/for_Reducers/when_appending_event/and_waiting_for_observer_to_be_active.cs b/Integration/Orleans.InProcess/for_Reducers/when_appending_event/and_waiting_for_observer_to_be_active.cs index b8b362af8..fc4f1073a 100644 --- a/Integration/Orleans.InProcess/for_Reducers/when_appending_event/and_waiting_for_observer_to_be_active.cs +++ b/Integration/Orleans.InProcess/for_Reducers/when_appending_event/and_waiting_for_observer_to_be_active.cs @@ -8,6 +8,7 @@ using Cratis.Chronicle.Storage.MongoDB; using Cratis.Chronicle.Storage.MongoDB.Observation; using Cratis.Chronicle.Storage.Observation; +using Humanizer; using context = Cratis.Chronicle.Integration.Orleans.InProcess.for_Reducers.when_appending_event.and_waiting_for_observer_to_be_active.context; using ObserverRunningState = Cratis.Chronicle.Concepts.Observation.ObserverRunningState; @@ -49,6 +50,7 @@ async Task Because() await EventStore.EventLog.Append(EventSourceId, Event); await Tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); WaitingForObserverStateError = await Catch.Exception(async () => await ReducerObserver.WaitForState(ObserverRunningState.Active, TimeSpan.FromSeconds(5))); + await ReducerObserver.WaitTillReachesEventSequenceNumber(EventSequenceNumber.First, 5.Seconds()); ReducerObserverState = await ReducerObserver.GetState(); FailedPartitions = await EventStore.Connection.Services.FailedPartitions.GetFailedPartitions(new() diff --git a/Source/Kernel/Grains.Interfaces/Jobs/IJob.cs b/Source/Kernel/Grains.Interfaces/Jobs/IJob.cs index 381ab6ba2..34d38d5f0 100644 --- a/Source/Kernel/Grains.Interfaces/Jobs/IJob.cs +++ b/Source/Kernel/Grains.Interfaces/Jobs/IJob.cs @@ -114,5 +114,5 @@ public interface IJob : IJob /// /// The request object for the job. /// Awaitable task. - Task> Start(TRequest request); + Task> Start(TRequest request); } \ No newline at end of file diff --git a/Source/Kernel/Grains.Interfaces/Jobs/StartJobError.cs b/Source/Kernel/Grains.Interfaces/Jobs/StartJobError.cs new file mode 100644 index 000000000..1dd9a63e1 --- /dev/null +++ b/Source/Kernel/Grains.Interfaces/Jobs/StartJobError.cs @@ -0,0 +1,35 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Cratis.Chronicle.Grains.Jobs; + +/// +/// The type of error that occurred while starting . +/// +public enum StartJobError +{ + /// + /// Unknown error occurred. + /// + Unknown = 0, + + /// + /// Some job steps failed to prepare. + /// + CouldNotPrepareJobSteps = 1, + + /// + /// Some jobs failed to start. + /// + FailedStartingSomeJobSteps = 2, + + /// + /// None of the jobs was started. + /// + AllJobStepsFailedStarting = 3, + + /// + /// There were no prepared job steps. + /// + NoJobStepsToStart = 4, +} diff --git a/Source/Kernel/Grains.Interfaces/Observation/IObserver.cs b/Source/Kernel/Grains.Interfaces/Observation/IObserver.cs index d292daf40..05ff2a692 100644 --- a/Source/Kernel/Grains.Interfaces/Observation/IObserver.cs +++ b/Source/Kernel/Grains.Interfaces/Observation/IObserver.cs @@ -25,7 +25,6 @@ public interface IObserver : IStateMachine, IGrainWithStringKey /// Get the state from the observer. /// /// The . - [AlwaysInterleave] Task GetState(); /// diff --git a/Source/Kernel/Grains/Jobs/Job.Steps.cs b/Source/Kernel/Grains/Jobs/Job.Steps.cs new file mode 100644 index 000000000..d5fca822e --- /dev/null +++ b/Source/Kernel/Grains/Jobs/Job.Steps.cs @@ -0,0 +1,300 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Collections.Immutable; +using Cratis.Chronicle.Concepts; +using Cratis.Chronicle.Concepts.Jobs; +using Cratis.Reflection; +namespace Cratis.Chronicle.Grains.Jobs; + +public abstract partial class Job +{ + /// + public async Task> OnStepSucceeded(JobStepId stepId, JobStepResult jobStepResult) + { + using var scope = _logger.BeginJobScope(JobId, JobKey); + try + { + _logger.StepSuccessfullyCompleted(stepId); + State.Progress.SuccessfulSteps++; + + if ((await WriteState()).TryGetException(out var writeStateError)) + { + _logger.FailedUpdatingSuccessfulSteps(writeStateError, State.Progress.SuccessfulSteps); + return JobError.PersistStateError; + } + + var handleCompletedStepResult = await HandleJobStepCompleted(stepId, jobStepResult); + return handleCompletedStepResult.Match( + _ => Result.Success(), + error => + { + _logger.FailedHandlingCompletedJobStep(stepId, error); + return Result.Failed(error); + }); + } + catch (Exception ex) + { + _logger.Failed(ex); + return JobError.UnknownError; + } + } + + /// + public Task> OnStepStopped(JobStepId stepId, JobStepResult jobStepResult) => Task.FromResult(Result.Success()); + + /// + public async Task> OnStepFailed(JobStepId stepId, JobStepResult jobStepResult) + { + using var scope = _logger.BeginJobScope(JobId, JobKey); + try + { + _logger.StepFailed(stepId); + State.Progress.FailedSteps++; + if ((await WriteState()).TryGetException(out var writeStateError)) + { + _logger.FailedUpdatingFailedSteps(writeStateError, State.Progress.SuccessfulSteps); + return JobError.PersistStateError; + } + var handleCompletedStepResult = await HandleJobStepCompleted(stepId, jobStepResult); + return handleCompletedStepResult.Match( + _ => Result.Success(), + error => + { + _logger.FailedHandlingCompletedJobStep(stepId, error); + return Result.Failed(error); + }); + } + catch (Exception ex) + { + _logger.Failed(ex); + return JobError.UnknownError; + } + } + + /// + /// Called when a step has completed. + /// + /// for the completed step. + /// for the completed step. + /// Awaitable task. + protected virtual Task OnStepCompleted(JobStepId jobStepId, JobStepResult result) => Task.CompletedTask; + + /// + /// Create a new . + /// + /// The request associated with the step. + /// The type of job step to create. + /// A new instance of the job step. + protected JobStepDetails CreateStep(object request) + where TJobStep : IJobStep + { + var jobStepId = JobStepId.New(); + var jobId = this.GetPrimaryKey(out var keyExtension); + var jobKey = (JobKey)keyExtension!; + var jobStepType = typeof(TJobStep) + .AllBaseAndImplementingTypes() + .First( + _ => _.IsGenericType && _.GetGenericTypeDefinition() == typeof(IJobStep<,>)); + var resultType = jobStepType.GetGenericArguments()[1]; + return new( + typeof(TJobStep), + jobStepId, + new(jobId, jobKey.EventStore, jobKey.Namespace), + request, + resultType); + } + + /// + /// Called before preparing steps. + /// + /// The request associated with the job. + /// Awaitable task. + protected virtual Task OnBeforePrepareSteps(TRequest request) => Task.CompletedTask; + + /// + /// Start the job. + /// + /// The request associated with the job. + /// Collection of . + protected abstract Task> PrepareSteps(TRequest request); + + async Task> HandleJobStepCompleted(JobStepId stepId, JobStepResult result) + { + try + { + await OnStepCompleted(stepId, result); + var handleCompletionResult = await HandleCompletion(); + if (handleCompletionResult.TryGetError(out var handleCompletionError)) + { + return handleCompletionError; + } + var needsToWriteState = handleCompletionResult.AsT0 switch + { + HandleCompletionSuccess.NotClearedState => true, + _ => false + }; + if (!needsToWriteState) + { + return Result.Success(); + } + var writeStateResult = await WriteState(); + return writeStateResult.Match( + _ => Result.Success(), + ex => + { + _logger.FailedUpdatingStateAfterHandlingJobStepCompletion(ex, stepId); + return JobError.PersistStateError; + }); + } + catch (Exception ex) + { + _logger.Failed(ex); + return JobError.UnknownError; + } + finally + { + await Unsubscribe(_jobStepGrains[stepId].Grain.AsReference()); + _jobStepGrains.Remove(stepId); + } + } + + Dictionary CreateGrainsFromJobSteps(IImmutableList jobSteps) => + jobSteps.ToDictionary( + details => details.Id, + details => new JobStepGrainAndRequest( + (GrainFactory.GetGrain(details.Type, details.Id, keyExtension: details.Key) as IJobStep)!, + details.Request, + details.ResultType)); + + async Task> PrepareAndStartRunningAllSteps(TRequest request) + { + try + { + var grainId = this.GetGrainId(); + await OnBeforePrepareSteps(request); + var steps = await PrepareSteps(request); + await SetTotalSteps(steps.Count); + if (steps.Count == 0) + { + _logger.NoJobStepsToStart(); + + // TODO: Not sure if Complete should be called or HandleCompletion + var onCompletedResult = await Complete(); + if (onCompletedResult.TryGetError(out var onCompletedError)) + { + _logger.FailedOnCompletedWhileNoJobSteps(onCompletedError); + _ = await WriteStatusChanged(JobStatus.CompletedWithFailures); + } + else + { + await WriteStatusChanged(JobStatus.CompletedSuccessfully); + if (!KeepAfterCompleted) + { + await ClearStateAsync(); + } + } + return StartJobError.NoJobStepsToStart; + } + + _logger.PreparingJobSteps(steps.Count); + _jobStepGrains = CreateGrainsFromJobSteps(steps); + return await PrepareAndStartAllJobSteps(grainId); + } + catch (Exception ex) + { + _logger.ErrorPreparingJobSteps(ex); + _ = await WriteStatusChanged(JobStatus.Failed, ex); + return StartJobError.Unknown; + } + } + + async Task> PrepareAndStartAllJobSteps(GrainId grainId) + { + using var scope = _logger.BeginJobScope(JobId, JobKey); + _logger.PrepareJobStepsForRunning(); + _ = await WriteStatusChanged(JobStatus.PreparingStepsForRunning); + + var preparedAllJobSteps = await TryPrepareAllJobSteps(); + if (!preparedAllJobSteps) + { + _ = await WriteStatusChanged(JobStatus.Failed); + return StartJobError.CouldNotPrepareJobSteps; + } + + var startJobStepsResult = await StartAndSubscribeToAllJobSteps(grainId); + if (startJobStepsResult.TryGetError(out var startJobStepsError)) + { + if (startJobStepsError == StartJobError.AllJobStepsFailedStarting) + { + _ = await HandleCompletion(); + _ = await WriteState(); + } + return startJobStepsError; + } + + _ = await WriteStatusChanged(JobStatus.Running); + return Result.Success(); + } + + async Task TryPrepareAllJobSteps() + { + var prepareAllSteps = _jobStepGrains.Select(async idAndGrain => + { + var (id, jobStep) = idAndGrain; + try + { + if (!(await jobStep.Grain.Prepare(jobStep.Request)).TryGetError(out var prepareError)) + { + return (JobStepId: id, Result: Result.Success()); + } + _logger.FailedPreparingJobStep(id, prepareError); + return (JobStepId: id, Result: prepareError); + } + catch (Exception ex) + { + _logger.ErrorPreparingJobStep(ex, id); + return (JobStepId: id, Result: Result.Failed(JobStepPrepareStartError.Unknown)); + } + }); + var results = await Task.WhenAll(prepareAllSteps); + return results.All(_ => _.Result.IsSuccess); + } + + async Task> StartAndSubscribeToAllJobSteps(GrainId grainId) + { + var prepareAllSteps = _jobStepGrains.Select(async idAndGrain => + { + var (id, jobStep) = idAndGrain; + try + { + if (!(await jobStep.Grain.Start(grainId, jobStep.Request)).TryGetError(out var startError)) + { + return (JobStepId: id, Result: Result.Success()); + } + _logger.FailedStartingJobStep(id, startError); + return (JobStepId: id, Result: startError); + } + catch (Exception ex) + { + _logger.FailedStartingJobStep(ex, id); + return (JobStepId: id, Result: Result.Failed(JobStepPrepareStartError.Unknown)); + } + }); + var results = await Task.WhenAll(prepareAllSteps); + var numFailedJobSteps = results.Count(finishedTask => !finishedTask.Result.IsSuccess); + foreach (var idAndJobStep in results.Where(_ => _.Result.IsSuccess)) + { + var jobStepGrain = _jobStepGrains[idAndJobStep.JobStepId].Grain; + await Subscribe(jobStepGrain.AsReference()); + } + if (numFailedJobSteps == 0) + { + return Result.Success(); + } + State.Progress.FailedSteps += numFailedJobSteps; + return numFailedJobSteps == results.Length + ? StartJobError.AllJobStepsFailedStarting + : StartJobError.FailedStartingSomeJobSteps; + } +} diff --git a/Source/Kernel/Grains/Jobs/Job.cs b/Source/Kernel/Grains/Jobs/Job.cs index c7b520e73..d66d9f3e3 100644 --- a/Source/Kernel/Grains/Jobs/Job.cs +++ b/Source/Kernel/Grains/Jobs/Job.cs @@ -1,12 +1,10 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System.Collections.Immutable; using Cratis.Chronicle.Concepts; using Cratis.Chronicle.Concepts.Jobs; using Cratis.Chronicle.Storage; using Cratis.Chronicle.Storage.Jobs; -using Cratis.Reflection; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -21,7 +19,7 @@ namespace Cratis.Chronicle.Grains.Jobs; /// Type of request object that gets passed to job. /// Type of state for the job. [StorageProvider(ProviderName = WellKnownGrainStorageProviders.Jobs)] -public abstract class Job : Grain, IJob +public abstract partial class Job : Grain, IJob where TRequest : class, IJobRequest where TJobState : JobState { @@ -106,60 +104,18 @@ public override Task OnActivateAsync(CancellationToken cancellationToken) } /// - public async Task> Start(TRequest request) + public async Task> Start(TRequest request) { using var scope = _logger.BeginJobScope(JobId, JobKey); - try - { - _logger.Starting(); - _isRunning = true; - State.Created = DateTimeOffset.UtcNow; - State.Request = request!; - State.Details = GetJobDetails(); - await WriteStatusChanged(JobStatus.PreparingSteps); - var grainId = this.GetGrainId(); - var tcs = new TaskCompletionSource>(TaskCreationOptions.RunContinuationsAsynchronously); - - await PrepareAllSteps(request, tcs); -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - tcs.Task.ContinueWith( - async getPreparedJobSteps => - { - try - { - var jobSteps = await getPreparedJobSteps; - if (jobSteps.Count == 0) - { - _logger.NoJobStepsToStart(); - var onCompletedResult = await Complete(); - if (onCompletedResult.TryGetError(out var onCompletedError)) - { - _logger.FailedOnCompletedWhileNoJobSteps(onCompletedError); - } - StatusChanged(JobStatus.CompletedSuccessfully); // This is effectively just a noop since state is cleared after this line - await ClearStateAsync(); - return; - } - - _logger.PreparingJobSteps(jobSteps.Count); - _jobStepGrains = CreateGrainsFromJobSteps(jobSteps); - _ = await WriteStatusChanged(JobStatus.PreparingStepsForRunning); - PrepareAndStartAllJobSteps(grainId); - } - catch (Exception ex) - { - _logger.ErrorPreparingJobSteps(ex); - } - }, - TaskScheduler.Current); - return Result.Success(); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - } - catch (Exception ex) - { - _logger.Failed(ex); - return JobError.UnknownError; - } + _logger.Starting(); + _isRunning = true; + + State.Created = DateTimeOffset.UtcNow; + State.Request = request!; + State.Details = GetJobDetails(); + + _ = await WriteStatusChanged(JobStatus.PreparingSteps); + return await PrepareAndStartRunningAllSteps(request); } /// @@ -179,6 +135,7 @@ public async Task> Resume() } _logger.Resuming(); + _isRunning = true; var getStepsResult = await Storage.JobSteps.GetForJob(JobId, JobStepStatus.Scheduled, JobStepStatus.Running, JobStepStatus.Paused); return await getStepsResult.Match( @@ -278,7 +235,7 @@ public async Task> Stop() await _observers!.Notify(o => o.OnJobStopped()); var stepStorage = ServiceProvider.GetRequiredService(); - + // TODO: We probably don't want to do this. And we need to nail down the semantics of Stop vs Pause and how it works with Resume var removeAllStepsResult = await stepStorage.RemoveAllForJob(JobId); return await removeAllStepsResult.Match( async _ => @@ -304,69 +261,6 @@ public async Task> Stop() /// public virtual Task OnCompleted() => Task.FromResult(Result.Success()); - /// - public async Task> OnStepSucceeded(JobStepId stepId, JobStepResult jobStepResult) - { - using var scope = _logger.BeginJobScope(JobId, JobKey); - try - { - _logger.StepSuccessfullyCompleted(stepId); - State.Progress.SuccessfulSteps++; - - if ((await WriteState()).TryGetException(out var writeStateError)) - { - _logger.FailedUpdatingSuccessfulSteps(writeStateError, State.Progress.SuccessfulSteps); - return JobError.PersistStateError; - } - - var handleCompletedStepResult = await HandleJobStepCompleted(stepId, jobStepResult); - return handleCompletedStepResult.Match( - _ => Result.Success(), - error => - { - _logger.FailedHandlingCompletedJobStep(stepId, error); - return Result.Failed(error); - }); - } - catch (Exception ex) - { - _logger.Failed(ex); - return JobError.UnknownError; - } - } - - /// - public Task> OnStepStopped(JobStepId stepId, JobStepResult jobStepResult) => Task.FromResult(Result.Success()); - - /// - public async Task> OnStepFailed(JobStepId stepId, JobStepResult jobStepResult) - { - using var scope = _logger.BeginJobScope(JobId, JobKey); - try - { - _logger.StepFailed(stepId); - State.Progress.FailedSteps++; - if ((await WriteState()).TryGetException(out var writeStateError)) - { - _logger.FailedUpdatingFailedSteps(writeStateError, State.Progress.SuccessfulSteps); - return JobError.PersistStateError; - } - var handleCompletedStepResult = await HandleJobStepCompleted(stepId, jobStepResult); - return handleCompletedStepResult.Match( - _ => Result.Success(), - error => - { - _logger.FailedHandlingCompletedJobStep(stepId, error); - return Result.Failed(error); - }); - } - catch (Exception ex) - { - _logger.Failed(ex); - return JobError.UnknownError; - } - } - /// public async Task> SetTotalSteps(int totalSteps) { @@ -450,53 +344,6 @@ protected async Task WriteState() protected bool IsInStoppedOrCompletedState() => State.Status is JobStatus.Stopped or JobStatus.CompletedSuccessfully or JobStatus.CompletedWithFailures; - /// - /// Called when a step has completed. - /// - /// for the completed step. - /// for the completed step. - /// Awaitable task. - protected virtual Task OnStepCompleted(JobStepId jobStepId, JobStepResult result) => Task.CompletedTask; - - /// - /// Create a new . - /// - /// The request associated with the step. - /// The type of job step to create. - /// A new instance of the job step. - protected JobStepDetails CreateStep(object request) - where TJobStep : IJobStep - { - var jobStepId = JobStepId.New(); - var jobId = this.GetPrimaryKey(out var keyExtension); - var jobKey = (JobKey)keyExtension!; - var jobStepType = typeof(TJobStep) - .AllBaseAndImplementingTypes() - .First( - _ => _.IsGenericType && _.GetGenericTypeDefinition() == typeof(IJobStep<,>)); - var resultType = jobStepType.GetGenericArguments()[1]; - return new( - typeof(TJobStep), - jobStepId, - new(jobId, jobKey.EventStore, jobKey.Namespace), - request, - resultType); - } - - /// - /// Called before preparing steps. - /// - /// The request associated with the job. - /// Awaitable task. - protected virtual Task OnBeforePrepareSteps(TRequest request) => Task.CompletedTask; - - /// - /// Start the job. - /// - /// The request associated with the job. - /// Collection of . - protected abstract Task> PrepareSteps(TRequest request); - /// /// Check if the job can be resumed. /// @@ -509,43 +356,6 @@ protected JobStepDetails CreateStep(object request) /// The . protected virtual JobDetails GetJobDetails() => JobDetails.NotSet; - async Task> HandleJobStepCompleted(JobStepId stepId, JobStepResult result) - { - try - { - await OnStepCompleted(stepId, result); - var handleCompletionResult = await HandleCompletion(); - if (handleCompletionResult.TryGetError(out var handleCompletionError)) - { - return handleCompletionError; - } - var needsToWriteState = handleCompletionResult.AsT0 switch - { - HandleCompletionSuccess.NotClearedState => true, - _ => false - }; - await Unsubscribe(_jobStepGrains[stepId].Grain.AsReference()); - _jobStepGrains.Remove(stepId); - if (!needsToWriteState) - { - return Result.Success(); - } - var writeStateResult = await WriteState(); - return writeStateResult.Match( - _ => Result.Success(), - ex => - { - _logger.FailedUpdatingStateAfterHandlingJobStepCompletion(ex, stepId); - return JobError.PersistStateError; - }); - } - catch (Exception ex) - { - _logger.Failed(ex); - return JobError.UnknownError; - } - } - bool JobIsRunning() => State.Status is JobStatus.Running or JobStatus.PreparingSteps or JobStatus.PreparingStepsForRunning; void StatusChanged(JobStatus status, Exception? exception = null) @@ -560,72 +370,6 @@ void StatusChanged(JobStatus status, Exception? exception = null) State.Status = status; } - Dictionary CreateGrainsFromJobSteps(IImmutableList jobSteps) => - jobSteps.ToDictionary( - details => details.Id, - details => new JobStepGrainAndRequest( - (GrainFactory.GetGrain(details.Type, details.Id, keyExtension: details.Key) as IJobStep)!, - details.Request, - details.ResultType)); - - async Task PrepareAllSteps(TRequest request, TaskCompletionSource> tcs) - { - await OnBeforePrepareSteps(request); - _ = Task.Run(async () => - { - var steps = await PrepareSteps(request); - await ThisJob.SetTotalSteps(steps.Count); - tcs.SetResult(steps); - }); - } - - void PrepareAndStartAllJobSteps(GrainId grainId) => _ = Task.Run(async () => - { - using var scope = _logger.BeginJobScope(JobId, JobKey); - _logger.PrepareJobStepsForRunning(); - - _ = await ThisJob.WriteStatusChanged(JobStatus.PreparingStepsForRunning); - - try - { - if ((await PrepareAndStartJobSteps(grainId)).TryGetError(out var prepareStartError)) - { - _ = await ThisJob.WriteStatusChanged(JobStatus.Failed); - _ = await ThisJob.Stop(); - } - else - { - _ = await ThisJob.WriteStatusChanged(JobStatus.Running); // What to do if this fails? - } - } - catch (Exception ex) - { - _logger.Failed(ex); - - _ = await ThisJob.WriteStatusChanged(JobStatus.Failed, ex); - _ = await ThisJob.Stop(); - } - }); - - async Task> PrepareAndStartJobSteps(GrainId grainId) - { - foreach (var (id, jobStep) in _jobStepGrains) - { - await ThisJob.Subscribe(jobStep.Grain.AsReference()); - if ((await jobStep.Grain.Prepare(jobStep.Request)).TryGetError(out var prepareError)) - { - _logger.FailedPreparingJobStep(id, prepareError); - return prepareError; - } - if ((await jobStep.Grain.Start(grainId, jobStep.Request)).TryGetError(out var startError)) - { - _logger.FailedStartingJobStep(id, startError); - return startError; - } - } - return Result.Success(); - } - async Task> HandleCompletion() { try @@ -642,7 +386,7 @@ async Task> HandleCompletion() { return onCompletedError; } - if (!KeepAfterCompleted) + if (!KeepAfterCompleted) // TODO: Should we always keep state if some or all steps, or job, failed? { await ClearStateAsync(); cleared = true; diff --git a/Source/Kernel/Grains/Jobs/JobLogging.cs b/Source/Kernel/Grains/Jobs/JobLogging.cs index 2e68a0de9..4bdf6fc39 100644 --- a/Source/Kernel/Grains/Jobs/JobLogging.cs +++ b/Source/Kernel/Grains/Jobs/JobLogging.cs @@ -99,6 +99,9 @@ internal static partial class JobLogMessages [LoggerMessage(LogLevel.Warning, "Job failed starting job step {JobStepId}. Error: {Error}")] internal static partial void FailedStartingJobStep(this ILogger logger, JobStepId jobStepId, JobStepPrepareStartError error); + [LoggerMessage(LogLevel.Warning, "Job failed starting job step {JobStepId}")] + internal static partial void FailedStartingJobStep(this ILogger logger, Exception ex, JobStepId jobStepId); + [LoggerMessage(LogLevel.Debug, "Not all steps was completed successfully")] internal static partial void AllStepsNotCompletedSuccessfully(this ILogger logger); } diff --git a/Source/Kernel/Grains/Jobs/JobsManager.cs b/Source/Kernel/Grains/Jobs/JobsManager.cs index 401a7a33a..9f3b06bc0 100644 --- a/Source/Kernel/Grains/Jobs/JobsManager.cs +++ b/Source/Kernel/Grains/Jobs/JobsManager.cs @@ -9,6 +9,7 @@ using Microsoft.Extensions.Logging; using OneOf; using OneOf.Types; +using Orleans.Concurrency; namespace Cratis.Chronicle.Grains.Jobs; @@ -21,6 +22,7 @@ namespace Cratis.Chronicle.Grains.Jobs; /// for working with underlying storage. /// that knows about job type associations. /// Logger for logging. +[Reentrant] public class JobsManager( IStorage storage, IJobTypes jobTypes, diff --git a/Source/Kernel/Grains/Observation/Observer.Catchup.cs b/Source/Kernel/Grains/Observation/Observer.Catchup.cs new file mode 100644 index 000000000..e3537c89f --- /dev/null +++ b/Source/Kernel/Grains/Observation/Observer.Catchup.cs @@ -0,0 +1,138 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Concepts; +using Cratis.Chronicle.Concepts.Events; +using Cratis.Chronicle.Concepts.Jobs; +using Cratis.Chronicle.Concepts.Keys; +using Cratis.Chronicle.Grains.EventSequences; +using Cratis.Chronicle.Grains.Observation.Jobs; +using Cratis.Chronicle.Grains.Observation.States; +namespace Cratis.Chronicle.Grains.Observation; + +public partial class Observer +{ + /// + public async Task CatchUp() + { + _isPreparingCatchup = true; + using var scope = logger.BeginObserverScope(State.Id, _observerKey); + + var subscription = await GetSubscription(); + + var jobs = await _jobsManager.GetJobsOfType(); + if (jobs.Any(_ => _.Status == JobStatus.Running)) + { + logger.FinishingExistingCatchUpJob(); + return; + } + + var pausedJob = jobs.FirstOrDefault(_ => _.Status == JobStatus.Paused); + + if (pausedJob is not null) + { + logger.ResumingCatchUpJob(); + await _jobsManager.Resume(pausedJob.Id); + } + else + { + logger.StartCatchUpJob(State.NextEventSequenceNumber); + await _jobsManager.Start( + JobId.New(), + new( + _observerKey, + subscription, + State.NextEventSequenceNumber, + State.EventTypes)); + } + } + + /// + public async Task RegisterCatchingUpPartitions(IEnumerable partitions) + { + using var scope = logger.BeginObserverScope(State.Id, _observerKey); + logger.RegisteringCatchingUpPartitions(); + foreach (var partition in partitions) + { + State.CatchingUpPartitions.Add(partition); + } + + await WriteStateAsync(); + + _isPreparingCatchup = false; + } + + /// + public async Task CaughtUp(EventSequenceNumber lastHandledEventSequenceNumber) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + HandleNewLastHandledEvent(lastHandledEventSequenceNumber); + await WriteStateAsync(); + await TransitionTo(); + } + + /// + public async Task PartitionCaughtUp(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + logger.PartitionCaughtUp(partition, lastHandledEventSequenceNumber); + State.CatchingUpPartitions.Remove(partition); + HandleNewLastHandledEvent(lastHandledEventSequenceNumber); + await WriteStateAsync(); + await StartCatchupJobIfNeeded(partition, lastHandledEventSequenceNumber); + } + + async Task StartCatchupJobIfNeeded(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) + { + if (failures.State.IsFailed(partition)) + { + logger.PartitionToCatchUpIsFailing(partition); + return; + } + if (!lastHandledEventSequenceNumber.IsActualValue) + { + logger.LastHandledEventIsNotActualValue(); + return; + } + var needCatchupResult = await NeedsCatchup(partition, lastHandledEventSequenceNumber); + await needCatchupResult.Match( + needCatchup => needCatchup + ? StartCatchupJob(partition, lastHandledEventSequenceNumber) + : Task.CompletedTask, + error => + { + switch (error) + { + case GetSequenceNumberError.NotFound: + logger.LastHandledEventForPartitionUnavailable(partition); + return Task.CompletedTask; + default: + return PartitionFailed(partition, lastHandledEventSequenceNumber.Next(), ["Event Sequence storage error caused partition to try recover"], string.Empty); + } + }); + } + + async Task StartCatchupJob(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) + { + var nextEventSequenceNumber = lastHandledEventSequenceNumber.Next(); + logger.StartingCatchUpForPartition(partition, nextEventSequenceNumber); + State.CatchingUpPartitions.Add(partition); + await _jobsManager.Start( + JobId.New(), + new( + _observerKey, + _subscription, + partition, + nextEventSequenceNumber, + State.EventTypes)); + await WriteStateAsync(); + } + + async Task> NeedsCatchup(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) + { + var nextSequenceNumber = await _eventSequence.GetNextSequenceNumberGreaterOrEqualTo(lastHandledEventSequenceNumber, State.EventTypes, partition); + return nextSequenceNumber.Match>( + number => number != lastHandledEventSequenceNumber, + error => error); + } +} diff --git a/Source/Kernel/Grains/Observation/Observer.Failing.cs b/Source/Kernel/Grains/Observation/Observer.Failing.cs new file mode 100644 index 000000000..d8e69fbac --- /dev/null +++ b/Source/Kernel/Grains/Observation/Observer.Failing.cs @@ -0,0 +1,111 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Concepts.Configuration; +using Cratis.Chronicle.Concepts.Events; +using Cratis.Chronicle.Concepts.Jobs; +using Cratis.Chronicle.Concepts.Keys; +using Cratis.Chronicle.Concepts.Observation; +using Cratis.Chronicle.Grains.Observation.Jobs; +namespace Cratis.Chronicle.Grains.Observation; + +public partial class Observer +{ + /// + public async Task PartitionFailed( + Key partition, + EventSequenceNumber sequenceNumber, + IEnumerable exceptionMessages, + string exceptionStackTrace) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + _metrics?.PartitionFailed(partition); + logger.PartitionFailed(partition, sequenceNumber); + var failure = failures.State.RegisterAttempt(partition, sequenceNumber, exceptionMessages, exceptionStackTrace); + var config = await configurationProvider.GetFor(_observerKey); + if (config.MaxRetryAttempts == 0 || failure.Attempts.Count() <= config.MaxRetryAttempts) + { + await this.RegisterOrUpdateReminder(partition.ToString(), GetNextRetryDelay(failure, config), TimeSpan.FromHours(48)); + } + else + { + logger.GivingUpOnRecoveringFailedPartition(partition); + } + + await failures.WriteStateAsync(); + } + + /// + public async Task FailedPartitionRecovered(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + logger.FailingPartitionRecovered(partition); + failures.State.Remove(partition); + await failures.WriteStateAsync(); + HandleNewLastHandledEvent(lastHandledEventSequenceNumber); + await WriteStateAsync(); + await StartCatchupJobIfNeeded(partition, lastHandledEventSequenceNumber); + } + + /// + public async Task FailedPartitionPartiallyRecovered(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + logger.FailingPartitionPartiallyRecovered(partition, lastHandledEventSequenceNumber); + HandleNewLastHandledEvent(lastHandledEventSequenceNumber); + await WriteStateAsync(); + } + + /// + public async Task TryStartRecoverJobForFailedPartition(Key partition) + { + if (!Failures.TryGet(partition, out var failure)) + { + return; + } + + await StartRecoverJobForFailedPartition(failure); + } + + /// + public async Task TryRecoverAllFailedPartitions() + { + foreach (var partition in Failures.Partitions) + { + await StartRecoverJobForFailedPartition(partition); + } + } + + static TimeSpan GetNextRetryDelay(FailedPartition failure, Observers config) + { + var time = TimeSpan.FromSeconds(config.BackoffDelay * Math.Pow(config.ExponentialBackoffDelayFactor, failure.Attempts.Count())); + var maxTime = TimeSpan.FromSeconds(config.MaximumBackoffDelay); + + if (time > maxTime) + { + return maxTime; + } + + if (time == TimeSpan.Zero) + { + return TimeSpan.FromSeconds(config.BackoffDelay); + } + + return time; + } + + async Task StartRecoverJobForFailedPartition(FailedPartition failedPartition) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + logger.TryingToRecoverFailedPartition(failedPartition.Partition); + await RemoveReminder(failedPartition.Partition.ToString()); + await _jobsManager.Start( + JobId.New(), + new( + _observerKey, + _subscription, + failedPartition.Partition, + failedPartition.LastAttempt.SequenceNumber, + State.EventTypes)); + } +} diff --git a/Source/Kernel/Grains/Observation/Observer.Handling.cs b/Source/Kernel/Grains/Observation/Observer.Handling.cs new file mode 100644 index 000000000..fbb449e8c --- /dev/null +++ b/Source/Kernel/Grains/Observation/Observer.Handling.cs @@ -0,0 +1,201 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Concepts.Events; +using Cratis.Chronicle.Concepts.Keys; +using Cratis.Chronicle.Concepts.Observation; +namespace Cratis.Chronicle.Grains.Observation; + +public partial class Observer +{ + /// + public Task SetHandledStats(EventSequenceNumber lastHandledEventSequenceNumber) + { + State = State with + { + LastHandledEventSequenceNumber = lastHandledEventSequenceNumber + }; + + return WriteStateAsync(); + } + + /// + public async Task Handle(Key partition, IEnumerable events) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + + if (!events.Any()) + { + return; + } + + if (!ShouldHandleEvent(partition)) + { + return; + } + + if (!events.Any(_ => _subscription.EventTypes.Contains(_.Metadata.Type))) + { + State = State with + { + NextEventSequenceNumber = events.Last().Metadata.SequenceNumber.Next() + }; + await WriteStateAsync(); + return; + } + + var failed = false; + var exceptionMessages = Enumerable.Empty(); + var exceptionStackTrace = string.Empty; + var tailEventSequenceNumber = State.NextEventSequenceNumber; + + var eventsToHandle = events.Where(_ => _.Metadata.SequenceNumber >= tailEventSequenceNumber).ToArray(); + var numEventsSuccessfullyHandled = EventCount.Zero; + var stateChanged = false; + if (eventsToHandle.Length != 0) + { + using (new WriteSuspension(this)) + { + try + { + var key = new ObserverSubscriberKey( + _observerKey.ObserverId, + _observerKey.EventStore, + _observerKey.Namespace, + _observerKey.EventSequenceId, + partition, + _subscription.SiloAddress.ToParsableString()); + + var firstEvent = eventsToHandle[0]; + + var subscriber = (GrainFactory.GetGrain(_subscription.SubscriberType, key) as IObserverSubscriber)!; + tailEventSequenceNumber = firstEvent.Metadata.SequenceNumber; + var result = await subscriber.OnNext(partition, eventsToHandle, new(_subscription.Arguments)); + numEventsSuccessfullyHandled = result.HandledAnyEvents + ? eventsToHandle.Count(_ => _.Metadata.SequenceNumber <= result.LastSuccessfulObservation) + : EventCount.Zero; + + if (result.State == ObserverSubscriberState.Failed) + { + failed = true; + exceptionMessages = result.ExceptionMessages; + exceptionStackTrace = result.ExceptionStackTrace; + tailEventSequenceNumber = result.HandledAnyEvents + ? result.LastSuccessfulObservation + : firstEvent.Metadata.SequenceNumber; + } + else if (result.State == ObserverSubscriberState.Disconnected) + { + await Unsubscribe(); + stateChanged = true; + } + + if (numEventsSuccessfullyHandled > 0) + { + stateChanged = true; + State = State with + { + NextEventSequenceNumber = result.LastSuccessfulObservation.Next() + }; + var previousLastHandled = State.LastHandledEventSequenceNumber; + var shouldSetLastHandled = + previousLastHandled == EventSequenceNumber.Unavailable || + previousLastHandled < result.LastSuccessfulObservation; + State = State with + { + LastHandledEventSequenceNumber = shouldSetLastHandled + ? result.LastSuccessfulObservation + : previousLastHandled, + }; + } + } + catch (Exception ex) + { + failed = true; + exceptionMessages = ex.GetAllMessages().ToArray(); + exceptionStackTrace = ex.StackTrace ?? string.Empty; + } + } + + try + { + if (failed) + { + await PartitionFailed(partition, tailEventSequenceNumber, exceptionMessages, exceptionStackTrace); + } + else + { + _metrics?.SuccessfulObservation(); + } + + if (stateChanged) + { + await WriteStateAsync(); + } + } + catch (Exception ex) + { + logger.ObserverFailedForUnknownReasonsAfterHandlingEvents(ex); + } + } + } + + bool ShouldHandleEvent(Key partition) + { + if (!_subscription.IsSubscribed) + { + logger.ObserverIsNotSubscribed(); + return false; + } + + if (Failures.IsFailed(partition)) + { + logger.PartitionIsFailed(partition); + return false; + } + + if (State.RunningState != ObserverRunningState.Active) + { + logger.ObserverIsNotActive(); + return false; + } + + if (_isPreparingCatchup) + { + logger.ObserverIsPreparingCatchup(); + return false; + } + + if (State.ReplayingPartitions.Contains(partition)) + { + logger.PartitionReplayingCannotHandleNewEvents(partition); + return false; + } + + if (State.CatchingUpPartitions.Contains(partition)) + { + logger.PartitionCatchingUpCannotHandleNewEvents(partition); + return false; + } + + return true; + } + + void HandleNewLastHandledEvent(EventSequenceNumber lastHandledEvent) + { + if (!lastHandledEvent.IsActualValue) + { + logger.LastHandledEventIsNotActualValue(); + return; + } + + var newLastHandledEvent = State.LastHandledEventSequenceNumber == EventSequenceNumber.Unavailable || + State.LastHandledEventSequenceNumber < lastHandledEvent ? lastHandledEvent : State.LastHandledEventSequenceNumber; + var nextEventSequenceNumber = State.NextEventSequenceNumber <= lastHandledEvent ? lastHandledEvent.Next() : State.NextEventSequenceNumber; + State = State with + { + LastHandledEventSequenceNumber = newLastHandledEvent, + NextEventSequenceNumber = nextEventSequenceNumber + }; + } +} diff --git a/Source/Kernel/Grains/Observation/Observer.Replay.cs b/Source/Kernel/Grains/Observation/Observer.Replay.cs new file mode 100644 index 000000000..6d3be295d --- /dev/null +++ b/Source/Kernel/Grains/Observation/Observer.Replay.cs @@ -0,0 +1,64 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Concepts.Events; +using Cratis.Chronicle.Concepts.Jobs; +using Cratis.Chronicle.Concepts.Keys; +using Cratis.Chronicle.Concepts.Observation; +using Cratis.Chronicle.Grains.Observation.Jobs; +using Cratis.Chronicle.Grains.Observation.States; +namespace Cratis.Chronicle.Grains.Observation; + +public partial class Observer +{ + /// + public async Task Replay() + { + if (State.RunningState == ObserverRunningState.Active) + { + await TransitionTo(); + } + } + + /// + public Task ReplayPartition(Key partition) => ReplayPartitionTo(partition, EventSequenceNumber.Max); + + /// + public async Task ReplayPartitionTo(Key partition, EventSequenceNumber sequenceNumber) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + logger.AttemptReplayPartition(partition, sequenceNumber); + await _jobsManager.Start( + JobId.New(), + new( + _observerKey, + _subscription, + partition, + EventSequenceNumber.First, + sequenceNumber, + State.EventTypes)); + + State.ReplayingPartitions.Add(partition); + await WriteStateAsync(); + } + + /// + public async Task Replayed(EventSequenceNumber lastHandledEventSequenceNumber) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + HandleNewLastHandledEvent(lastHandledEventSequenceNumber); + await WriteStateAsync(); + await TransitionTo(); + } + + /// + public async Task PartitionReplayed(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) + { + using var scope = logger.BeginObserverScope(_observerId, _observerKey); + logger.FinishedReplayForPartition(partition); + State.ReplayingPartitions.Remove(partition); + HandleNewLastHandledEvent(lastHandledEventSequenceNumber); + await WriteStateAsync(); + await StartCatchupJobIfNeeded(partition, lastHandledEventSequenceNumber); + } +} diff --git a/Source/Kernel/Grains/Observation/Observer.cs b/Source/Kernel/Grains/Observation/Observer.cs index 82ec2f58e..48066ef36 100644 --- a/Source/Kernel/Grains/Observation/Observer.cs +++ b/Source/Kernel/Grains/Observation/Observer.cs @@ -35,7 +35,7 @@ namespace Cratis.Chronicle.Grains.Observation; /// for the observer. /// for creating loggers. [StorageProvider(ProviderName = WellKnownGrainStorageProviders.Observers)] -public class Observer( +public partial class Observer( [PersistentState(nameof(FailedPartition), WellKnownGrainStorageProviders.FailedPartitions)] IPersistentState failures, IObserverServiceClient replayStateServiceClient, @@ -95,17 +95,6 @@ public override async Task OnDeactivateAsync(DeactivationReason reason, Cancella public Task GetState() => Task.FromResult(State); #pragma warning restore CA1721 // Property names should not match get methods - /// - public Task SetHandledStats(EventSequenceNumber lastHandledEventSequenceNumber) - { - State = State with - { - LastHandledEventSequenceNumber = lastHandledEventSequenceNumber - }; - - return WriteStateAsync(); - } - /// public Task GetSubscription() => Task.FromResult(_subscription); @@ -127,7 +116,7 @@ public async Task Subscribe( logger.Subscribing(); - State = State with { Type = type }; + State = State with { Type = type, EventTypes = eventTypes }; _subscription = new ObserverSubscription( _observerId, @@ -136,7 +125,7 @@ public async Task Subscribe( typeof(TObserverSubscriber), siloAddress, subscriberArgs); - + await WriteStateAsync(); await ResumeJobs(); await TryRecoverAllFailedPartitions(); await TransitionTo(); @@ -182,308 +171,6 @@ public async Task Unsubscribe() await TransitionTo(); } - /// - public async Task Replay() - { - if (State.RunningState == ObserverRunningState.Active) - { - await TransitionTo(); - } - } - - /// - public Task ReplayPartition(Key partition) => ReplayPartitionTo(partition, EventSequenceNumber.Max); - - /// - public async Task ReplayPartitionTo(Key partition, EventSequenceNumber sequenceNumber) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - logger.AttemptReplayPartition(partition, sequenceNumber); - await _jobsManager.Start( - JobId.New(), - new( - _observerKey, - _subscription, - partition, - EventSequenceNumber.First, - sequenceNumber, - State.EventTypes)); - - State.ReplayingPartitions.Add(partition); - await WriteStateAsync(); - } - - /// - public async Task Replayed(EventSequenceNumber lastHandledEventSequenceNumber) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - HandleNewLastHandledEvent(lastHandledEventSequenceNumber); - await WriteStateAsync(); - await TransitionTo(); - } - - /// - public async Task PartitionReplayed(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - logger.FinishedReplayForPartition(partition); - State.ReplayingPartitions.Remove(partition); - HandleNewLastHandledEvent(lastHandledEventSequenceNumber); - await WriteStateAsync(); - await StartCatchupJobIfNeeded(partition, lastHandledEventSequenceNumber); - } - - /// - public async Task PartitionFailed( - Key partition, - EventSequenceNumber sequenceNumber, - IEnumerable exceptionMessages, - string exceptionStackTrace) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - _metrics?.PartitionFailed(partition); - logger.PartitionFailed(partition, sequenceNumber); - var failure = failures.State.RegisterAttempt(partition, sequenceNumber, exceptionMessages, exceptionStackTrace); - var config = await configurationProvider.GetFor(_observerKey); - if (config.MaxRetryAttempts == 0 || failure.Attempts.Count() <= config.MaxRetryAttempts) - { - await this.RegisterOrUpdateReminder(partition.ToString(), GetNextRetryDelay(failure, config), TimeSpan.FromHours(48)); - } - else - { - logger.GivingUpOnRecoveringFailedPartition(partition); - } - - await failures.WriteStateAsync(); - } - - /// - public async Task FailedPartitionRecovered(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - logger.FailingPartitionRecovered(partition); - failures.State.Remove(partition); - await failures.WriteStateAsync(); - HandleNewLastHandledEvent(lastHandledEventSequenceNumber); - await WriteStateAsync(); - await StartCatchupJobIfNeeded(partition, lastHandledEventSequenceNumber); - } - - /// - public async Task FailedPartitionPartiallyRecovered(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - logger.FailingPartitionPartiallyRecovered(partition, lastHandledEventSequenceNumber); - HandleNewLastHandledEvent(lastHandledEventSequenceNumber); - await WriteStateAsync(); - } - - /// - public async Task CatchUp() - { - _isPreparingCatchup = true; - using var scope = logger.BeginObserverScope(State.Id, _observerKey); - - var subscription = await GetSubscription(); - - var jobs = await _jobsManager.GetJobsOfType(); - var jobsForThisObserver = jobs.Where(IsJobForThisObserver); - if (jobs.Any(_ => _.Status == JobStatus.Running)) - { - logger.FinishingExistingCatchUpJob(); - return; - } - - var pausedJob = jobs.FirstOrDefault(_ => _.Status == JobStatus.Paused); - - if (pausedJob is not null) - { - logger.ResumingCatchUpJob(); - await _jobsManager.Resume(pausedJob.Id); - } - else - { - logger.StartCatchUpJob(State.NextEventSequenceNumber); - await _jobsManager.Start( - JobId.New(), - new( - _observerKey, - subscription, - State.NextEventSequenceNumber, - State.EventTypes)); - } - } - - /// - public async Task RegisterCatchingUpPartitions(IEnumerable partitions) - { - using var scope = logger.BeginObserverScope(State.Id, _observerKey); - logger.RegisteringCatchingUpPartitions(); - foreach (var partition in partitions) - { - State.CatchingUpPartitions.Add(partition); - } - - await WriteStateAsync(); - - _isPreparingCatchup = false; - } - - /// - public async Task CaughtUp(EventSequenceNumber lastHandledEventSequenceNumber) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - HandleNewLastHandledEvent(lastHandledEventSequenceNumber); - await WriteStateAsync(); - await TransitionTo(); - } - - /// - public async Task PartitionCaughtUp(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - logger.PartitionCaughtUp(partition, lastHandledEventSequenceNumber); - State.CatchingUpPartitions.Remove(partition); - HandleNewLastHandledEvent(lastHandledEventSequenceNumber); - await WriteStateAsync(); - await StartCatchupJobIfNeeded(partition, lastHandledEventSequenceNumber); - } - - /// - public async Task TryStartRecoverJobForFailedPartition(Key partition) - { - if (!Failures.TryGet(partition, out var failure)) - { - return; - } - - await StartRecoverJobForFailedPartition(failure); - } - - /// - public async Task TryRecoverAllFailedPartitions() - { - foreach (var partition in Failures.Partitions) - { - await StartRecoverJobForFailedPartition(partition); - } - } - - /// - public async Task Handle(Key partition, IEnumerable events) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - - if (!events.Any()) - { - return; - } - - if (!ShouldHandleEvent(partition)) - { - return; - } - - if (!events.Any(_ => _subscription.EventTypes.Contains(_.Metadata.Type))) - { - State = State with { NextEventSequenceNumber = events.Last().Metadata.SequenceNumber.Next() }; - await WriteStateAsync(); - return; - } - - var failed = false; - var exceptionMessages = Enumerable.Empty(); - var exceptionStackTrace = string.Empty; - var tailEventSequenceNumber = State.NextEventSequenceNumber; - - var eventsToHandle = events.Where(_ => _.Metadata.SequenceNumber >= tailEventSequenceNumber).ToArray(); - var numEventsSuccessfullyHandled = EventCount.Zero; - var stateChanged = false; - if (eventsToHandle.Length != 0) - { - using (new WriteSuspension(this)) - { - try - { - var key = new ObserverSubscriberKey( - _observerKey.ObserverId, - _observerKey.EventStore, - _observerKey.Namespace, - _observerKey.EventSequenceId, - partition, - _subscription.SiloAddress.ToParsableString()); - - var firstEvent = eventsToHandle[0]; - - var subscriber = (GrainFactory.GetGrain(_subscription.SubscriberType, key) as IObserverSubscriber)!; - tailEventSequenceNumber = firstEvent.Metadata.SequenceNumber; - var result = await subscriber.OnNext(partition, eventsToHandle, new(_subscription.Arguments)); - numEventsSuccessfullyHandled = result.HandledAnyEvents - ? eventsToHandle.Count(_ => _.Metadata.SequenceNumber <= result.LastSuccessfulObservation) - : EventCount.Zero; - - if (result.State == ObserverSubscriberState.Failed) - { - failed = true; - exceptionMessages = result.ExceptionMessages; - exceptionStackTrace = result.ExceptionStackTrace; - tailEventSequenceNumber = result.HandledAnyEvents - ? result.LastSuccessfulObservation - : firstEvent.Metadata.SequenceNumber; - } - else if (result.State == ObserverSubscriberState.Disconnected) - { - await Unsubscribe(); - stateChanged = true; - } - - if (numEventsSuccessfullyHandled > 0) - { - stateChanged = true; - State = State with { NextEventSequenceNumber = result.LastSuccessfulObservation.Next() }; - var previousLastHandled = State.LastHandledEventSequenceNumber; - var shouldSetLastHandled = - previousLastHandled == EventSequenceNumber.Unavailable || - previousLastHandled < result.LastSuccessfulObservation; - State = State with - { - LastHandledEventSequenceNumber = shouldSetLastHandled - ? result.LastSuccessfulObservation - : previousLastHandled, - }; - } - } - catch (Exception ex) - { - failed = true; - exceptionMessages = ex.GetAllMessages().ToArray(); - exceptionStackTrace = ex.StackTrace ?? string.Empty; - } - } - - try - { - if (failed) - { - await PartitionFailed(partition, tailEventSequenceNumber, exceptionMessages, exceptionStackTrace); - } - else - { - _metrics?.SuccessfulObservation(); - } - - if (stateChanged) - { - await WriteStateAsync(); - } - } - catch (Exception ex) - { - logger.ObserverFailedForUnknownReasonsAfterHandlingEvents(ex); - } - } - } - /// public async Task ReceiveReminder(string reminderName, TickStatus status) { @@ -527,110 +214,16 @@ protected override async Task WriteStateAsync() await base.WriteStateAsync(); } - static TimeSpan GetNextRetryDelay(FailedPartition failure, Observers config) - { - var time = TimeSpan.FromSeconds(config.BackoffDelay * Math.Pow(config.ExponentialBackoffDelayFactor, failure.Attempts.Count())); - var maxTime = TimeSpan.FromSeconds(config.MaximumBackoffDelay); - - if (time > maxTime) - { - return maxTime; - } - - if (time == TimeSpan.Zero) - { - return TimeSpan.FromSeconds(config.BackoffDelay); - } - - return time; - } - - bool ShouldHandleEvent(Key partition) - { - if (!_subscription.IsSubscribed) - { - logger.ObserverIsNotSubscribed(); - return false; - } - - if (Failures.IsFailed(partition)) - { - logger.PartitionIsFailed(partition); - return false; - } - - if (State.RunningState != ObserverRunningState.Active) - { - logger.ObserverIsNotActive(); - return false; - } - - if (_isPreparingCatchup) - { - logger.ObserverIsPreparingCatchup(); - return false; - } - - if (State.ReplayingPartitions.Contains(partition)) - { - logger.PartitionReplayingCannotHandleNewEvents(partition); - return false; - } - - if (State.CatchingUpPartitions.Contains(partition)) - { - logger.PartitionCatchingUpCannotHandleNewEvents(partition); - return false; - } - - return true; - } - async Task ResumeJobs() { + // TODO: Do this in a Task.WhenAll because JobsManager is reentrant now var unfilteredJobs = await _jobsManager.GetAllJobs(); - var jobs = unfilteredJobs.Where(_ => - _.Request is IObserverJobRequest && - _.IsResumable).ToArray(); - foreach (var job in jobs) + foreach (var job in unfilteredJobs.Where(job => job is { Request: IObserverJobRequest, IsResumable: true }).ToArray()) { await _jobsManager.Resume(job.Id); } } - void HandleNewLastHandledEvent(EventSequenceNumber lastHandledEvent) - { - if (!lastHandledEvent.IsActualValue) - { - logger.LastHandledEventIsNotActualValue(); - return; - } - - var newLastHandledEvent = State.LastHandledEventSequenceNumber == EventSequenceNumber.Unavailable || - State.LastHandledEventSequenceNumber < lastHandledEvent ? lastHandledEvent : State.LastHandledEventSequenceNumber; - var nextEventSequenceNumber = State.NextEventSequenceNumber <= lastHandledEvent ? lastHandledEvent.Next() : State.NextEventSequenceNumber; - State = State with - { - LastHandledEventSequenceNumber = newLastHandledEvent, - NextEventSequenceNumber = nextEventSequenceNumber - }; - } - - async Task StartRecoverJobForFailedPartition(FailedPartition failedPartition) - { - using var scope = logger.BeginObserverScope(_observerId, _observerKey); - logger.TryingToRecoverFailedPartition(failedPartition.Partition); - await RemoveReminder(failedPartition.Partition.ToString()); - await _jobsManager.Start( - JobId.New(), - new( - _observerKey, - _subscription, - failedPartition.Partition, - failedPartition.LastAttempt.SequenceNumber, - State.EventTypes)); - } - async Task RemoveReminder(Key partition) { var reminder = await this.GetReminder(partition.ToString()); @@ -640,63 +233,6 @@ async Task RemoveReminder(Key partition) } } - async Task StartCatchupJobIfNeeded(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) - { - if (failures.State.IsFailed(partition)) - { - logger.PartitionToCatchUpIsFailing(partition); - return; - } - if (!lastHandledEventSequenceNumber.IsActualValue) - { - logger.LastHandledEventIsNotActualValue(); - return; - } - var needCatchupResult = await NeedsCatchup(partition, lastHandledEventSequenceNumber); - await needCatchupResult.Match( - needCatchup => needCatchup - ? StartCatchupJob(partition, lastHandledEventSequenceNumber) - : Task.CompletedTask, - error => - { - switch (error) - { - case GetSequenceNumberError.NotFound: - logger.LastHandledEventForPartitionUnavailable(partition); - return Task.CompletedTask; - default: - return PartitionFailed(partition, lastHandledEventSequenceNumber.Next(), ["Event Sequence storage error caused partition to try recover"], string.Empty); - } - }); - } - - async Task StartCatchupJob(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) - { - var nextEventSequenceNumber = lastHandledEventSequenceNumber.Next(); - logger.StartingCatchUpForPartition(partition, nextEventSequenceNumber); - State.CatchingUpPartitions.Add(partition); - await _jobsManager.Start( - JobId.New(), - new( - _observerKey, - _subscription, - partition, - nextEventSequenceNumber, - State.EventTypes)); - await WriteStateAsync(); - } - - async Task> NeedsCatchup(Key partition, EventSequenceNumber lastHandledEventSequenceNumber) - { - var nextSequenceNumber = await _eventSequence.GetNextSequenceNumberGreaterOrEqualTo(lastHandledEventSequenceNumber, State.EventTypes, partition); - return nextSequenceNumber.Match>( - number => number != lastHandledEventSequenceNumber, - error => error); - } - - bool IsJobForThisObserver(JobState jobState) => - ((ReplayObserverRequest)jobState.Request).ObserverKey == _observerKey; - class WriteSuspension : IDisposable { readonly Observer _observer; diff --git a/Source/Kernel/Grains/Observation/States/Routing.cs b/Source/Kernel/Grains/Observation/States/Routing.cs index 43789b1ca..389bc6cc9 100644 --- a/Source/Kernel/Grains/Observation/States/Routing.cs +++ b/Source/Kernel/Grains/Observation/States/Routing.cs @@ -56,15 +56,6 @@ public override async Task OnEnter(ObserverState state) return await EvaluateState(state); } - /// - public override Task OnLeave(ObserverState state) - { - return Task.FromResult(state with - { - EventTypes = _subscription.IsSubscribed ? _subscription.EventTypes : state.EventTypes - }); - } - async Task EvaluateState(ObserverState state) { if (!_subscription.IsSubscribed) diff --git a/Source/Kernel/Storage.MongoDB/Keys/ObserverKeysAsyncEnumerator.cs b/Source/Kernel/Storage.MongoDB/Keys/ObserverKeysAsyncEnumerator.cs index f53a12166..f074df061 100644 --- a/Source/Kernel/Storage.MongoDB/Keys/ObserverKeysAsyncEnumerator.cs +++ b/Source/Kernel/Storage.MongoDB/Keys/ObserverKeysAsyncEnumerator.cs @@ -42,7 +42,7 @@ public async ValueTask MoveNextAsync() return false; } - _queue = new Queue(cursor.Current.Select(_ => new Key(_.Value, ArrayIndexers.NoIndexers))); + _queue = new(cursor.Current.Select(_ => new Key(_.Value, ArrayIndexers.NoIndexers))); } if (_queue.Count == 0)