Skip to content

Commit

Permalink
troubleshooting ShardingRegion.StartEntity handling changes (#7051)
Browse files Browse the repository at this point in the history
* troubleshooting `ShardingRegion.StartEntity` handling changes

* adding MNTR test changes

* fix issues with `ShardRegion.StartEntity`

* fixed

* made internal `ShardId` usage consistent

* cleaned up typedef usages
  • Loading branch information
Aaronontheweb authored Jan 9, 2024
1 parent b6a4a5c commit f03fc68
Show file tree
Hide file tree
Showing 26 changed files with 485 additions and 356 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,10 @@ private void Cluster_sharding_with_down_member_scenario_2_must_join_cluster()
{
Within(TimeSpan.FromSeconds(20), () =>
{
StartPersistenceIfNeeded(startOn: config.First, config.First, config.Second);
StartPersistenceIfNeeded(startOn: Config.First, Config.First, Config.Second);
Join(config.First, config.First, onJoinedRunOnFrom: StartSharding);
Join(config.Second, config.First, onJoinedRunOnFrom: StartSharding, assertNodeUp: false);
Join(Config.First, Config.First, onJoinedRunOnFrom: StartSharding);
Join(Config.Second, Config.First, onJoinedRunOnFrom: StartSharding, assertNodeUp: false);
// all Up, everywhere before continuing
RunOn(() =>
Expand All @@ -206,7 +206,7 @@ private void Cluster_sharding_with_down_member_scenario_2_must_join_cluster()
Cluster.State.Members.Count.Should().Be(2);
Cluster.State.Members.Should().OnlyContain(m => m.Status == MemberStatus.Up);
});
}, config.First, config.Second);
}, Config.First, Config.Second);
EnterBarrier("after-2");
});
Expand All @@ -225,26 +225,26 @@ private void Cluster_sharding_with_down_member_scenario_2_must_initialize_shards
}).ToImmutableDictionary();
shardLocations.Tell(new Locations(locations));
Sys.Log.Debug("Original locations: [{0}]", string.Join(", ", locations.Select(i => $"{i.Key}: {i.Value}")));
}, config.First);
}, Config.First);
EnterBarrier("after-3");
}

private void Cluster_sharding_with_down_member_scenario_2_must_recover_after_downing_other_node_not_coordinator()
{
Within(TimeSpan.FromSeconds(20), () =>
{
var secondAddress = GetAddress(config.Second);
var secondAddress = GetAddress(Config.Second);
RunOn(() =>
{
TestConductor.Blackhole(config.First, config.Second, Direction.Both).Wait();
}, config.First);
TestConductor.Blackhole(Config.First, Config.Second, Direction.Both).Wait();
}, Config.First);
Thread.Sleep(3000);
RunOn(() =>
{
Cluster.Down(GetAddress(config.Second));
Cluster.Down(GetAddress(Config.Second));
AwaitAssert(() =>
{
Cluster.State.Members.Count.Should().Be(1);
Expand All @@ -264,7 +264,7 @@ private void Cluster_sharding_with_down_member_scenario_2_must_recover_after_dow
});
Sys.Log.Debug("Additional locations: [{0}]", string.Join(", ", additionalLocations.Select(i => $"{i.Key}: {i.Value}")));
Sys.ActorSelection(Node(config.First) / "user" / "shardLocations").Tell(GetLocations.Instance);
Sys.ActorSelection(Node(Config.First) / "user" / "shardLocations").Tell(GetLocations.Instance);
var originalLocations = ExpectMsg<Locations>().Locs;
AwaitAssert(() =>
Expand All @@ -284,7 +284,7 @@ private void Cluster_sharding_with_down_member_scenario_2_must_recover_after_dow
}
});
}, config.First);
}, Config.First);
});

EnterBarrier("after-4");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,10 @@ private void Cluster_sharding_with_down_member_scenario_1_must_join_cluster()
{
Within(TimeSpan.FromSeconds(20), () =>
{
StartPersistenceIfNeeded(startOn: config.Controller, config.First, config.Second);
StartPersistenceIfNeeded(startOn: Config.Controller, Config.First, Config.Second);
Join(config.First, config.First, onJoinedRunOnFrom: StartSharding);
Join(config.Second, config.First, onJoinedRunOnFrom: StartSharding, assertNodeUp: false);
Join(Config.First, Config.First, onJoinedRunOnFrom: StartSharding);
Join(Config.Second, Config.First, onJoinedRunOnFrom: StartSharding, assertNodeUp: false);
// all Up, everywhere before continuing
RunOn(() =>
Expand All @@ -208,7 +208,7 @@ private void Cluster_sharding_with_down_member_scenario_1_must_join_cluster()
Cluster.State.Members.Count.Should().Be(2);
Cluster.State.Members.Should().OnlyContain(m => m.Status == MemberStatus.Up);
});
}, config.First, config.Second);
}, Config.First, Config.Second);
EnterBarrier("after-2");
});
Expand All @@ -228,30 +228,30 @@ private void Cluster_sharding_with_down_member_scenario_1_must_initialize_shards
}).ToImmutableDictionary();
shardLocations.Tell(new Locations(locations));
Sys.Log.Debug("Original locations: [{0}]", string.Join(", ", locations.Select(i => $"{i.Key}: {i.Value}")));
}, config.First);
}, Config.First);
EnterBarrier("after-3");
}

