From 0445c6887e36fafcd4fc6f8083721758d2307058 Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Tue, 6 Sep 2022 22:32:51 +0200 Subject: [PATCH 1/4] Added single node 'cluster' provider, allowing Proto.Cluster to be used in single applications with the cluster / virtual actor abstractions. This makes it easy to get started, and allows for the same programming model in a single service use case as in a clustered environment. Updated cluster tests to allow testing single node setups. --- .../SingleNode/SingleNodeActivatorActor.cs | 215 ++++++++++++++++++ .../SingleNode/SingleNodeLookup.cs | 99 ++++++++ .../SingleNode/SingleNodeProvider.cs | 42 ++++ tests/Proto.Cluster.Tests/ClusterFixture.cs | 31 ++- tests/Proto.Cluster.Tests/ClusterTests.cs | 33 +++ 5 files changed, 411 insertions(+), 9 deletions(-) create mode 100644 src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs create mode 100644 src/Proto.Cluster/SingleNode/SingleNodeLookup.cs create mode 100644 src/Proto.Cluster/SingleNode/SingleNodeProvider.cs diff --git a/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs b/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs new file mode 100644 index 0000000000..56bc9482f4 --- /dev/null +++ b/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs @@ -0,0 +1,215 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2022 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +namespace Proto.Cluster.SingleNode; + +class SingleNodeActivatorActor : IActor +{ + private static readonly ILogger Logger = Log.CreateLogger(); + + + private readonly Cluster _cluster; + private readonly Dictionary _actors = new(); + private readonly HashSet _inFlightIdentityChecks = new(); + + public SingleNodeActivatorActor(Cluster cluster) => _cluster = cluster; + + private Task OnStarted(IContext context) + { + var self = context.Self; + _cluster.System.EventStream.Subscribe(e => _cluster.System.Root.Send(self, e)); + _cluster.System.EventStream.Subscribe(e => _cluster.System.Root.Send(self, e)); + + return Task.CompletedTask; + } + + public Task ReceiveAsync(IContext context) => + context.Message switch + { + Started => OnStarted(context), + Stopping => OnStopping(context), + ActivationRequest msg => OnActivationRequest(msg, context), + ActivationTerminated msg => OnActivationTerminated(msg), + ActivationTerminating msg => OnActivationTerminating(msg), + _ => Task.CompletedTask + }; + + private async Task OnStopping(IContext context) + { + await StopActors(context); + _cluster.PidCache.RemoveByPredicate(kv => kv.Value.Address.Equals(context.System.Address, StringComparison.Ordinal)); + } + + private async Task StopActors(IContext context) + { + var stopping = new List(); + + var clusterIdentities = _actors.Keys.ToList(); + + foreach (var ci in clusterIdentities) + { + var pid = _actors[ci]; + var stoppingTask = context.PoisonAsync(pid); + stopping.Add(stoppingTask); + _actors.Remove(ci); + } + + //await graceful shutdown of all actors we no longer own + await Task.WhenAll(stopping); + Logger.LogInformation("[SingleNodeActivator] - Stopped {ActorCount} actors", clusterIdentities.Count); + } + + private Task OnActivationTerminated(ActivationTerminated msg) + { + _cluster.PidCache.RemoveByVal(msg.ClusterIdentity, msg.Pid); + + // we get this via broadcast to all nodes, remove if we have it, or ignore + if (Logger.IsEnabled(LogLevel.Trace)) + Logger.LogTrace("[SingleNodeActivator] Terminated {Pid}", msg.Pid); + + return Task.CompletedTask; + } + + private Task OnActivationTerminating(ActivationTerminating msg) + { + // ActivationTerminating is sent to the local EventStream when a + // local cluster actor stops. + + if (!_actors.ContainsKey(msg.ClusterIdentity)) + return Task.CompletedTask; + + if (Logger.IsEnabled(LogLevel.Trace)) + Logger.LogTrace("[SingleNodeActivator] Terminating {Pid}", msg.Pid); + + _actors.Remove(msg.ClusterIdentity); + + // Broadcast ActivationTerminated to all nodes so that PidCaches gets + // cleared correctly. + var activationTerminated = new ActivationTerminated + { + Pid = msg.Pid, + ClusterIdentity = msg.ClusterIdentity, + }; + _cluster.MemberList.BroadcastEvent(activationTerminated); + + return Task.CompletedTask; + } + + private Task OnActivationRequest(ActivationRequest msg, IContext context) + { + if (_actors.TryGetValue(msg.ClusterIdentity, out var existing)) + { + context.Respond(new ActivationResponse + { + Pid = existing, + } + ); + } + else + { + var clusterKind = _cluster.GetClusterKind(msg.Kind); + + if (clusterKind.CanSpawnIdentity is not null) + { + // Needs to check if the identity is allowed to spawn + VerifyAndSpawn(msg, context, clusterKind); + } + else + { + Spawn(msg, context, clusterKind); + } + } + + return Task.CompletedTask; + } + + private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedClusterKind clusterKind) + { + var clusterIdentity = msg.ClusterIdentity; + + if (_inFlightIdentityChecks.Contains(clusterIdentity)) + { + Logger.LogError("[SingleNodeActivator] Duplicate activation requests for {ClusterIdentity}", clusterIdentity); + context.Respond(new ActivationResponse + { + Failed = true, + } + ); + } + + var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnTimeout)); + + if (canSpawn.IsCompleted) + { + OnSpawnDecided(msg, context, clusterKind, canSpawn.Result); + return; + } + + _inFlightIdentityChecks.Add(clusterIdentity); + context.ReenterAfter(canSpawn.AsTask(), task => { + _inFlightIdentityChecks.Remove(clusterIdentity); + + if (task.IsCompletedSuccessfully) + { + OnSpawnDecided(msg, context, clusterKind, task.Result); + } + else + { + Logger.LogError("[SingleNodeActivator] Error when checking {ClusterIdentity}", clusterIdentity); + context.Respond(new ActivationResponse + { + Failed = true, + } + ); + } + + return Task.CompletedTask; + } + ); + } + + private void Spawn(ActivationRequest msg, IContext context, ActivatedClusterKind clusterKind) + { + try + { + var pid = context.Spawn(clusterKind.Props, ctx => ctx.Set(msg.ClusterIdentity)); + _actors.Add(msg.ClusterIdentity, pid); + context.Respond(new ActivationResponse + { + Pid = pid, + } + ); + } + catch (Exception e) + { + e.CheckFailFast(); + Logger.LogError(e, "[SingleNodeActivator] Failed to spawn {Kind}/{Identity}", msg.Kind, msg.Identity); + context.Respond(new ActivationResponse {Failed = true}); + } + } + + private void OnSpawnDecided(ActivationRequest msg, IContext context, ActivatedClusterKind clusterKind, bool canSpawnIdentity) + { + if (canSpawnIdentity) + { + Spawn(msg, context, clusterKind); + } + else + { + context.Respond(new ActivationResponse + { + Failed = true, + InvalidIdentity = true + } + ); + } + } +} \ No newline at end of file diff --git a/src/Proto.Cluster/SingleNode/SingleNodeLookup.cs b/src/Proto.Cluster/SingleNode/SingleNodeLookup.cs new file mode 100644 index 0000000000..158847a656 --- /dev/null +++ b/src/Proto.Cluster/SingleNode/SingleNodeLookup.cs @@ -0,0 +1,99 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2022 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Proto.Cluster.Identity; + +namespace Proto.Cluster.SingleNode; + +/// +/// Provides a lookup optimized for single node 'clusters' +/// Only usable with SingleNodeProvider +/// +public class SingleNodeLookup : IIdentityLookup +{ + private const string ActivatorActorName = "$sn-activator"; + private PID _activatorActor = null!; + + private static readonly ILogger Logger = Log.CreateLogger(); + private readonly TimeSpan _getPidTimeout; + private Cluster _cluster = null!; + + public SingleNodeLookup() : this(TimeSpan.FromSeconds(1)) + { + } + + public SingleNodeLookup(TimeSpan getPidTimeout) => _getPidTimeout = getPidTimeout; + + public async Task GetAsync(ClusterIdentity clusterIdentity, CancellationToken notUsed) + { + using var cts = new CancellationTokenSource(_getPidTimeout); + + var req = new ActivationRequest + { + RequestId = Guid.NewGuid().ToString("N"), + ClusterIdentity = clusterIdentity + }; + + try + { + var resp = await _cluster.System.Root.RequestAsync(_activatorActor, req, cts.Token); + + if (resp.InvalidIdentity) + { + throw new IdentityIsBlocked(clusterIdentity); + } + + return resp?.Pid; + } + catch (DeadLetterException) + { + Logger.LogInformation("[SingleNodeActivator] Remote PID request deadletter {@Request}", req); + return null; + } + catch (TimeoutException) + { + Logger.LogInformation("[SingleNodeActivator] Remote PID request timeout {@Request}", req); + return null; + } + catch (Exception e) when (e is not IdentityIsBlocked) + { + e.CheckFailFast(); + Logger.LogError(e, "[SingleNodeActivator] Error occured requesting remote PID {@Request}", req); + return null; + } + } + + public Task RemovePidAsync(ClusterIdentity clusterIdentity, PID pid, CancellationToken ct) + { + var activationTerminated = new ActivationTerminated + { + Pid = pid, + ClusterIdentity = clusterIdentity + }; + + _cluster.MemberList.BroadcastEvent(activationTerminated); + + return Task.CompletedTask; + } + + public Task SetupAsync(Cluster cluster, string[] kinds, bool isClient) + { + if (cluster.Provider is not SingleNodeProvider || isClient) + { + throw new ArgumentException("SingleNodeLookup can only be used with SingleNodeProvider in server mode"); + } + + _cluster = cluster; + var props = Props.FromProducer(() => new SingleNodeActivatorActor(_cluster)); + _activatorActor = cluster.System.Root.SpawnNamedSystem(props, ActivatorActorName); + return Task.CompletedTask; + } + + public Task ShutdownAsync() => _cluster.System.Root.StopAsync(_activatorActor); +} \ No newline at end of file diff --git a/src/Proto.Cluster/SingleNode/SingleNodeProvider.cs b/src/Proto.Cluster/SingleNode/SingleNodeProvider.cs new file mode 100644 index 0000000000..74495990bf --- /dev/null +++ b/src/Proto.Cluster/SingleNode/SingleNodeProvider.cs @@ -0,0 +1,42 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2022 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- +using System; +using System.Threading.Tasks; + +namespace Proto.Cluster.SingleNode; + +/// +/// Provides an in-memory cluster provider for a single node +/// Makes the cluster abstractions available for +/// single node scenarios, with zero need for external coordination +/// +public class SingleNodeProvider : IClusterProvider +{ + private Cluster? _cluster; + + public Task StartMemberAsync(Cluster cluster) + { + _cluster = cluster; + var (host, port) = cluster.System.GetAddress(); + var member = new Member + { + Host = host, + Port = port, + Id = cluster.System.Id, + Kinds = {cluster.GetClusterKinds()} + }; + cluster.MemberList.UpdateClusterTopology(new[] {member}); + return Task.CompletedTask; + } + + public Task StartClientAsync(Cluster cluster) => throw new NotSupportedException("Single node provider does not support client mode"); + + public Task ShutdownAsync(bool graceful) + { + _cluster?.MemberList.UpdateClusterTopology(Array.Empty()); + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/tests/Proto.Cluster.Tests/ClusterFixture.cs b/tests/Proto.Cluster.Tests/ClusterFixture.cs index 80cefd3b3e..3da8befb88 100644 --- a/tests/Proto.Cluster.Tests/ClusterFixture.cs +++ b/tests/Proto.Cluster.Tests/ClusterFixture.cs @@ -11,6 +11,7 @@ using Proto.Cluster.Identity; using Proto.Cluster.Partition; using Proto.Cluster.PartitionActivator; +using Proto.Cluster.SingleNode; using Proto.Cluster.Testing; using Proto.Logging; using Proto.OpenTelemetry; @@ -29,6 +30,7 @@ public interface IClusterFixture public Task SpawnNode(); LogStore LogStore { get; } + int ClusterSize { get; } Task RemoveNode(Cluster member, bool graceful = true); } @@ -39,7 +41,7 @@ public abstract class ClusterFixture : IAsyncLifetime, IClusterFixture, IAsyncDi public const string InvalidIdentity = "invalid"; protected readonly string ClusterName; - private readonly int _clusterSize; + public int ClusterSize { get; } private readonly Func? _configure; private readonly ILogger _logger = Log.CreateLogger(nameof(GetType)); private readonly TracerProvider? _tracerProvider; @@ -50,7 +52,7 @@ protected ClusterFixture(int clusterSize, Func? co #if NETCOREAPP3_1 AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); #endif - _clusterSize = clusterSize; + ClusterSize = clusterSize; _configure = configure; ClusterName = $"test-cluster-{Guid.NewGuid().ToString().Substring(0, 6)}"; @@ -89,7 +91,7 @@ private static TracerProvider InitOpenTelemetryTracing() => global::OpenTelemetr public async Task InitializeAsync() { - var nodes = await SpawnClusterNodes(_clusterSize, _configure).ConfigureAwait(false); + var nodes = await SpawnClusterNodes(ClusterSize, _configure).ConfigureAwait(false); _members.AddRange(nodes); } @@ -178,9 +180,11 @@ protected virtual ActorSystemConfig GetActorSystemConfig() var actorSystemConfig = ActorSystemConfig.Setup(); // ReSharper disable once HeuristicUnreachableCode - return EnableTracing ? actorSystemConfig - .WithConfigureProps(props => props.WithTracing()) - .WithConfigureRootContext(context => context.WithTracing()): actorSystemConfig; + return EnableTracing + ? actorSystemConfig + .WithConfigureProps(props => props.WithTracing()) + .WithConfigureRootContext(context => context.WithTracing()) + : actorSystemConfig; } protected abstract IClusterProvider GetClusterProvider(); @@ -216,7 +220,6 @@ protected BaseInMemoryClusterFixture(int clusterSize, Func new TestProvider(new TestProviderOptions(), InMemAgent); } -// ReSharper disable once ClassNeverInstantiated.Global public class InMemoryClusterFixture : BaseInMemoryClusterFixture { public InMemoryClusterFixture() : base(3, config => config.WithActorRequestTimeout(TimeSpan.FromSeconds(4))) @@ -224,7 +227,6 @@ public InMemoryClusterFixture() : base(3, config => config.WithActorRequestTimeo } } -// ReSharper disable once ClassNeverInstantiated.Global public class InMemoryClusterFixtureWithPartitionActivator : BaseInMemoryClusterFixture { public InMemoryClusterFixtureWithPartitionActivator() : base(3, config => config.WithActorRequestTimeout(TimeSpan.FromSeconds(4))) @@ -271,4 +273,15 @@ protected override async Task SpawnClusterMember(Func config.WithActorRequestTimeout(TimeSpan.FromSeconds(4))) + { + } + + protected override IClusterProvider GetClusterProvider() => new SingleNodeProvider(); + + protected override IIdentityLookup GetIdentityLookup(string clusterName) => new SingleNodeLookup(); +} diff --git a/tests/Proto.Cluster.Tests/ClusterTests.cs b/tests/Proto.Cluster.Tests/ClusterTests.cs index 516f002143..f3cfc21e06 100644 --- a/tests/Proto.Cluster.Tests/ClusterTests.cs +++ b/tests/Proto.Cluster.Tests/ClusterTests.cs @@ -74,6 +74,12 @@ public async Task SupportsMessageEnvelopeResponses() [Fact] public async Task StateIsReplicatedAcrossCluster() { + if (ClusterFixture.ClusterSize < 2) + { + _testOutputHelper.WriteLine("Skipped test, cluster size is less than 2"); + return; + } + var sourceMember = Members.First(); var sourceMemberId = sourceMember.System.Id; var targetMember = Members.Last(); @@ -115,6 +121,12 @@ IAsyncEnumerable SubscribeToGossipUpdates(Cluster member) [Fact] public async Task ReSpawnsClusterActorsFromDifferentNodes() { + if (ClusterFixture.ClusterSize < 2) + { + _testOutputHelper.WriteLine("Skipped test, cluster size is less than 2"); + return; + } + var timeout = new CancellationTokenSource(10000).Token; var id = CreateIdentity("1"); await PingPong(Members[0], id, timeout); @@ -140,6 +152,12 @@ public async Task ReSpawnsClusterActorsFromDifferentNodes() [Fact] public async Task HandlesLosingANode() { + if (ClusterFixture.ClusterSize < 2) + { + _testOutputHelper.WriteLine("Skipped test, cluster size is less than 2"); + return; + } + var ids = Enumerable.Range(1, 10).Select(id => id.ToString()).ToList(); await CanGetResponseFromAllIdsOnAllNodes(ids, Members, 20000); @@ -158,6 +176,12 @@ public async Task HandlesLosingANode() [Fact] public async Task HandlesLosingANodeWhileProcessing() { + if (ClusterFixture.ClusterSize < 2) + { + _testOutputHelper.WriteLine("Skipped test, cluster size is less than 2"); + return; + } + var ingressNodes = new[] {Members[0], Members[1]}; var victim = Members[2]; var ids = Enumerable.Range(1, 20).Select(id => id.ToString()).ToList(); @@ -382,4 +406,13 @@ public InMemoryPartitionActivatorClusterTests(ITestOutputHelper testOutputHelper : base(testOutputHelper, clusterFixture) { } +} + +public class SingleNodeProviderClusterTests : ClusterTests, IClassFixture +{ + // ReSharper disable once SuggestBaseTypeForParameterInConstructor + public SingleNodeProviderClusterTests(ITestOutputHelper testOutputHelper, SingleNodeProviderFixture clusterFixture) + : base(testOutputHelper, clusterFixture) + { + } } \ No newline at end of file From 11321e75067ab91ccfacd2c3057729f3c38d7b6b Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Tue, 6 Sep 2022 22:58:23 +0200 Subject: [PATCH 2/4] . --- src/Proto.Cluster/SingleNode/SingleNodeLookup.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Proto.Cluster/SingleNode/SingleNodeLookup.cs b/src/Proto.Cluster/SingleNode/SingleNodeLookup.cs index 158847a656..badfef10ef 100644 --- a/src/Proto.Cluster/SingleNode/SingleNodeLookup.cs +++ b/src/Proto.Cluster/SingleNode/SingleNodeLookup.cs @@ -1,5 +1,5 @@ // ----------------------------------------------------------------------- -// +// // Copyright (C) 2015-2022 Asynkron AB All rights reserved // // ----------------------------------------------------------------------- From 1254773f26116d838414641468860fd8ea2825ea Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Wed, 7 Sep 2022 08:24:39 +0200 Subject: [PATCH 3/4] Log naming --- .../SingleNode/SingleNodeActivatorActor.cs | 12 ++++++------ src/Proto.Cluster/SingleNode/SingleNodeLookup.cs | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs b/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs index 56bc9482f4..65f78b39eb 100644 --- a/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs +++ b/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs @@ -64,7 +64,7 @@ private async Task StopActors(IContext context) //await graceful shutdown of all actors we no longer own await Task.WhenAll(stopping); - Logger.LogInformation("[SingleNodeActivator] - Stopped {ActorCount} actors", clusterIdentities.Count); + Logger.LogInformation("[SingleNode] - Stopped {ActorCount} actors", clusterIdentities.Count); } private Task OnActivationTerminated(ActivationTerminated msg) @@ -73,7 +73,7 @@ private Task OnActivationTerminated(ActivationTerminated msg) // we get this via broadcast to all nodes, remove if we have it, or ignore if (Logger.IsEnabled(LogLevel.Trace)) - Logger.LogTrace("[SingleNodeActivator] Terminated {Pid}", msg.Pid); + Logger.LogTrace("[SingleNode] Terminated {Pid}", msg.Pid); return Task.CompletedTask; } @@ -87,7 +87,7 @@ private Task OnActivationTerminating(ActivationTerminating msg) return Task.CompletedTask; if (Logger.IsEnabled(LogLevel.Trace)) - Logger.LogTrace("[SingleNodeActivator] Terminating {Pid}", msg.Pid); + Logger.LogTrace("[SingleNode] Terminating {Pid}", msg.Pid); _actors.Remove(msg.ClusterIdentity); @@ -137,7 +137,7 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl if (_inFlightIdentityChecks.Contains(clusterIdentity)) { - Logger.LogError("[SingleNodeActivator] Duplicate activation requests for {ClusterIdentity}", clusterIdentity); + Logger.LogError("[SingleNode] Duplicate activation requests for {ClusterIdentity}", clusterIdentity); context.Respond(new ActivationResponse { Failed = true, @@ -163,7 +163,7 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl } else { - Logger.LogError("[SingleNodeActivator] Error when checking {ClusterIdentity}", clusterIdentity); + Logger.LogError("[SingleNode] Error when checking {ClusterIdentity}", clusterIdentity); context.Respond(new ActivationResponse { Failed = true, @@ -191,7 +191,7 @@ private void Spawn(ActivationRequest msg, IContext context, ActivatedClusterKind catch (Exception e) { e.CheckFailFast(); - Logger.LogError(e, "[SingleNodeActivator] Failed to spawn {Kind}/{Identity}", msg.Kind, msg.Identity); + Logger.LogError(e, "[SingleNode] Failed to spawn {Kind}/{Identity}", msg.Kind, msg.Identity); context.Respond(new ActivationResponse {Failed = true}); } } diff --git a/src/Proto.Cluster/SingleNode/SingleNodeLookup.cs b/src/Proto.Cluster/SingleNode/SingleNodeLookup.cs index badfef10ef..6f00664dc7 100644 --- a/src/Proto.Cluster/SingleNode/SingleNodeLookup.cs +++ b/src/Proto.Cluster/SingleNode/SingleNodeLookup.cs @@ -53,18 +53,18 @@ public SingleNodeLookup() : this(TimeSpan.FromSeconds(1)) } catch (DeadLetterException) { - Logger.LogInformation("[SingleNodeActivator] Remote PID request deadletter {@Request}", req); + Logger.LogInformation("[SingleNode] Remote PID request deadletter {@Request}", req); return null; } catch (TimeoutException) { - Logger.LogInformation("[SingleNodeActivator] Remote PID request timeout {@Request}", req); + Logger.LogInformation("[SingleNode] Remote PID request timeout {@Request}", req); return null; } catch (Exception e) when (e is not IdentityIsBlocked) { e.CheckFailFast(); - Logger.LogError(e, "[SingleNodeActivator] Error occured requesting remote PID {@Request}", req); + Logger.LogError(e, "[SingleNode] Error occured requesting remote PID {@Request}", req); return null; } } From e3a79440b4bf6c2a8eacf40aee399e385d140dd9 Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Wed, 7 Sep 2022 16:10:41 +0200 Subject: [PATCH 4/4] Cleanup / refactoring --- src/Proto.Cluster/Partition/PartitionIdentityActor.cs | 2 +- .../PartitionActivator/PartitionActivatorActor.cs | 4 ++-- src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs | 9 ++++----- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/Proto.Cluster/Partition/PartitionIdentityActor.cs b/src/Proto.Cluster/Partition/PartitionIdentityActor.cs index e41d8e92a4..c7a8247166 100644 --- a/src/Proto.Cluster/Partition/PartitionIdentityActor.cs +++ b/src/Proto.Cluster/Partition/PartitionIdentityActor.cs @@ -199,7 +199,7 @@ private HashSet GetIncompletePartitionAddresses(HandoverSink sink, strin private Task OnStarted(IContext context) { var self = context.Self; - _cluster.System.EventStream.Subscribe(e => _cluster.System.Root.Send(self, e)); + _cluster.System.EventStream.Subscribe(context.System.Root, context.Self); return Task.CompletedTask; } diff --git a/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs b/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs index 3dc9c5a600..5688133421 100644 --- a/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs +++ b/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs @@ -41,8 +41,8 @@ public PartitionActivatorActor(Cluster cluster, PartitionActivatorManager manage private Task OnStarted(IContext context) { var self = context.Self; - _cluster.System.EventStream.Subscribe(e => _cluster.System.Root.Send(self, e)); - _cluster.System.EventStream.Subscribe(e => _cluster.System.Root.Send(self, e)); + _cluster.System.EventStream.Subscribe(context.System.Root, context.Self); + _cluster.System.EventStream.Subscribe(context.System.Root, context.Self); return Task.CompletedTask; } diff --git a/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs b/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs index 65f78b39eb..628c94bdeb 100644 --- a/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs +++ b/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs @@ -25,8 +25,8 @@ class SingleNodeActivatorActor : IActor private Task OnStarted(IContext context) { var self = context.Self; - _cluster.System.EventStream.Subscribe(e => _cluster.System.Root.Send(self, e)); - _cluster.System.EventStream.Subscribe(e => _cluster.System.Root.Send(self, e)); + _cluster.System.EventStream.Subscribe(context.System.Root, self); + _cluster.System.EventStream.Subscribe(context.System.Root, self); return Task.CompletedTask; } @@ -62,7 +62,7 @@ private async Task StopActors(IContext context) _actors.Remove(ci); } - //await graceful shutdown of all actors we no longer own + //await graceful shutdown of all actors await Task.WhenAll(stopping); Logger.LogInformation("[SingleNode] - Stopped {ActorCount} actors", clusterIdentities.Count); } @@ -70,8 +70,7 @@ private async Task StopActors(IContext context) private Task OnActivationTerminated(ActivationTerminated msg) { _cluster.PidCache.RemoveByVal(msg.ClusterIdentity, msg.Pid); - - // we get this via broadcast to all nodes, remove if we have it, or ignore + if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace("[SingleNode] Terminated {Pid}", msg.Pid);