Skip to content

Commit

Permalink
fixed ClusterDomainEventPublisherSpec
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath committed Feb 4, 2018
1 parent 8b97e9d commit 2dc33e5
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster.TestKit/MultiNodeClusterSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ void MuteLog(ActorSystem sys = null)
typeof(GossipEnvelope),
typeof(GossipStatus),
typeof(GossipStatus),
typeof(InternalClusterAction.ITick),
typeof(InternalClusterAction.Tick),
typeof(PoisonPill),
typeof(DeathWatchNotification),
typeof(Disassociated),
Expand Down
125 changes: 82 additions & 43 deletions src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Akka.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
Expand All @@ -20,64 +21,86 @@ public class ClusterDomainEventPublisherSpec : AkkaSpec
akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
akka.remote.dot-netty.tcp.port = 0";

readonly IActorRef _publisher;
static readonly Member aUp = TestMember.Create(new Address("akka.tcp", "sys", "a", 2552), MemberStatus.Up);
static readonly Member aLeaving = aUp.Copy(MemberStatus.Leaving);
static readonly Member aExiting = aLeaving.Copy(MemberStatus.Exiting);
static readonly Member aRemoved = aExiting.Copy(MemberStatus.Removed);
static readonly Member bExiting = TestMember.Create(new Address("akka.tcp", "sys", "b", 2552), MemberStatus.Exiting);
static readonly Member bRemoved = bExiting.Copy(MemberStatus.Removed);
static readonly Member cJoining = TestMember.Create(new Address("akka.tcp", "sys", "c", 2552), MemberStatus.Joining, ImmutableHashSet.Create("GRP"));
static readonly Member cUp = cJoining.Copy(MemberStatus.Up);
static readonly Member cRemoved = cUp.Copy(MemberStatus.Removed);
static readonly Member a51Up = TestMember.Create(new Address("akk.tcp", "sys", "a", 2551), MemberStatus.Up);
static readonly Member dUp = TestMember.Create(new Address("akka.tcp", "sys", "d", 2552), MemberStatus.Up, ImmutableHashSet.Create("GRP"));

static readonly Gossip g0 = new Gossip(ImmutableSortedSet.Create(aUp)).Seen(aUp.UniqueAddress);
static readonly Gossip g1 = new Gossip(ImmutableSortedSet.Create(aUp, cJoining)).Seen(aUp.UniqueAddress).Seen(cJoining.UniqueAddress);
static readonly Gossip g2 = new Gossip(ImmutableSortedSet.Create(aUp, bExiting, cUp)).Seen(aUp.UniqueAddress);
static readonly Gossip g3 = g2.Seen(bExiting.UniqueAddress).Seen(cUp.UniqueAddress);
static readonly Gossip g4 = new Gossip(ImmutableSortedSet.Create(a51Up, aUp, bExiting, cUp)).Seen(aUp.UniqueAddress);
static readonly Gossip g5 = new Gossip(ImmutableSortedSet.Create(a51Up, aUp, bExiting, cUp)).Seen(aUp.UniqueAddress).Seen(bExiting.UniqueAddress).Seen(cUp.UniqueAddress);
static readonly Gossip g6 = new Gossip(ImmutableSortedSet.Create(aLeaving, bExiting, cUp)).Seen(aUp.UniqueAddress);
static readonly Gossip g7 = new Gossip(ImmutableSortedSet.Create(aExiting, bExiting, cUp)).Seen(aUp.UniqueAddress);
static readonly Gossip g8 = new Gossip(ImmutableSortedSet.Create(aUp, bExiting, cUp, dUp), new GossipOverview(Reachability.Empty.Unreachable(aUp.UniqueAddress, dUp.UniqueAddress))).Seen(aUp.UniqueAddress);

readonly TestProbe _memberSubscriber;

public ClusterDomainEventPublisherSpec() : base(Config)
private const string OtherDataCenter = "dc2";

