Skip to content

Commit

Permalink
close akkadotnet#3077 - enable DDataDurablePruningSpec
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb committed Feb 10, 2020
1 parent 290d6d7 commit 82c1633
Showing 1 changed file with 42 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public DurablePruningSpecConfig()

CommonConfig = DebugConfig(on: false).WithFallback(ConfigurationFactory.ParseString(@"
akka.loglevel = INFO
akka.actor.provider = ""cluster""
akka.actor.provider = cluster
akka.log-dead-letters-during-shutdown = off
akka.cluster.distributed-data.durable.keys = [""*""]
akka.cluster.distributed-data.durable.lmdb {
Expand All @@ -40,90 +40,91 @@ public DurablePruningSpecConfig()

public class DurablePruningSpec : MultiNodeClusterSpec
{
private readonly Cluster.Cluster cluster;
private readonly RoleName first = new RoleName("first");
private readonly RoleName second = new RoleName("second");
private readonly TimeSpan maxPruningDissemination = TimeSpan.FromSeconds(3);
private readonly TimeSpan timeout;
private readonly GCounterKey keyA = new GCounterKey("A");
private readonly IActorRef replicator;

protected DurablePruningSpec() : this(new DurablePruningSpecConfig())
private readonly Cluster.Cluster _cluster;
private readonly RoleName _first = new RoleName("first");
private readonly RoleName _second = new RoleName("second");
private readonly TimeSpan _maxPruningDissemination = TimeSpan.FromSeconds(3);
private readonly GCounterKey _keyA = new GCounterKey("A");
private readonly TimeSpan _timeout;
private readonly IActorRef _replicator;

protected DurablePruningSpec(IActorRef replicator) : this(new DurablePruningSpecConfig(), replicator)
{
}

protected DurablePruningSpec(DurablePruningSpecConfig config) : base(config, typeof(DurablePruningSpec))
protected DurablePruningSpec(DurablePruningSpecConfig config, IActorRef replicator) : base(config, typeof(DurablePruningSpec))
{
this._replicator = replicator;
InitialParticipantsValueFactory = Roles.Count;
cluster = Akka.Cluster.Cluster.Get(Sys);
timeout = Dilated(TimeSpan.FromSeconds(5));
_cluster = Akka.Cluster.Cluster.Get(Sys);
_timeout = Dilated(TimeSpan.FromSeconds(5));
}

protected override int InitialParticipantsValueFactory { get; }

[MultiNodeFact(Skip = "FIXME")]
[MultiNodeFact()]
public void Pruning_of_durable_CRDT_should_move_data_from_removed_node()
{
Join(first, first);
Join(second, first);
Join(_first, _first);
Join(_second, _first);

var sys2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
var cluster2 = Akka.Cluster.Cluster.Get(sys2);
var replicator2 = StartReplicator(sys2);
var probe2 = new TestProbe(sys2, new XunitAssertions());
cluster2.Join(Node(first).Address);
cluster2.Join(Node(_first).Address);

Within(TimeSpan.FromSeconds(5), () => AwaitAssert(() =>
{
replicator.Tell(Dsl.GetReplicaCount);
_replicator.Tell(Dsl.GetReplicaCount);
ExpectMsg(new ReplicaCount(4));
replicator2.Tell(Dsl.GetReplicaCount, probe2.Ref);
probe2.ExpectMsg(new ReplicaCount(4));
}));

replicator.Tell(Dsl.Update(keyA, GCounter.Empty, WriteLocal.Instance, c => c.Increment(cluster)));
ExpectMsg(new UpdateSuccess(keyA, null));
_replicator.Tell(Dsl.Update(_keyA, GCounter.Empty, WriteLocal.Instance, c => c.Increment(_cluster)));
ExpectMsg(new UpdateSuccess(_keyA, null));

replicator2.Tell(Dsl.Update(keyA, GCounter.Empty, WriteLocal.Instance, c => c.Increment(cluster2, 2)), probe2.Ref);
probe2.ExpectMsg(new UpdateSuccess(keyA, null));
replicator2.Tell(Dsl.Update(_keyA, GCounter.Empty, WriteLocal.Instance, c => c.Increment(cluster2, 2)), probe2.Ref);
probe2.ExpectMsg(new UpdateSuccess(_keyA, null));

EnterBarrier("updates-done");

Within(TimeSpan.FromSeconds(10), () => AwaitAssert(() =>
{
replicator.Tell(Dsl.Get(keyA, new ReadAll(TimeSpan.FromSeconds(1))));
var counter1 = ExpectMsg<GetSuccess>().Get(keyA);
_replicator.Tell(Dsl.Get(_keyA, new ReadAll(TimeSpan.FromSeconds(1))));
var counter1 = ExpectMsg<GetSuccess>().Get(_keyA);
counter1.Value.ShouldBe(10UL);
counter1.State.Count.ShouldBe(4);
}));

Within(TimeSpan.FromSeconds(10), () => AwaitAssert(() =>
{
replicator2.Tell(Dsl.Get(keyA, new ReadAll(TimeSpan.FromSeconds(1))), probe2.Ref);
var counter2 = probe2.ExpectMsg<GetSuccess>().Get(keyA);
replicator2.Tell(Dsl.Get(_keyA, new ReadAll(TimeSpan.FromSeconds(1))), probe2.Ref);
var counter2 = probe2.ExpectMsg<GetSuccess>().Get(_keyA);
counter2.Value.ShouldBe(10UL);
counter2.State.Count.ShouldBe(4);
}));
EnterBarrier("get1");

RunOn(() => cluster.Leave(cluster2.SelfAddress), first);
RunOn(() => _cluster.Leave(cluster2.SelfAddress), _first);

Within(TimeSpan.FromSeconds(15), () => AwaitAssert(() =>
{
replicator.Tell(Dsl.GetReplicaCount);
_replicator.Tell(Dsl.GetReplicaCount);
ExpectMsg(new ReplicaCount(3));
}));
EnterBarrier("removed");

RunOn(() => sys2.Terminate().Wait(TimeSpan.FromSeconds(5)), first);
RunOn(() => sys2.Terminate().Wait(TimeSpan.FromSeconds(5)), _first);

Within(TimeSpan.FromSeconds(15), () =>
{
var values = ImmutableHashSet<int>.Empty;
AwaitAssert(() =>
{
replicator.Tell(Dsl.Get(keyA, ReadLocal.Instance));
var counter3 = ExpectMsg<GetSuccess>().Get(keyA);
_replicator.Tell(Dsl.Get(_keyA, ReadLocal.Instance));
var counter3 = ExpectMsg<GetSuccess>().Get(_keyA);
var value = counter3.Value;
values = values.Add((int) value);
value.ShouldBe(10UL);
Expand All @@ -141,15 +142,15 @@ public void Pruning_of_durable_CRDT_should_move_data_from_removed_node()
var cluster3 = Akka.Cluster.Cluster.Get(sys3);
var replicator3 = StartReplicator(sys3);
var probe3 = new TestProbe(sys3, new XunitAssertions());
cluster3.Join(Node(first).Address);
cluster3.Join(Node(_first).Address);

Within(TimeSpan.FromSeconds(10), () =>
{
var values = ImmutableHashSet<int>.Empty;
AwaitAssert(() =>
{
replicator3.Tell(Dsl.Get(keyA, ReadLocal.Instance), probe3.Ref);
var counter4 = probe3.ExpectMsg<GetSuccess>().Get(keyA);
replicator3.Tell(Dsl.Get(_keyA, ReadLocal.Instance), probe3.Ref);
var counter4 = probe3.ExpectMsg<GetSuccess>().Get(_keyA);
var value = counter4.Value;
values.Add((int) value);
value.ShouldBe(10UL);
Expand All @@ -159,16 +160,16 @@ public void Pruning_of_durable_CRDT_should_move_data_from_removed_node()
});

// after merging with others
replicator3.Tell(Dsl.Get(keyA, new ReadAll(RemainingOrDefault)));
var counter5 = ExpectMsg<GetSuccess>().Get(keyA);
replicator3.Tell(Dsl.Get(_keyA, new ReadAll(RemainingOrDefault)));
var counter5 = ExpectMsg<GetSuccess>().Get(_keyA);
counter5.Value.ShouldBe(10UL);
counter5.State.Count.ShouldBe(3);

}, first);
}, _first);
EnterBarrier("sys3-started");

replicator.Tell(Dsl.Get(keyA, new ReadAll(RemainingOrDefault)));
var counter6 = ExpectMsg<GetSuccess>().Get(keyA);
_replicator.Tell(Dsl.Get(_keyA, new ReadAll(RemainingOrDefault)));
var counter6 = ExpectMsg<GetSuccess>().Get(_keyA);
counter6.Value.ShouldBe(10UL);
counter6.State.Count.ShouldBe(3);

Expand All @@ -180,13 +181,13 @@ private IActorRef StartReplicator(ActorSystem system)
return system.ActorOf(Replicator.Props(
ReplicatorSettings.Create(system)
.WithGossipInterval(TimeSpan.FromSeconds(1))
.WithPruning(TimeSpan.FromSeconds(1), maxPruningDissemination)),
.WithPruning(TimeSpan.FromSeconds(1), _maxPruningDissemination)),
"replicator");
}

private void Join(RoleName from, RoleName to)
{
RunOn(() => cluster.Join(Node(to).Address), from);
RunOn(() => _cluster.Join(Node(to).Address), from);
EnterBarrier(from.Name + "-joined");
}
}
Expand Down

0 comments on commit 82c1633

Please sign in to comment.