private void Cluster_sharding_with_down_member_scenario_1_must_recover_after_downing_coordinator_node()
{
Within(TimeSpan.FromSeconds(20), () =>
{
var firstAddress = GetAddress(config.First);
Sys.ActorSelection(Node(config.First) / "user" / "shardLocations").Tell(GetLocations.Instance);
var firstAddress = GetAddress(Config.First);
Sys.ActorSelection(Node(Config.First) / "user" / "shardLocations").Tell(GetLocations.Instance);
var originalLocations = ExpectMsg<Locations>().Locs;
EnterBarrier("after-3-locations");
RunOn(() =>
{
TestConductor.Blackhole(config.First, config.Second, Direction.Both).Wait();
}, config.Controller);
TestConductor.Blackhole(Config.First, Config.Second, Direction.Both).Wait();
}, Config.Controller);
Thread.Sleep(3000);
RunOn(() =>
{
Cluster.Down(GetAddress(config.First));
Cluster.Down(GetAddress(Config.First));
AwaitAssert(() =>
{
Cluster.State.Members.Count.Should().Be(1);
Expand Down Expand Up @@ -288,7 +288,7 @@ private void Cluster_sharding_with_down_member_scenario_1_must_recover_after_dow
}
});
}, config.Second);
}, Config.Second);
});