private readonly IActorRef _publisher;
private static readonly Member aUp = TestMember.Create(new Address("akka.tcp", "sys", "a", 2552), MemberStatus.Up);
private static readonly Member aLeaving = aUp.Copy(MemberStatus.Leaving);
private static readonly Member aExiting = aLeaving.Copy(MemberStatus.Exiting);
private static readonly Member aRemoved = aExiting.Copy(MemberStatus.Removed);
private static readonly Member bExiting = TestMember.Create(new Address("akka.tcp", "sys", "b", 2552), MemberStatus.Exiting);
private static readonly Member bRemoved = bExiting.Copy(MemberStatus.Removed);
private static readonly Member cJoining = TestMember.Create(new Address("akka.tcp", "sys", "c", 2552), MemberStatus.Joining, ImmutableHashSet.Create("GRP"));
private static readonly Member cUp = cJoining.Copy(MemberStatus.Up);
private static readonly Member cRemoved = cUp.Copy(MemberStatus.Removed);
private static readonly Member a51Up = TestMember.Create(new Address("akk.tcp", "sys", "a", 2551), MemberStatus.Up);
private static readonly Member dUp = TestMember.Create(new Address("akka.tcp", "sys", "d", 2552), MemberStatus.Up, ImmutableHashSet.Create("GRP"));
private static readonly Member eUp = TestMember.Create(new Address("akka.tcp", "sys", "e", 2552), MemberStatus.Up, ImmutableHashSet.Create("GRP"), OtherDataCenter);

private static readonly Gossip g0 = new Gossip(ImmutableSortedSet.Create(aUp)).Seen(aUp.UniqueAddress);
private static readonly Gossip g1 = new Gossip(ImmutableSortedSet.Create(aUp, cJoining)).Seen(aUp.UniqueAddress).Seen(cJoining.UniqueAddress);
private static readonly Gossip g2 = new Gossip(ImmutableSortedSet.Create(aUp, bExiting, cUp)).Seen(aUp.UniqueAddress);
private static readonly Gossip g3 = g2.Seen(bExiting.UniqueAddress).Seen(cUp.UniqueAddress);
private static readonly Gossip g4 = new Gossip(ImmutableSortedSet.Create(a51Up, aUp, bExiting, cUp)).Seen(aUp.UniqueAddress);
private static readonly Gossip g5 = new Gossip(ImmutableSortedSet.Create(a51Up, aUp, bExiting, cUp)).Seen(aUp.UniqueAddress).Seen(bExiting.UniqueAddress).Seen(cUp.UniqueAddress);
private static readonly Gossip g6 = new Gossip(ImmutableSortedSet.Create(aLeaving, bExiting, cUp)).Seen(aUp.UniqueAddress);
private static readonly Gossip g7 = new Gossip(ImmutableSortedSet.Create(aExiting, bExiting, cUp)).Seen(aUp.UniqueAddress);
private static readonly Gossip g8 = new Gossip(ImmutableSortedSet.Create(aUp, bExiting, cUp, dUp), new GossipOverview(Reachability.Empty.Unreachable(aUp.UniqueAddress, dUp.UniqueAddress))).Seen(aUp.UniqueAddress);
private static readonly Gossip g9 = new Gossip(ImmutableSortedSet.Create(aUp, bExiting, cUp, dUp, eUp), new GossipOverview(Reachability.Empty.Unreachable(aUp.UniqueAddress, eUp.UniqueAddress)));
private static readonly Gossip g10 = new Gossip(ImmutableSortedSet.Create(aUp, bExiting, cUp, dUp, eUp), new GossipOverview(Reachability.Empty));

private static readonly MembershipState state0 = State(g0, aUp.UniqueAddress);
private static readonly MembershipState state1 = State(g1, aUp.UniqueAddress);
private static readonly MembershipState state2 = State(g2, aUp.UniqueAddress);
private static readonly MembershipState state3 = State(g3, aUp.UniqueAddress);
private static readonly MembershipState state4 = State(g4, aUp.UniqueAddress);
private static readonly MembershipState state5 = State(g5, aUp.UniqueAddress);
private static readonly MembershipState state6 = State(g6, aUp.UniqueAddress);
private static readonly MembershipState state7 = State(g7, aUp.UniqueAddress);
private static readonly MembershipState state8 = State(g8, aUp.UniqueAddress);
private static readonly MembershipState state9 = State(g9, aUp.UniqueAddress);
private static readonly MembershipState state10 = State(g10, aUp.UniqueAddress);

private static readonly MembershipState emptyMembershipState = State(Gossip.Empty, aUp.UniqueAddress);

