Skip to content

Commit

Permalink
Add pause/resume to projection coordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
Hawxy committed May 10, 2024
1 parent 92795c8 commit 6eed74f
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
13 changes: 13 additions & 0 deletions src/Marten/Events/Daemon/Coordination/IProjectionCoordinator.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

Expand All @@ -9,6 +10,18 @@ public interface IProjectionCoordinator : IHostedService
// TODO -- add some convenience methods to get at various shards
IProjectionDaemon DaemonForMainDatabase();
ValueTask<IProjectionDaemon> DaemonForDatabase(string databaseIdentifier);

/// <summary>
/// Stops the projection coordinator's automatic restart logic and stops all running agents across all daemons. Does not release any held locks.
/// </summary>
/// <returns></returns>
Task PauseAsync();

/// <summary>
/// Resumes the projection coordinators automatic restart logic and starts all running agents across all daemons. Intended to be used after <see cref="PauseAsync"/>
/// </summary>
/// <returns></returns>
Task ResumeAsync();
}

public interface IProjectionCoordinator<T> : IProjectionCoordinator where T : IDocumentStore
Expand Down
17 changes: 13 additions & 4 deletions src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class ProjectionCoordinator : IProjectionCoordinator
private readonly object _daemonLock = new ();

private readonly ResiliencePipeline _resilience;
private readonly CancellationTokenSource _cancellation = new();
private CancellationTokenSource _cancellation;
private Task _runner;
private readonly TimeProvider _timeProvider;

Expand Down Expand Up @@ -100,18 +100,21 @@ internal record DaemonShardName(IProjectionDaemon Daemon, ShardName Name);

public Task StartAsync(CancellationToken cancellationToken)
{
_cancellation = new();
_runner = Task.Run(() => executeAsync(_cancellation.Token), _cancellation.Token);

return Task.CompletedTask;
}

public async Task StopAsync(CancellationToken cancellationToken)
public async Task PauseAsync()
{
_logger.LogInformation("Pausing ProjectionCoordinator");
#if NET8_0_OR_GREATER
await _cancellation.CancelAsync().ConfigureAwait(false);
#else
_cancellation.Cancel();
#endif
_cancellation.SafeDispose();

try
{
Expand All @@ -129,9 +132,8 @@ public async Task StopAsync(CancellationToken cancellationToken)
}
catch (Exception e)
{
_logger.LogError(e, "Error while trying to shut down the ProjectionCoordinator");
_logger.LogError(e, "Error while trying to stop the ProjectionCoordinator");
}
_runner.SafeDispose();

foreach (var pair in _daemons.Enumerate())
{
Expand All @@ -144,6 +146,13 @@ public async Task StopAsync(CancellationToken cancellationToken)
_logger.LogError(exception, "Error while trying to stop daemon agents in database {Name}", pair.Key);
}
}
}

public Task ResumeAsync() => StartAsync(default);

public async Task StopAsync(CancellationToken cancellationToken)
{
await PauseAsync().ConfigureAwait(false);

foreach (var daemon in _daemons.Enumerate())
{
Expand Down

0 comments on commit 6eed74f

Please sign in to comment.