From 6eed74f327ee853dbc04f39a3517a6fd87c431af Mon Sep 17 00:00:00 2001 From: JT Date: Fri, 10 May 2024 19:45:46 +0800 Subject: [PATCH] Add pause/resume to projection coordinator --- .../Coordination/IProjectionCoordinator.cs | 13 +++++++++++++ .../Coordination/ProjectionCoordinator.cs | 17 +++++++++++++---- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/src/Marten/Events/Daemon/Coordination/IProjectionCoordinator.cs b/src/Marten/Events/Daemon/Coordination/IProjectionCoordinator.cs index 4f73222f17..58b2bc40ed 100644 --- a/src/Marten/Events/Daemon/Coordination/IProjectionCoordinator.cs +++ b/src/Marten/Events/Daemon/Coordination/IProjectionCoordinator.cs @@ -1,3 +1,4 @@ +using System.Diagnostics.CodeAnalysis; using System.Threading.Tasks; using Microsoft.Extensions.Hosting; @@ -9,6 +10,18 @@ public interface IProjectionCoordinator : IHostedService // TODO -- add some convenience methods to get at various shards IProjectionDaemon DaemonForMainDatabase(); ValueTask DaemonForDatabase(string databaseIdentifier); + + /// + /// Stops the projection coordinator's automatic restart logic and stops all running agents across all daemons. Does not release any held locks. + /// + /// + Task PauseAsync(); + + /// + /// Resumes the projection coordinators automatic restart logic and starts all running agents across all daemons. Intended to be used after + /// + /// + Task ResumeAsync(); } public interface IProjectionCoordinator : IProjectionCoordinator where T : IDocumentStore diff --git a/src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs b/src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs index dd5be6ee48..96e1eb6e91 100644 --- a/src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs +++ b/src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs @@ -27,7 +27,7 @@ public class ProjectionCoordinator : IProjectionCoordinator private readonly object _daemonLock = new (); private readonly ResiliencePipeline _resilience; - private readonly CancellationTokenSource _cancellation = new(); + private CancellationTokenSource _cancellation; private Task _runner; private readonly TimeProvider _timeProvider; @@ -100,18 +100,21 @@ internal record DaemonShardName(IProjectionDaemon Daemon, ShardName Name); public Task StartAsync(CancellationToken cancellationToken) { + _cancellation = new(); _runner = Task.Run(() => executeAsync(_cancellation.Token), _cancellation.Token); return Task.CompletedTask; } - public async Task StopAsync(CancellationToken cancellationToken) + public async Task PauseAsync() { + _logger.LogInformation("Pausing ProjectionCoordinator"); #if NET8_0_OR_GREATER await _cancellation.CancelAsync().ConfigureAwait(false); #else _cancellation.Cancel(); #endif + _cancellation.SafeDispose(); try { @@ -129,9 +132,8 @@ public async Task StopAsync(CancellationToken cancellationToken) } catch (Exception e) { - _logger.LogError(e, "Error while trying to shut down the ProjectionCoordinator"); + _logger.LogError(e, "Error while trying to stop the ProjectionCoordinator"); } - _runner.SafeDispose(); foreach (var pair in _daemons.Enumerate()) { @@ -144,6 +146,13 @@ public async Task StopAsync(CancellationToken cancellationToken) _logger.LogError(exception, "Error while trying to stop daemon agents in database {Name}", pair.Key); } } + } + + public Task ResumeAsync() => StartAsync(default); + + public async Task StopAsync(CancellationToken cancellationToken) + { + await PauseAsync().ConfigureAwait(false); foreach (var daemon in _daemons.Enumerate()) {