Skip to content

Commit

Permalink
Send terminationMessage to singleton when leaving last (#2503)
Browse files Browse the repository at this point in the history
* send terminationMessage to singleton when leaving last

* fixed API Approve
  • Loading branch information
alexvaluyskiy authored and Aaronontheweb committed Feb 8, 2017
1 parent c95b2d2 commit 4c43edc
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ namespace Akka.Cluster.Tools.Tests.MultiNode.Singleton
{
public class ClusterSingletonManagerLeaveSpecConfig : MultiNodeConfig
{
public readonly RoleName First;
public readonly RoleName Second;
public readonly RoleName Third;
public RoleName First { get; }
public RoleName Second { get; }
public RoleName Third { get; }

public ClusterSingletonManagerLeaveSpecConfig()
{
Expand All @@ -29,7 +29,7 @@ public ClusterSingletonManagerLeaveSpecConfig()
Third = Role("third");

CommonConfig = ConfigurationFactory.ParseString(@"
akka.loglevel = DEBUG
akka.loglevel = INFO
akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = off
Expand All @@ -38,34 +38,49 @@ public ClusterSingletonManagerLeaveSpecConfig()
.WithFallback(ClusterSingletonProxy.DefaultConfig())
.WithFallback(MultiNodeClusterSpec.ClusterConfig());
}
}

public class ClusterSingletonManagerLeaveNode1 : ClusterSingletonManagerLeaveSpec { }
public class ClusterSingletonManagerLeaveNode2 : ClusterSingletonManagerLeaveSpec { }
public class ClusterSingletonManagerLeaveNode3 : ClusterSingletonManagerLeaveSpec { }

public abstract class ClusterSingletonManagerLeaveSpec : MultiNodeClusterSpec
{
private readonly ClusterSingletonManagerLeaveSpecConfig _config;

private class Echo : ReceiveActor
// The singleton actor
public class Echo : ReceiveActor
{
private readonly IActorRef _testActorRef;

public Echo(IActorRef testActorRef)
{
_testActorRef = testActorRef;

Receive<string>(x => x.Equals("stop"), _ =>
{
_testActorRef.Tell("stop");
Context.Stop(Self);
});

ReceiveAny(x => Sender.Tell(Self));
}

protected override void PreStart()
{
_testActorRef.Tell("preStart");
}

protected override void PostStop()
{
_testActorRef.Tell("stopped");
_testActorRef.Tell("postStop");
}
}
}

public class ClusterSingletonManagerLeaveNode1 : ClusterSingletonManagerLeaveSpec { }
public class ClusterSingletonManagerLeaveNode2 : ClusterSingletonManagerLeaveSpec { }
public class ClusterSingletonManagerLeaveNode3 : ClusterSingletonManagerLeaveSpec { }

public abstract class ClusterSingletonManagerLeaveSpec : MultiNodeClusterSpec
{
private readonly ClusterSingletonManagerLeaveSpecConfig _config;

private readonly Lazy<IActorRef> _echoProxy;

protected override int InitialParticipantsValueFactory => Roles.Count;

protected ClusterSingletonManagerLeaveSpec() : this(new ClusterSingletonManagerLeaveSpecConfig())
{
}
Expand All @@ -92,8 +107,8 @@ private void Join(RoleName from, RoleName to)
private IActorRef CreateSingleton()
{
return Sys.ActorOf(ClusterSingletonManager.Props(
singletonProps: Props.Create(() => new Echo(TestActor)),
terminationMessage: PoisonPill.Instance,
singletonProps: Props.Create(() => new ClusterSingletonManagerLeaveSpecConfig.Echo(TestActor)),
terminationMessage: "stop",
settings: ClusterSingletonManagerSettings.Create(Sys)),
name: "echo");
}
Expand All @@ -106,52 +121,98 @@ public void ClusterSingletonManagerLeaveSpecs()

public void Leaving_ClusterSingletonManager_must_handover_to_new_instance()
{
var first = _config.First;
var second = _config.Second;
var third = _config.Third;
Join(_config.First, _config.First);

Join(first, first);
RunOn(() =>
{
_echoProxy.Value.Tell("hello1");
ExpectMsg<IActorRef>(TimeSpan.FromSeconds(5));
}, first);
Within(5.Seconds(), () =>
{
ExpectMsg("preStart");
_echoProxy.Value.Tell("hello");
ExpectMsg<IActorRef>();
});
}, _config.First);
EnterBarrier("first-active");

Join(second, first);
Join(third, first);
Within(TimeSpan.FromSeconds(10), () =>
Join(_config.Second, _config.First);
RunOn(() =>
{
Within(10.Seconds(), () =>
{
AwaitAssert(() => Cluster.State.Members.Count(m => m.Status == MemberStatus.Up).Should().Be(2));
});
}, _config.First, _config.Second);
EnterBarrier("second-up");

Join(_config.Third, _config.First);
Within(10.Seconds(), () =>
{
AwaitAssert(() => Cluster.ReadView.State.Members.Count(m => m.Status == MemberStatus.Up).Should().Be(3));
AwaitAssert(() => Cluster.State.Members.Count(m => m.Status == MemberStatus.Up).Should().Be(3));
});
EnterBarrier("all-up");

RunOn(() =>
{
Cluster.Leave(Node(first).Address);
}, second);
Cluster.Leave(Node(_config.First).Address);
}, _config.Second);

RunOn(() =>
{
ExpectMsg("stopped", TimeSpan.FromSeconds(10));
}, first);
ExpectMsg("stop", 10.Seconds());
ExpectMsg("postStop");
}, _config.First);
EnterBarrier("first-stopped");