private readonly TestProbe _memberSubscriber;

public ClusterDomainEventPublisherSpec(ITestOutputHelper output) : base(Config, output: output)
{
_memberSubscriber = CreateTestProbe();
Sys.EventStream.Subscribe(_memberSubscriber.Ref, typeof(ClusterEvent.IMemberEvent));
Sys.EventStream.Subscribe(_memberSubscriber.Ref, typeof(ClusterEvent.LeaderChanged));
Sys.EventStream.Subscribe(_memberSubscriber.Ref, typeof(ClusterEvent.ClusterShuttingDown));

_publisher = Sys.ActorOf(Props.Create<ClusterDomainEventPublisher>());
_publisher.Tell(new InternalClusterAction.PublishChanges(g0));
_publisher.Tell(new InternalClusterAction.PublishChanges(state0));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(aUp));
_memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(aUp.Address));
}

private static MembershipState State(Gossip gossip, UniqueAddress self, string dc = ClusterSettings.DefaultDataCenter) =>
new MembershipState(gossip, self, dc, crossDataCenterConnections: 5);

[Fact]
public void ClusterDomainEventPublisher_must_publish_MemberJoined()
{
_publisher.Tell(new InternalClusterAction.PublishChanges(g1));
_publisher.Tell(new InternalClusterAction.PublishChanges(state1));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberJoined(cJoining));
}

[Fact]
public void ClusterDomainEventPublisher_must_publish_MemberUp()
{
_publisher.Tell(new InternalClusterAction.PublishChanges(g2));
_publisher.Tell(new InternalClusterAction.PublishChanges(g3));
_publisher.Tell(new InternalClusterAction.PublishChanges(state2));
_publisher.Tell(new InternalClusterAction.PublishChanges(state3));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
}

[Fact]
public void ClusterDomainEventPublisher_must_publish_leader_changed()
{
_publisher.Tell(new InternalClusterAction.PublishChanges(g4));
_publisher.Tell(new InternalClusterAction.PublishChanges(state4));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(a51Up));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
Expand All @@ -88,17 +111,17 @@ public void ClusterDomainEventPublisher_must_publish_leader_changed()
[Fact]
public void ClusterDomainEventPublisher_must_publish_leader_changed_when_old_leader_leaves_and_is_removed()
{
_publisher.Tell(new InternalClusterAction.PublishChanges(g3));
_publisher.Tell(new InternalClusterAction.PublishChanges(state3));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
_publisher.Tell(new InternalClusterAction.PublishChanges(g6));
_publisher.Tell(new InternalClusterAction.PublishChanges(state6));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberLeft(aLeaving));
_publisher.Tell(new InternalClusterAction.PublishChanges(g7));
_publisher.Tell(new InternalClusterAction.PublishChanges(state7));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(aExiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(cUp.Address));
_memberSubscriber.ExpectNoMsg(500.Milliseconds());
// at the removed member a an empty gossip is the last thing
_publisher.Tell(new InternalClusterAction.PublishChanges(Gossip.Empty));
_publisher.Tell(new InternalClusterAction.PublishChanges(emptyMembershipState));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(aRemoved, MemberStatus.Exiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(bRemoved, MemberStatus.Exiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(cRemoved, MemberStatus.Up));
Expand All @@ -108,13 +131,13 @@ public void ClusterDomainEventPublisher_must_publish_leader_changed_when_old_lea
[Fact]
public void ClusterDomainEventPublisher_must_not_publish_leader_changed_when_same_leader()
{
_publisher.Tell(new InternalClusterAction.PublishChanges(g4));
_publisher.Tell(new InternalClusterAction.PublishChanges(state4));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(a51Up));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp));
_memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(a51Up.Address));

_publisher.Tell(new InternalClusterAction.PublishChanges(g5));
_publisher.Tell(new InternalClusterAction.PublishChanges(state5));
_memberSubscriber.ExpectNoMsg(500.Milliseconds());
}

