diff --git a/src/contrib/cluster/Akka.DistributedData.Tests.MultiNode/DurablePruningSpec.cs b/src/contrib/cluster/Akka.DistributedData.Tests.MultiNode/DurablePruningSpec.cs index 32612323ba8..be36d749aad 100644 --- a/src/contrib/cluster/Akka.DistributedData.Tests.MultiNode/DurablePruningSpec.cs +++ b/src/contrib/cluster/Akka.DistributedData.Tests.MultiNode/DurablePruningSpec.cs @@ -7,7 +7,9 @@ using System; using System.Collections.Immutable; +using System.Linq; using Akka.Actor; +using Akka.Cluster; using Akka.Cluster.TestKit; using Akka.Configuration; using Akka.Remote.TestKit; @@ -48,20 +50,20 @@ public class DurablePruningSpec : MultiNodeClusterSpec private readonly GCounterKey keyA = new GCounterKey("A"); private readonly IActorRef replicator; - protected DurablePruningSpec() : this(new DurablePruningSpecConfig()) + public DurablePruningSpec() : this(new DurablePruningSpecConfig()) { } protected DurablePruningSpec(DurablePruningSpecConfig config) : base(config, typeof(DurablePruningSpec)) { - InitialParticipantsValueFactory = Roles.Count; cluster = Akka.Cluster.Cluster.Get(Sys); + replicator = StartReplicator(Sys); timeout = Dilated(TimeSpan.FromSeconds(5)); } - protected override int InitialParticipantsValueFactory { get; } + protected override int InitialParticipantsValueFactory => Roles.Count; - [MultiNodeFact(Skip = "FIXME")] + [MultiNodeFact] public void Pruning_of_durable_CRDT_should_move_data_from_removed_node() { Join(first, first); @@ -69,10 +71,20 @@ public void Pruning_of_durable_CRDT_should_move_data_from_removed_node() var sys2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config); var cluster2 = Akka.Cluster.Cluster.Get(sys2); + var distributedData2 = DistributedData.Get(sys2); var replicator2 = StartReplicator(sys2); var probe2 = new TestProbe(sys2, new XunitAssertions()); cluster2.Join(Node(first).Address); + AwaitAssert(() => + { + cluster.State.Members.Count.ShouldBe(4); + cluster.State.Members.All(m => m.Status == MemberStatus.Up).ShouldBe(true); + cluster2.State.Members.Count.ShouldBe(4); + cluster2.State.Members.All(m => m.Status == MemberStatus.Up).ShouldBe(true); + }, TimeSpan.FromSeconds(10)); + EnterBarrier("joined"); + Within(TimeSpan.FromSeconds(5), () => AwaitAssert(() => { replicator.Tell(Dsl.GetReplicaCount); @@ -81,10 +93,10 @@ public void Pruning_of_durable_CRDT_should_move_data_from_removed_node() probe2.ExpectMsg(new ReplicaCount(4)); })); - replicator.Tell(Dsl.Update(keyA, GCounter.Empty, WriteLocal.Instance, c => c.Increment(cluster))); + replicator.Tell(Dsl.Update(keyA, GCounter.Empty, WriteLocal.Instance, c => c.Increment(cluster, 3))); ExpectMsg(new UpdateSuccess(keyA, null)); - replicator2.Tell(Dsl.Update(keyA, GCounter.Empty, WriteLocal.Instance, c => c.Increment(cluster2, 2)), probe2.Ref); + replicator2.Tell(Dsl.Update(keyA, GCounter.Empty, WriteLocal.Instance, c => c.Increment(cluster2.SelfUniqueAddress, 2)), probe2.Ref); probe2.ExpectMsg(new UpdateSuccess(keyA, null)); EnterBarrier("updates-done"); @@ -135,8 +147,9 @@ public void Pruning_of_durable_CRDT_should_move_data_from_removed_node() RunOn(() => { - var addr = cluster2.SelfAddress; - var sys3 = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString(@" + var address = cluster2.SelfAddress; + var sys3 = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString($@" + akka.remote.dot-netty.tcp.port = {address.Port} ").WithFallback(Sys.Settings.Config)); var cluster3 = Akka.Cluster.Cluster.Get(sys3); var replicator3 = StartReplicator(sys3); @@ -151,13 +164,20 @@ public void Pruning_of_durable_CRDT_should_move_data_from_removed_node() replicator3.Tell(Dsl.Get(keyA, ReadLocal.Instance), probe3.Ref); var counter4 = probe3.ExpectMsg().Get(keyA); var value = counter4.Value; - values.Add((int) value); + values = values.Add((int) value); value.ShouldBe(10UL); counter4.State.Count.ShouldBe(3); }); values.ShouldBe(ImmutableHashSet.Create(10)); }); + // all must at least have seen it as joining + AwaitAssert(() => + { + cluster3.State.Members.Count.ShouldBe(4); + cluster3.State.Members.All(m => m.Status == MemberStatus.Up).ShouldBeTrue(); + }, TimeSpan.FromSeconds(10)); + // after merging with others replicator3.Tell(Dsl.Get(keyA, new ReadAll(RemainingOrDefault))); var counter5 = ExpectMsg().Get(keyA); @@ -165,6 +185,7 @@ public void Pruning_of_durable_CRDT_should_move_data_from_removed_node() counter5.State.Count.ShouldBe(3); }, first); + EnterBarrier("sys3-started"); replicator.Tell(Dsl.Get(keyA, new ReadAll(RemainingOrDefault)));