EnterBarrier("after-4");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,9 @@ private void Cluster_sharding_with_custom_allocation_strategy_must_use_specified
{
Within(TimeSpan.FromSeconds(30), () =>
{
StartPersistenceIfNeeded(startOn: config.First, config.First, config.Second);
StartPersistenceIfNeeded(startOn: Config.First, Config.First, Config.Second);
Join(config.First, config.First);
Join(Config.First, Config.First);
RunOn(() =>
{
Expand All @@ -221,43 +221,43 @@ private void Cluster_sharding_with_custom_allocation_strategy_must_use_specified
_region.Value.Tell(1);
ExpectMsg(1);
LastSender.Path.Should().Be(_region.Value.Path / "1" / "1");
}, config.First);
}, Config.First);
EnterBarrier("first-started");
Join(config.Second, config.First);
Join(Config.Second, Config.First);
_region.Value.Tell(2);
ExpectMsg(2);
RunOn(() =>
{
LastSender.Path.Should().Be(_region.Value.Path / "2" / "2");
}, config.First);
}, Config.First);
RunOn(() =>
{
LastSender.Path.Should().Be(Node(config.First) / "system" / "sharding" / "Entity" / "2" / "2");
}, config.Second);
LastSender.Path.Should().Be(Node(Config.First) / "system" / "sharding" / "Entity" / "2" / "2");
}, Config.Second);
EnterBarrier("second-started");
RunOn(() =>
{
Sys.ActorSelection(Node(config.Second) / "system" / "sharding" / "Entity").Tell(new Identify(null));
Sys.ActorSelection(Node(Config.Second) / "system" / "sharding" / "Entity").Tell(new Identify(null));
var secondRegion = ExpectMsg<ActorIdentity>().Subject;
_allocator.Value.Tell(new UseRegion(secondRegion));
ExpectMsg<UseRegionAck>();
}, config.First);
}, Config.First);
EnterBarrier("second-active");
_region.Value.Tell(3);
ExpectMsg(3);
RunOn(() =>
{
LastSender.Path.Should().Be(_region.Value.Path / "3" / "3");
}, config.Second);
}, Config.Second);
RunOn(() =>
{
LastSender.Path.Should().Be(Node(config.Second) / "system" / "sharding" / "Entity" / "3" / "3");
}, config.First);
LastSender.Path.Should().Be(Node(Config.Second) / "system" / "sharding" / "Entity" / "3" / "3");
}, Config.First);
EnterBarrier("after-2");
});
Expand All @@ -278,13 +278,13 @@ private void Cluster_sharding_with_custom_allocation_strategy_must_rebalance_spe
_region.Value.Tell(2, p.Ref);
p.ExpectMsg(2, TimeSpan.FromSeconds(2));
p.LastSender.Path.Should().Be(Node(config.Second) / "system" / "sharding" / "Entity" / "2" / "2");
p.LastSender.Path.Should().Be(Node(Config.Second) / "system" / "sharding" / "Entity" / "2" / "2");
});
_region.Value.Tell(1);
ExpectMsg(1);
LastSender.Path.Should().Be(_region.Value.Path / "1" / "1");
}, config.First);
}, Config.First);
EnterBarrier("after-2");
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,10 @@ private void ClusterSharding_with_flaky_journal_network_must_join_cluster()
{
Within(TimeSpan.FromSeconds(20), () =>
{
StartPersistenceIfNeeded(startOn: config.Controller, config.First, config.Second);
StartPersistenceIfNeeded(startOn: Config.Controller, Config.First, Config.Second);
Join(config.First, config.First);
Join(config.Second, config.First);
Join(Config.First, Config.First);
Join(Config.Second, Config.First);
RunOn(() =>
{
Expand All @@ -214,7 +214,7 @@ private void ClusterSharding_with_flaky_journal_network_must_join_cluster()
ExpectMsg<Value>(v => v.Id == "20" && v.N == 2);
region.Tell(new Get("21"));
ExpectMsg<Value>(v => v.Id == "21" && v.N == 3);
}, config.First);
}, Config.First);
EnterBarrier("after-2");
});
}
Expand All @@ -227,14 +227,14 @@ private void ClusterSharding_with_flaky_journal_network_must_recover_after_journ
{
if (PersistenceIsNeeded)
{
TestConductor.Blackhole(config.Controller, config.First, ThrottleTransportAdapter.Direction.Both).Wait();
TestConductor.Blackhole(config.Controller, config.Second, ThrottleTransportAdapter.Direction.Both).Wait();
TestConductor.Blackhole(Config.Controller, Config.First, ThrottleTransportAdapter.Direction.Both).Wait();
TestConductor.Blackhole(Config.Controller, Config.Second, ThrottleTransportAdapter.Direction.Both).Wait();
}
else
{
TestConductor.Blackhole(config.First, config.Second, ThrottleTransportAdapter.Direction.Both).Wait();
TestConductor.Blackhole(Config.First, Config.Second, ThrottleTransportAdapter.Direction.Both).Wait();
}
}, config.Controller);
}, Config.Controller);
EnterBarrier("journal-backholded");
RunOn(() =>
Expand All @@ -245,21 +245,21 @@ private void ClusterSharding_with_flaky_journal_network_must_recover_after_journ
var probe = CreateTestProbe();
region.Tell(new Get("40"), probe.Ref);
probe.ExpectNoMsg(TimeSpan.FromSeconds(1));
}, config.First);
}, Config.First);
EnterBarrier("first-delayed");
RunOn(() =>
{
if (PersistenceIsNeeded)
{
TestConductor.PassThrough(config.Controller, config.First, ThrottleTransportAdapter.Direction.Both).Wait();
TestConductor.PassThrough(config.Controller, config.Second, ThrottleTransportAdapter.Direction.Both).Wait();
TestConductor.PassThrough(Config.Controller, Config.First, ThrottleTransportAdapter.Direction.Both).Wait();
TestConductor.PassThrough(Config.Controller, Config.Second, ThrottleTransportAdapter.Direction.Both).Wait();
}
else
{
TestConductor.PassThrough(config.First, config.Second, ThrottleTransportAdapter.Direction.Both).Wait();
TestConductor.PassThrough(Config.First, Config.Second, ThrottleTransportAdapter.Direction.Both).Wait();
}
}, config.Controller);
}, Config.Controller);
EnterBarrier("journal-ok");
RunOn(() =>
Expand Down Expand Up @@ -305,7 +305,7 @@ private void ClusterSharding_with_flaky_journal_network_must_recover_after_journ
region.Tell(new Get("40"));
ExpectMsg<Value>(v => v.Id == "40" && v.N == 4);
}, config.First);
}, Config.First);
EnterBarrier("verified-first");
RunOn(() =>
Expand All @@ -323,7 +323,7 @@ private void ClusterSharding_with_flaky_journal_network_must_recover_after_journ
ExpectMsg<Value>(v => v.Id == "20" && v.N == 4);
region.Tell(new Get("30"));
ExpectMsg<Value>(v => v.Id == "30" && v.N == 6);
}, config.Second);
}, Config.Second);
EnterBarrier("after-3");
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ public void Inspecting_cluster_sharding_state_specs()

private void Inspecting_cluster_sharding_state_must_join_cluster()
{
Join(config.Controller, config.Controller);
Join(config.First, config.Controller);
Join(config.Second, config.Controller);
Join(Config.Controller, Config.Controller);
Join(Config.First, Config.Controller);
Join(Config.Second, Config.Controller);

// make sure all nodes are up
AwaitAssert(() =>
Expand All @@ -110,18 +110,18 @@ private void Inspecting_cluster_sharding_state_must_join_cluster()
role: "shard",
extractEntityId: extractEntityId,
extractShardId: extractShardId);
}, config.Controller);
}, Config.Controller);

RunOn(() =>
{
StartSharding(
Sys,
typeName: ShardTypeName,
entityProps: Props.Create(() => new PingPongActor()),
settings: settings.Value.WithRole("shard"),
settings: Settings.Value.WithRole("shard"),
extractEntityId: extractEntityId,
extractShardId: extractShardId);
}, config.First, config.Second);
}, Config.First, Config.Second);

EnterBarrier("sharding started");
}
Expand Down Expand Up @@ -158,7 +158,7 @@ private void Inspecting_cluster_sharding_state_must_trigger_sharded_actors()
pingProbe.ReceiveWhile(null, m => (PingPongActor.Pong)m, 4);
});
});
}, config.Controller);
}, Config.Controller);

EnterBarrier("sharded actors started");
}
Expand Down
Loading

0 comments on commit f03fc68

Please sign in to comment.