RunOn(() =>
{
ExpectMsg("preStart");
}, _config.Second);
EnterBarrier("second-started");

RunOn(() =>
{
var p = CreateTestProbe();
var firstAddress = Node(first).Address;
p.Within(TimeSpan.FromSeconds(10), () =>
var firstAddress = Node(_config.First).Address;
p.Within(10.Seconds(), () =>
{
p.AwaitAssert(() =>
{
_echoProxy.Value.Tell("hello2", p.Ref);
var actualAddress = p.ExpectMsg<IActorRef>(TestKitSettings.DefaultTimeout);
actualAddress.Path.Address.Should().NotBe(firstAddress);
p.ExpectMsg<IActorRef>(1.Seconds()).Path.Address.Should().NotBe(firstAddress);
});
});
}, second, third);
EnterBarrier("handover-done");
}, _config.Second, _config.Third);
EnterBarrier("second-working");

RunOn(() =>
{
Cluster.Leave(Node(_config.Second).Address);
}, _config.Third);

RunOn(() =>
{
ExpectMsg("stop", 15.Seconds());
ExpectMsg("postStop");
}, _config.Second);
EnterBarrier("second-stopped");

RunOn(() =>
{
ExpectMsg("preStart");
}, _config.Third);
EnterBarrier("third-started");

RunOn(() =>
{
Cluster.Leave(Node(_config.Third).Address);
}, _config.Third);

RunOn(() =>
{
ExpectMsg("stop", 10.Seconds());
ExpectMsg("postStop");
}, _config.Third);
EnterBarrier("third-stopped");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,27 @@ public HandingOverData(IActorRef singleton, IActorRef handOverTo)
}
}

/// <summary>
/// TBD
/// </summary>
[Serializable]
internal sealed class StoppingData : IClusterSingletonData
{
/// <summary>
/// TBD
/// </summary>
public readonly IActorRef Singleton;

/// <summary>
/// TBD
/// </summary>
/// <param name="singleton">TBD</param>
public StoppingData(IActorRef singleton)
{
Singleton = singleton;
}
}

/// <summary>
/// TBD
/// </summary>
Expand Down Expand Up @@ -365,6 +386,10 @@ public enum ClusterSingletonState
/// <summary>
/// TBD
/// </summary>
Stopping,
/// <summary>
/// TBD
/// </summary>
End
}

Expand Down Expand Up @@ -615,6 +640,11 @@ private State<ClusterSingletonState, IClusterSingletonData> GoToHandingOver(IAct
return GoTo(ClusterSingletonState.HandingOver).Using(new HandingOverData(singleton, handOverTo));
}

private State<ClusterSingletonState, IClusterSingletonData> GoToStopping(IActorRef singleton)
{
singleton.Tell(_terminationMessage);
return GoTo(ClusterSingletonState.Stopping).Using(new StoppingData(singleton));
}

private void InitializeFSM()
{
Expand Down Expand Up @@ -848,8 +878,21 @@ private void InitializeFSM()
if (e.FsmEvent is TakeOverRetry && wasOldestData != null)
{
var takeOverRetry = (TakeOverRetry)e.FsmEvent;
if (takeOverRetry.Count <= _maxTakeOverRetries)

if (_cluster.IsTerminated
&& (wasOldestData.NewOldest == null || takeOverRetry.Count > _maxTakeOverRetries))
{
if (wasOldestData.SingletonTerminated)
{
return Stop();
}
else
{
return GoToStopping(wasOldestData.Singleton);
}
}
else if (takeOverRetry.Count <= _maxTakeOverRetries)
{
Log.Info("Retry [{0}], sending TakeOverFromMe to [{1}]", takeOverRetry.Count, wasOldestData.NewOldest);

if (wasOldestData.NewOldest != null)
Expand All @@ -858,12 +901,10 @@ private void InitializeFSM()
SetTimer(TakeOverRetryTimer, new TakeOverRetry(takeOverRetry.Count + 1), _settings.HandOverRetryInterval);
return Stay();
}
else if (_cluster.IsTerminated)
{
return Stop();
}
else
{
throw new ClusterSingletonManagerIsStuck(string.Format("Expected hand-over to [{0}] never occured", wasOldestData.NewOldest));
}
}
else if (e.FsmEvent is HandOverToMe && wasOldestData != null)
{
Expand Down Expand Up @@ -911,6 +952,20 @@ private void InitializeFSM()
return null;
});

When(ClusterSingletonState.Stopping, e =>
{
var terminated = e.FsmEvent as Terminated;
var stoppingData = e.StateData as StoppingData;
if (terminated != null
&& stoppingData != null
&& terminated.ActorRef.Equals(stoppingData.Singleton))
{
return Stop();
}

return null;
});

When(ClusterSingletonState.End, e =>
{
var removed = e.FsmEvent as ClusterEvent.MemberRemoved;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,8 @@ namespace Akka.Cluster.Tools.Singleton
WasOldest = 4,
HandingOver = 5,
TakeOver = 6,
End = 7,
Stopping = 7,
End = 8,
}
public interface IClusterSingletonData { }
public interface IClusterSingletonMessage { }
Expand Down

0 comments on commit 4c43edc

Please sign in to comment.