Skip to content

Commit

Permalink
Improve ActivationMigrationManager shutdown resilience and responsive…
Browse files Browse the repository at this point in the history
…ness (#9229)
  • Loading branch information
ReubenBond authored Nov 14, 2024
1 parent 98e6142 commit 9a19013
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions src/Orleans.Runtime/Catalog/ActivationMigrationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ internal class ActivationMigrationManager : SystemTarget, IActivationMigrationMa
private const int MaxBatchSize = 1_000;
private readonly ConcurrentDictionary<SiloAddress, (Task PumpTask, Channel<MigrationWorkItem> WorkItemChannel)> _workers = new();
private readonly ObjectPool<MigrationWorkItem> _workItemPool = ObjectPool.Create(new MigrationWorkItem.ObjectPoolPolicy());
private readonly CancellationTokenSource _shuttingDownCts = new();
private readonly ILogger<ActivationMigrationManager> _logger;
private readonly IInternalGrainFactory _grainFactory;
private readonly Catalog _catalog;
Expand Down Expand Up @@ -214,7 +215,7 @@ private async Task PumpMigrationQueue(SiloAddress targetSilo, Channel<MigrationW
}

// Attempt to migrate the batch.
await remote.AcceptMigratingGrains(batch);
await remote.AcceptMigratingGrains(batch).AsTask().WaitAsync(_shuttingDownCts.Token);

foreach (var item in items)
{
Expand All @@ -228,7 +229,10 @@ private async Task PumpMigrationQueue(SiloAddress targetSilo, Channel<MigrationW
}
catch (Exception exception)
{
_logger.LogError(exception, "Error while migrating {Count} grain activations to {SiloAddress}", items.Count, targetSilo);
if (!_shuttingDownCts.IsCancellationRequested)
{
_logger.LogError(exception, "Error while migrating {Count} grain activations to {SiloAddress}", items.Count, targetSilo);
}

foreach (var item in items)
{
Expand Down Expand Up @@ -316,7 +320,16 @@ private async Task StopAsync(CancellationToken cancellationToken)
workerTasks.Add(value.PumpTask);
}

await Task.WhenAll(workerTasks).WaitAsync(cancellationToken);
try
{
_shuttingDownCts.Cancel();
}
catch (Exception exception)
{
_logger.LogWarning(exception, "Error signaling shutdown.");
}

await Task.WhenAll(workerTasks).WaitAsync(cancellationToken).SuppressThrowing();
}

void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
Expand Down

0 comments on commit 9a19013

Please sign in to comment.