Expand All @@ -124,9 +147,9 @@ public void ClusterDomainEventPublisher_must_publish_role_leader_changed()
var subscriber = CreateTestProbe();
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.RoleLeaderChanged))));
subscriber.ExpectMsg<ClusterEvent.CurrentClusterState>();
_publisher.Tell(new InternalClusterAction.PublishChanges(new Gossip(ImmutableSortedSet.Create(cJoining, dUp))));
_publisher.Tell(new InternalClusterAction.PublishChanges(State(new Gossip(ImmutableSortedSet.Create(cJoining, dUp)), dUp.UniqueAddress)));
subscriber.ExpectMsg(new ClusterEvent.RoleLeaderChanged("GRP", dUp.Address));
_publisher.Tell(new InternalClusterAction.PublishChanges(new Gossip(ImmutableSortedSet.Create(cUp, dUp))));
_publisher.Tell(new InternalClusterAction.PublishChanges(State(new Gossip(ImmutableSortedSet.Create(cUp, dUp)), dUp.UniqueAddress)));
subscriber.ExpectMsg(new ClusterEvent.RoleLeaderChanged("GRP", cUp.Address));
}

Expand All @@ -144,7 +167,7 @@ public void ClusterDomainEventPublisher_must_send_CurrentClusterState_when_subsc
public void ClusterDomainEventPublisher_must_send_events_corresponding_to_current_state_when_subscribe()
{
var subscriber = CreateTestProbe();
_publisher.Tell(new InternalClusterAction.PublishChanges(g8));
_publisher.Tell(new InternalClusterAction.PublishChanges(state8));
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents, ImmutableHashSet.Create(typeof(ClusterEvent.IMemberEvent), typeof(ClusterEvent.ReachabilityEvent))));

subscriber.ReceiveN(4).Should().BeEquivalentTo(
Expand All @@ -157,14 +180,30 @@ public void ClusterDomainEventPublisher_must_send_events_corresponding_to_curren
subscriber.ExpectNoMsg(500.Milliseconds());
}

[Fact]
public void ClusterDomainEventPublisher_should_send_datacenter_reachability_events()
{
var subscriber = CreateTestProbe();
_publisher.Tell(new InternalClusterAction.PublishChanges(state9));
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents, ImmutableHashSet.Create(typeof(ClusterEvent.DataCenterReachabilityEvent))));

subscriber.ExpectMsg(new ClusterEvent.UnreachableDataCenter(OtherDataCenter));
subscriber.ExpectNoMsg(TimeSpan.FromMilliseconds(500));

_publisher.Tell(new InternalClusterAction.PublishChanges(state10));

subscriber.ExpectMsg(new ClusterEvent.ReachableDataCenter(OtherDataCenter));
subscriber.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
}

[Fact]
public void ClusterDomainEventPublisher_should_support_unsubscribe()
{
var subscriber = CreateTestProbe();
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.IMemberEvent))));
subscriber.ExpectMsg<ClusterEvent.CurrentClusterState>();
_publisher.Tell(new InternalClusterAction.Unsubscribe(subscriber.Ref, typeof(ClusterEvent.IMemberEvent)));
_publisher.Tell(new InternalClusterAction.PublishChanges(g3));
_publisher.Tell(new InternalClusterAction.PublishChanges(state3));
subscriber.ExpectNoMsg(500.Milliseconds());
// but memberSubscriber is still subscriber
_memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting));
Expand All @@ -177,10 +216,10 @@ public void ClusterDomainEventPublisher_must_publish_seen_changed()
var subscriber = CreateTestProbe();
_publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.SeenChanged))));
subscriber.ExpectMsg<ClusterEvent.CurrentClusterState>();
_publisher.Tell(new InternalClusterAction.PublishChanges(g2));
_publisher.Tell(new InternalClusterAction.PublishChanges(state2));
subscriber.ExpectMsg<ClusterEvent.SeenChanged>();
subscriber.ExpectNoMsg(500.Milliseconds());
_publisher.Tell(new InternalClusterAction.PublishChanges(g3));
_publisher.Tell(new InternalClusterAction.PublishChanges(state3));
subscriber.ExpectMsg<ClusterEvent.SeenChanged>();
subscriber.ExpectNoMsg(500.Milliseconds());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void Can_serialize_Down()
[Fact]
public void Can_serialize_InitJoin()
{
var message = new InternalClusterAction.InitJoin();
var message = InternalClusterAction.InitJoin.Instance;
AssertEqual(message);
}

Expand Down

0 comments on commit 2dc33e5

Please sign in to comment.