From 2dc33e5c235611867672da214d36a38d86e9d17d Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Sun, 4 Feb 2018 08:04:57 +0100 Subject: [PATCH] fixed ClusterDomainEventPublisherSpec --- .../MultiNodeClusterSpec.cs | 2 +- .../ClusterDomainEventPublisherSpec.cs | 125 ++++++++++++------ .../ClusterMessageSerializerSpec.cs | 2 +- 3 files changed, 84 insertions(+), 45 deletions(-) diff --git a/src/core/Akka.Cluster.TestKit/MultiNodeClusterSpec.cs b/src/core/Akka.Cluster.TestKit/MultiNodeClusterSpec.cs index 92322105468..e5e002c6f65 100644 --- a/src/core/Akka.Cluster.TestKit/MultiNodeClusterSpec.cs +++ b/src/core/Akka.Cluster.TestKit/MultiNodeClusterSpec.cs @@ -199,7 +199,7 @@ void MuteLog(ActorSystem sys = null) typeof(GossipEnvelope), typeof(GossipStatus), typeof(GossipStatus), - typeof(InternalClusterAction.ITick), + typeof(InternalClusterAction.Tick), typeof(PoisonPill), typeof(DeathWatchNotification), typeof(Disassociated), diff --git a/src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs b/src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs index ee5cf50d1ec..853293dd840 100644 --- a/src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs @@ -11,6 +11,7 @@ using Akka.TestKit; using FluentAssertions; using Xunit; +using Xunit.Abstractions; namespace Akka.Cluster.Tests { @@ -20,32 +21,51 @@ public class ClusterDomainEventPublisherSpec : AkkaSpec akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"" akka.remote.dot-netty.tcp.port = 0"; - readonly IActorRef _publisher; - static readonly Member aUp = TestMember.Create(new Address("akka.tcp", "sys", "a", 2552), MemberStatus.Up); - static readonly Member aLeaving = aUp.Copy(MemberStatus.Leaving); - static readonly Member aExiting = aLeaving.Copy(MemberStatus.Exiting); - static readonly Member aRemoved = aExiting.Copy(MemberStatus.Removed); - static readonly Member bExiting = TestMember.Create(new Address("akka.tcp", "sys", "b", 2552), MemberStatus.Exiting); - static readonly Member bRemoved = bExiting.Copy(MemberStatus.Removed); - static readonly Member cJoining = TestMember.Create(new Address("akka.tcp", "sys", "c", 2552), MemberStatus.Joining, ImmutableHashSet.Create("GRP")); - static readonly Member cUp = cJoining.Copy(MemberStatus.Up); - static readonly Member cRemoved = cUp.Copy(MemberStatus.Removed); - static readonly Member a51Up = TestMember.Create(new Address("akk.tcp", "sys", "a", 2551), MemberStatus.Up); - static readonly Member dUp = TestMember.Create(new Address("akka.tcp", "sys", "d", 2552), MemberStatus.Up, ImmutableHashSet.Create("GRP")); - - static readonly Gossip g0 = new Gossip(ImmutableSortedSet.Create(aUp)).Seen(aUp.UniqueAddress); - static readonly Gossip g1 = new Gossip(ImmutableSortedSet.Create(aUp, cJoining)).Seen(aUp.UniqueAddress).Seen(cJoining.UniqueAddress); - static readonly Gossip g2 = new Gossip(ImmutableSortedSet.Create(aUp, bExiting, cUp)).Seen(aUp.UniqueAddress); - static readonly Gossip g3 = g2.Seen(bExiting.UniqueAddress).Seen(cUp.UniqueAddress); - static readonly Gossip g4 = new Gossip(ImmutableSortedSet.Create(a51Up, aUp, bExiting, cUp)).Seen(aUp.UniqueAddress); - static readonly Gossip g5 = new Gossip(ImmutableSortedSet.Create(a51Up, aUp, bExiting, cUp)).Seen(aUp.UniqueAddress).Seen(bExiting.UniqueAddress).Seen(cUp.UniqueAddress); - static readonly Gossip g6 = new Gossip(ImmutableSortedSet.Create(aLeaving, bExiting, cUp)).Seen(aUp.UniqueAddress); - static readonly Gossip g7 = new Gossip(ImmutableSortedSet.Create(aExiting, bExiting, cUp)).Seen(aUp.UniqueAddress); - static readonly Gossip g8 = new Gossip(ImmutableSortedSet.Create(aUp, bExiting, cUp, dUp), new GossipOverview(Reachability.Empty.Unreachable(aUp.UniqueAddress, dUp.UniqueAddress))).Seen(aUp.UniqueAddress); - - readonly TestProbe _memberSubscriber; - - public ClusterDomainEventPublisherSpec() : base(Config) + private const string OtherDataCenter = "dc2"; + + private readonly IActorRef _publisher; + private static readonly Member aUp = TestMember.Create(new Address("akka.tcp", "sys", "a", 2552), MemberStatus.Up); + private static readonly Member aLeaving = aUp.Copy(MemberStatus.Leaving); + private static readonly Member aExiting = aLeaving.Copy(MemberStatus.Exiting); + private static readonly Member aRemoved = aExiting.Copy(MemberStatus.Removed); + private static readonly Member bExiting = TestMember.Create(new Address("akka.tcp", "sys", "b", 2552), MemberStatus.Exiting); + private static readonly Member bRemoved = bExiting.Copy(MemberStatus.Removed); + private static readonly Member cJoining = TestMember.Create(new Address("akka.tcp", "sys", "c", 2552), MemberStatus.Joining, ImmutableHashSet.Create("GRP")); + private static readonly Member cUp = cJoining.Copy(MemberStatus.Up); + private static readonly Member cRemoved = cUp.Copy(MemberStatus.Removed); + private static readonly Member a51Up = TestMember.Create(new Address("akk.tcp", "sys", "a", 2551), MemberStatus.Up); + private static readonly Member dUp = TestMember.Create(new Address("akka.tcp", "sys", "d", 2552), MemberStatus.Up, ImmutableHashSet.Create("GRP")); + private static readonly Member eUp = TestMember.Create(new Address("akka.tcp", "sys", "e", 2552), MemberStatus.Up, ImmutableHashSet.Create("GRP"), OtherDataCenter); + + private static readonly Gossip g0 = new Gossip(ImmutableSortedSet.Create(aUp)).Seen(aUp.UniqueAddress); + private static readonly Gossip g1 = new Gossip(ImmutableSortedSet.Create(aUp, cJoining)).Seen(aUp.UniqueAddress).Seen(cJoining.UniqueAddress); + private static readonly Gossip g2 = new Gossip(ImmutableSortedSet.Create(aUp, bExiting, cUp)).Seen(aUp.UniqueAddress); + private static readonly Gossip g3 = g2.Seen(bExiting.UniqueAddress).Seen(cUp.UniqueAddress); + private static readonly Gossip g4 = new Gossip(ImmutableSortedSet.Create(a51Up, aUp, bExiting, cUp)).Seen(aUp.UniqueAddress); + private static readonly Gossip g5 = new Gossip(ImmutableSortedSet.Create(a51Up, aUp, bExiting, cUp)).Seen(aUp.UniqueAddress).Seen(bExiting.UniqueAddress).Seen(cUp.UniqueAddress); + private static readonly Gossip g6 = new Gossip(ImmutableSortedSet.Create(aLeaving, bExiting, cUp)).Seen(aUp.UniqueAddress); + private static readonly Gossip g7 = new Gossip(ImmutableSortedSet.Create(aExiting, bExiting, cUp)).Seen(aUp.UniqueAddress); + private static readonly Gossip g8 = new Gossip(ImmutableSortedSet.Create(aUp, bExiting, cUp, dUp), new GossipOverview(Reachability.Empty.Unreachable(aUp.UniqueAddress, dUp.UniqueAddress))).Seen(aUp.UniqueAddress); + private static readonly Gossip g9 = new Gossip(ImmutableSortedSet.Create(aUp, bExiting, cUp, dUp, eUp), new GossipOverview(Reachability.Empty.Unreachable(aUp.UniqueAddress, eUp.UniqueAddress))); + private static readonly Gossip g10 = new Gossip(ImmutableSortedSet.Create(aUp, bExiting, cUp, dUp, eUp), new GossipOverview(Reachability.Empty)); + + private static readonly MembershipState state0 = State(g0, aUp.UniqueAddress); + private static readonly MembershipState state1 = State(g1, aUp.UniqueAddress); + private static readonly MembershipState state2 = State(g2, aUp.UniqueAddress); + private static readonly MembershipState state3 = State(g3, aUp.UniqueAddress); + private static readonly MembershipState state4 = State(g4, aUp.UniqueAddress); + private static readonly MembershipState state5 = State(g5, aUp.UniqueAddress); + private static readonly MembershipState state6 = State(g6, aUp.UniqueAddress); + private static readonly MembershipState state7 = State(g7, aUp.UniqueAddress); + private static readonly MembershipState state8 = State(g8, aUp.UniqueAddress); + private static readonly MembershipState state9 = State(g9, aUp.UniqueAddress); + private static readonly MembershipState state10 = State(g10, aUp.UniqueAddress); + + private static readonly MembershipState emptyMembershipState = State(Gossip.Empty, aUp.UniqueAddress); + + private readonly TestProbe _memberSubscriber; + + public ClusterDomainEventPublisherSpec(ITestOutputHelper output) : base(Config, output: output) { _memberSubscriber = CreateTestProbe(); Sys.EventStream.Subscribe(_memberSubscriber.Ref, typeof(ClusterEvent.IMemberEvent)); @@ -53,23 +73,26 @@ public ClusterDomainEventPublisherSpec() : base(Config) Sys.EventStream.Subscribe(_memberSubscriber.Ref, typeof(ClusterEvent.ClusterShuttingDown)); _publisher = Sys.ActorOf(Props.Create()); - _publisher.Tell(new InternalClusterAction.PublishChanges(g0)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state0)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(aUp)); _memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(aUp.Address)); } + + private static MembershipState State(Gossip gossip, UniqueAddress self, string dc = ClusterSettings.DefaultDataCenter) => + new MembershipState(gossip, self, dc, crossDataCenterConnections: 5); [Fact] public void ClusterDomainEventPublisher_must_publish_MemberJoined() { - _publisher.Tell(new InternalClusterAction.PublishChanges(g1)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state1)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberJoined(cJoining)); } [Fact] public void ClusterDomainEventPublisher_must_publish_MemberUp() { - _publisher.Tell(new InternalClusterAction.PublishChanges(g2)); - _publisher.Tell(new InternalClusterAction.PublishChanges(g3)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state2)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state3)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp)); } @@ -77,7 +100,7 @@ public void ClusterDomainEventPublisher_must_publish_MemberUp() [Fact] public void ClusterDomainEventPublisher_must_publish_leader_changed() { - _publisher.Tell(new InternalClusterAction.PublishChanges(g4)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state4)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(a51Up)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp)); @@ -88,17 +111,17 @@ public void ClusterDomainEventPublisher_must_publish_leader_changed() [Fact] public void ClusterDomainEventPublisher_must_publish_leader_changed_when_old_leader_leaves_and_is_removed() { - _publisher.Tell(new InternalClusterAction.PublishChanges(g3)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state3)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp)); - _publisher.Tell(new InternalClusterAction.PublishChanges(g6)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state6)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberLeft(aLeaving)); - _publisher.Tell(new InternalClusterAction.PublishChanges(g7)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state7)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(aExiting)); _memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(cUp.Address)); _memberSubscriber.ExpectNoMsg(500.Milliseconds()); // at the removed member a an empty gossip is the last thing - _publisher.Tell(new InternalClusterAction.PublishChanges(Gossip.Empty)); + _publisher.Tell(new InternalClusterAction.PublishChanges(emptyMembershipState)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(aRemoved, MemberStatus.Exiting)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(bRemoved, MemberStatus.Exiting)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(cRemoved, MemberStatus.Up)); @@ -108,13 +131,13 @@ public void ClusterDomainEventPublisher_must_publish_leader_changed_when_old_lea [Fact] public void ClusterDomainEventPublisher_must_not_publish_leader_changed_when_same_leader() { - _publisher.Tell(new InternalClusterAction.PublishChanges(g4)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state4)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(a51Up)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp)); _memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(a51Up.Address)); - _publisher.Tell(new InternalClusterAction.PublishChanges(g5)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state5)); _memberSubscriber.ExpectNoMsg(500.Milliseconds()); } @@ -124,9 +147,9 @@ public void ClusterDomainEventPublisher_must_publish_role_leader_changed() var subscriber = CreateTestProbe(); _publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.RoleLeaderChanged)))); subscriber.ExpectMsg(); - _publisher.Tell(new InternalClusterAction.PublishChanges(new Gossip(ImmutableSortedSet.Create(cJoining, dUp)))); + _publisher.Tell(new InternalClusterAction.PublishChanges(State(new Gossip(ImmutableSortedSet.Create(cJoining, dUp)), dUp.UniqueAddress))); subscriber.ExpectMsg(new ClusterEvent.RoleLeaderChanged("GRP", dUp.Address)); - _publisher.Tell(new InternalClusterAction.PublishChanges(new Gossip(ImmutableSortedSet.Create(cUp, dUp)))); + _publisher.Tell(new InternalClusterAction.PublishChanges(State(new Gossip(ImmutableSortedSet.Create(cUp, dUp)), dUp.UniqueAddress))); subscriber.ExpectMsg(new ClusterEvent.RoleLeaderChanged("GRP", cUp.Address)); } @@ -144,7 +167,7 @@ public void ClusterDomainEventPublisher_must_send_CurrentClusterState_when_subsc public void ClusterDomainEventPublisher_must_send_events_corresponding_to_current_state_when_subscribe() { var subscriber = CreateTestProbe(); - _publisher.Tell(new InternalClusterAction.PublishChanges(g8)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state8)); _publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents, ImmutableHashSet.Create(typeof(ClusterEvent.IMemberEvent), typeof(ClusterEvent.ReachabilityEvent)))); subscriber.ReceiveN(4).Should().BeEquivalentTo( @@ -157,6 +180,22 @@ public void ClusterDomainEventPublisher_must_send_events_corresponding_to_curren subscriber.ExpectNoMsg(500.Milliseconds()); } + [Fact] + public void ClusterDomainEventPublisher_should_send_datacenter_reachability_events() + { + var subscriber = CreateTestProbe(); + _publisher.Tell(new InternalClusterAction.PublishChanges(state9)); + _publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents, ImmutableHashSet.Create(typeof(ClusterEvent.DataCenterReachabilityEvent)))); + + subscriber.ExpectMsg(new ClusterEvent.UnreachableDataCenter(OtherDataCenter)); + subscriber.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + + _publisher.Tell(new InternalClusterAction.PublishChanges(state10)); + + subscriber.ExpectMsg(new ClusterEvent.ReachableDataCenter(OtherDataCenter)); + subscriber.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + } + [Fact] public void ClusterDomainEventPublisher_should_support_unsubscribe() { @@ -164,7 +203,7 @@ public void ClusterDomainEventPublisher_should_support_unsubscribe() _publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.IMemberEvent)))); subscriber.ExpectMsg(); _publisher.Tell(new InternalClusterAction.Unsubscribe(subscriber.Ref, typeof(ClusterEvent.IMemberEvent))); - _publisher.Tell(new InternalClusterAction.PublishChanges(g3)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state3)); subscriber.ExpectNoMsg(500.Milliseconds()); // but memberSubscriber is still subscriber _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting)); @@ -177,10 +216,10 @@ public void ClusterDomainEventPublisher_must_publish_seen_changed() var subscriber = CreateTestProbe(); _publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.SeenChanged)))); subscriber.ExpectMsg(); - _publisher.Tell(new InternalClusterAction.PublishChanges(g2)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state2)); subscriber.ExpectMsg(); subscriber.ExpectNoMsg(500.Milliseconds()); - _publisher.Tell(new InternalClusterAction.PublishChanges(g3)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state3)); subscriber.ExpectMsg(); subscriber.ExpectNoMsg(500.Milliseconds()); } diff --git a/src/core/Akka.Cluster.Tests/Serialization/ClusterMessageSerializerSpec.cs b/src/core/Akka.Cluster.Tests/Serialization/ClusterMessageSerializerSpec.cs index c86a4088901..6e97e84e3c7 100644 --- a/src/core/Akka.Cluster.Tests/Serialization/ClusterMessageSerializerSpec.cs +++ b/src/core/Akka.Cluster.Tests/Serialization/ClusterMessageSerializerSpec.cs @@ -145,7 +145,7 @@ public void Can_serialize_Down() [Fact] public void Can_serialize_InitJoin() { - var message = new InternalClusterAction.InitJoin(); + var message = InternalClusterAction.InitJoin.Instance; AssertEqual(message); }