From 4c43edc79a8d0f71c420745956fdfe147ac73055 Mon Sep 17 00:00:00 2001 From: Alex Valuyskiy Date: Wed, 8 Feb 2017 18:07:59 +0200 Subject: [PATCH] Send terminationMessage to singleton when leaving last (#2503) * send terminationMessage to singleton when leaving last * fixed API Approve --- .../ClusterSingletonManagerLeaveSpec.cs | 137 +++++++++++++----- .../Singleton/ClusterSingletonManager.cs | 65 ++++++++- ...reAPISpec.ApproveClusterTools.approved.txt | 3 +- 3 files changed, 161 insertions(+), 44 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Singleton/ClusterSingletonManagerLeaveSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Singleton/ClusterSingletonManagerLeaveSpec.cs index 5fecca684b4..6581f798205 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Singleton/ClusterSingletonManagerLeaveSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Singleton/ClusterSingletonManagerLeaveSpec.cs @@ -18,9 +18,9 @@ namespace Akka.Cluster.Tools.Tests.MultiNode.Singleton { public class ClusterSingletonManagerLeaveSpecConfig : MultiNodeConfig { - public readonly RoleName First; - public readonly RoleName Second; - public readonly RoleName Third; + public RoleName First { get; } + public RoleName Second { get; } + public RoleName Third { get; } public ClusterSingletonManagerLeaveSpecConfig() { @@ -29,7 +29,7 @@ public ClusterSingletonManagerLeaveSpecConfig() Third = Role("third"); CommonConfig = ConfigurationFactory.ParseString(@" - akka.loglevel = DEBUG + akka.loglevel = INFO akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"" akka.remote.log-remote-lifecycle-events = off akka.cluster.auto-down-unreachable-after = off @@ -38,34 +38,49 @@ public ClusterSingletonManagerLeaveSpecConfig() .WithFallback(ClusterSingletonProxy.DefaultConfig()) .WithFallback(MultiNodeClusterSpec.ClusterConfig()); } - } - public class ClusterSingletonManagerLeaveNode1 : ClusterSingletonManagerLeaveSpec { } - public class ClusterSingletonManagerLeaveNode2 : ClusterSingletonManagerLeaveSpec { } - public class ClusterSingletonManagerLeaveNode3 : ClusterSingletonManagerLeaveSpec { } - - public abstract class ClusterSingletonManagerLeaveSpec : MultiNodeClusterSpec - { - private readonly ClusterSingletonManagerLeaveSpecConfig _config; - - private class Echo : ReceiveActor + // The singleton actor + public class Echo : ReceiveActor { private readonly IActorRef _testActorRef; public Echo(IActorRef testActorRef) { _testActorRef = testActorRef; + + Receive(x => x.Equals("stop"), _ => + { + _testActorRef.Tell("stop"); + Context.Stop(Self); + }); + ReceiveAny(x => Sender.Tell(Self)); } + protected override void PreStart() + { + _testActorRef.Tell("preStart"); + } + protected override void PostStop() { - _testActorRef.Tell("stopped"); + _testActorRef.Tell("postStop"); } } + } + + public class ClusterSingletonManagerLeaveNode1 : ClusterSingletonManagerLeaveSpec { } + public class ClusterSingletonManagerLeaveNode2 : ClusterSingletonManagerLeaveSpec { } + public class ClusterSingletonManagerLeaveNode3 : ClusterSingletonManagerLeaveSpec { } + + public abstract class ClusterSingletonManagerLeaveSpec : MultiNodeClusterSpec + { + private readonly ClusterSingletonManagerLeaveSpecConfig _config; private readonly Lazy _echoProxy; + protected override int InitialParticipantsValueFactory => Roles.Count; + protected ClusterSingletonManagerLeaveSpec() : this(new ClusterSingletonManagerLeaveSpecConfig()) { } @@ -92,8 +107,8 @@ private void Join(RoleName from, RoleName to) private IActorRef CreateSingleton() { return Sys.ActorOf(ClusterSingletonManager.Props( - singletonProps: Props.Create(() => new Echo(TestActor)), - terminationMessage: PoisonPill.Instance, + singletonProps: Props.Create(() => new ClusterSingletonManagerLeaveSpecConfig.Echo(TestActor)), + terminationMessage: "stop", settings: ClusterSingletonManagerSettings.Create(Sys)), name: "echo"); } @@ -106,52 +121,98 @@ public void ClusterSingletonManagerLeaveSpecs() public void Leaving_ClusterSingletonManager_must_handover_to_new_instance() { - var first = _config.First; - var second = _config.Second; - var third = _config.Third; + Join(_config.First, _config.First); - Join(first, first); RunOn(() => { - _echoProxy.Value.Tell("hello1"); - ExpectMsg(TimeSpan.FromSeconds(5)); - }, first); + Within(5.Seconds(), () => + { + ExpectMsg("preStart"); + _echoProxy.Value.Tell("hello"); + ExpectMsg(); + }); + }, _config.First); EnterBarrier("first-active"); - Join(second, first); - Join(third, first); - Within(TimeSpan.FromSeconds(10), () => + Join(_config.Second, _config.First); + RunOn(() => + { + Within(10.Seconds(), () => + { + AwaitAssert(() => Cluster.State.Members.Count(m => m.Status == MemberStatus.Up).Should().Be(2)); + }); + }, _config.First, _config.Second); + EnterBarrier("second-up"); + + Join(_config.Third, _config.First); + Within(10.Seconds(), () => { - AwaitAssert(() => Cluster.ReadView.State.Members.Count(m => m.Status == MemberStatus.Up).Should().Be(3)); + AwaitAssert(() => Cluster.State.Members.Count(m => m.Status == MemberStatus.Up).Should().Be(3)); }); EnterBarrier("all-up"); RunOn(() => { - Cluster.Leave(Node(first).Address); - }, second); + Cluster.Leave(Node(_config.First).Address); + }, _config.Second); RunOn(() => { - ExpectMsg("stopped", TimeSpan.FromSeconds(10)); - }, first); + ExpectMsg("stop", 10.Seconds()); + ExpectMsg("postStop"); + }, _config.First); EnterBarrier("first-stopped"); + RunOn(() => + { + ExpectMsg("preStart"); + }, _config.Second); + EnterBarrier("second-started"); + RunOn(() => { var p = CreateTestProbe(); - var firstAddress = Node(first).Address; - p.Within(TimeSpan.FromSeconds(10), () => + var firstAddress = Node(_config.First).Address; + p.Within(10.Seconds(), () => { p.AwaitAssert(() => { _echoProxy.Value.Tell("hello2", p.Ref); - var actualAddress = p.ExpectMsg(TestKitSettings.DefaultTimeout); - actualAddress.Path.Address.Should().NotBe(firstAddress); + p.ExpectMsg(1.Seconds()).Path.Address.Should().NotBe(firstAddress); }); }); - }, second, third); - EnterBarrier("handover-done"); + }, _config.Second, _config.Third); + EnterBarrier("second-working"); + + RunOn(() => + { + Cluster.Leave(Node(_config.Second).Address); + }, _config.Third); + + RunOn(() => + { + ExpectMsg("stop", 15.Seconds()); + ExpectMsg("postStop"); + }, _config.Second); + EnterBarrier("second-stopped"); + + RunOn(() => + { + ExpectMsg("preStart"); + }, _config.Third); + EnterBarrier("third-started"); + + RunOn(() => + { + Cluster.Leave(Node(_config.Third).Address); + }, _config.Third); + + RunOn(() => + { + ExpectMsg("stop", 10.Seconds()); + ExpectMsg("postStop"); + }, _config.Third); + EnterBarrier("third-stopped"); } } } \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs index 8e3d498d3b0..6caeebded3e 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs @@ -288,6 +288,27 @@ public HandingOverData(IActorRef singleton, IActorRef handOverTo) } } + /// + /// TBD + /// + [Serializable] + internal sealed class StoppingData : IClusterSingletonData + { + /// + /// TBD + /// + public readonly IActorRef Singleton; + + /// + /// TBD + /// + /// TBD + public StoppingData(IActorRef singleton) + { + Singleton = singleton; + } + } + /// /// TBD /// @@ -365,6 +386,10 @@ public enum ClusterSingletonState /// /// TBD /// + Stopping, + /// + /// TBD + /// End } @@ -615,6 +640,11 @@ private State GoToHandingOver(IAct return GoTo(ClusterSingletonState.HandingOver).Using(new HandingOverData(singleton, handOverTo)); } + private State GoToStopping(IActorRef singleton) + { + singleton.Tell(_terminationMessage); + return GoTo(ClusterSingletonState.Stopping).Using(new StoppingData(singleton)); + } private void InitializeFSM() { @@ -848,8 +878,21 @@ private void InitializeFSM() if (e.FsmEvent is TakeOverRetry && wasOldestData != null) { var takeOverRetry = (TakeOverRetry)e.FsmEvent; - if (takeOverRetry.Count <= _maxTakeOverRetries) + + if (_cluster.IsTerminated + && (wasOldestData.NewOldest == null || takeOverRetry.Count > _maxTakeOverRetries)) { + if (wasOldestData.SingletonTerminated) + { + return Stop(); + } + else + { + return GoToStopping(wasOldestData.Singleton); + } + } + else if (takeOverRetry.Count <= _maxTakeOverRetries) + { Log.Info("Retry [{0}], sending TakeOverFromMe to [{1}]", takeOverRetry.Count, wasOldestData.NewOldest); if (wasOldestData.NewOldest != null) @@ -858,12 +901,10 @@ private void InitializeFSM() SetTimer(TakeOverRetryTimer, new TakeOverRetry(takeOverRetry.Count + 1), _settings.HandOverRetryInterval); return Stay(); } - else if (_cluster.IsTerminated) - { - return Stop(); - } else + { throw new ClusterSingletonManagerIsStuck(string.Format("Expected hand-over to [{0}] never occured", wasOldestData.NewOldest)); + } } else if (e.FsmEvent is HandOverToMe && wasOldestData != null) { @@ -911,6 +952,20 @@ private void InitializeFSM() return null; }); + When(ClusterSingletonState.Stopping, e => + { + var terminated = e.FsmEvent as Terminated; + var stoppingData = e.StateData as StoppingData; + if (terminated != null + && stoppingData != null + && terminated.ActorRef.Equals(stoppingData.Singleton)) + { + return Stop(); + } + + return null; + }); + When(ClusterSingletonState.End, e => { var removed = e.FsmEvent as ClusterEvent.MemberRemoved; diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterTools.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterTools.approved.txt index f0754aa9035..9a0ab30dd61 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterTools.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterTools.approved.txt @@ -1026,7 +1026,8 @@ namespace Akka.Cluster.Tools.Singleton WasOldest = 4, HandingOver = 5, TakeOver = 6, - End = 7, + Stopping = 7, + End = 8, } public interface IClusterSingletonData { } public interface IClusterSingletonMessage { }