diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Akka.Cluster.Sharding.Tests.csproj b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Akka.Cluster.Sharding.Tests.csproj index e3331ead973..09af113497e 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Akka.Cluster.Sharding.Tests.csproj +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Akka.Cluster.Sharding.Tests.csproj @@ -6,13 +6,13 @@ Akka.Cluster.Sharding.Tests $(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion) - + + - diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingLeaseSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingLeaseSpec.cs index 17abc2a9ad3..d154bc5d6da 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingLeaseSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingLeaseSpec.cs @@ -13,8 +13,8 @@ using System.Threading.Tasks; using Akka.Actor; using Akka.Cluster.Tools.Singleton; -using Akka.Cluster.Tools.Tests; using Akka.Configuration; +using Akka.Coordination.Tests; using Akka.TestKit; using Akka.TestKit.TestActors; using Akka.Util; diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardSpec.cs index c2b04b59c78..fbaeae53ede 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardSpec.cs @@ -11,9 +11,9 @@ using System.Threading.Tasks; using Akka.Actor; using Akka.Cluster.Tools.Singleton; -using Akka.Cluster.Tools.Tests; using Akka.Configuration; using Akka.Coordination; +using Akka.Coordination.Tests; using Akka.Event; using Akka.TestKit; using Akka.Util; @@ -51,7 +51,7 @@ static ShardSpec() akka.actor.provider = ""cluster"" akka.remote.dot-netty.tcp.port = 0 test-lease { - lease-class = ""Akka.Cluster.Tools.Tests.TestLease, Akka.Cluster.Tools.Tests"" + lease-class = ""Akka.Coordination.Tests.TestLease, 
Akka.Coordination.Tests"" heartbeat-interval = 1s heartbeat-timeout = 120s lease-operation-timeout = 3s diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Akka.Cluster.Tools.Tests.MultiNode.csproj b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Akka.Cluster.Tools.Tests.MultiNode.csproj index 466bb53fc8a..b996e0f0ec5 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Akka.Cluster.Tools.Tests.MultiNode.csproj +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Akka.Cluster.Tools.Tests.MultiNode.csproj @@ -1,12 +1,13 @@  - + Akka.Cluster.Tools.Tests.MultiNode $(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion) + diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Singleton/ClusterSingletonManagerLeaseSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Singleton/ClusterSingletonManagerLeaseSpec.cs index f021d787060..84a5c43182d 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Singleton/ClusterSingletonManagerLeaseSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Singleton/ClusterSingletonManagerLeaseSpec.cs @@ -12,6 +12,7 @@ using Akka.Cluster.TestKit; using Akka.Cluster.Tools.Singleton; using Akka.Configuration; +using Akka.Coordination.Tests; using Akka.Event; using Akka.Remote.TestKit; using Akka.TestKit; @@ -45,7 +46,7 @@ public ClusterSingletonManagerLeaseSpecConfig() akka.cluster.auto-down-unreachable-after = 0s akka.cluster.testkit.auto-down-unreachable-after = 0s test-lease { - lease-class = ""Akka.Cluster.Tools.Tests.MultiNode.TestLeaseActorClient, Akka.Cluster.Tools.Tests.MultiNode"" + lease-class = ""Akka.Coordination.Tests.TestLeaseActorClient, Akka.Coordination.Tests"" heartbeat-interval = 1s heartbeat-timeout = 120s lease-operation-timeout = 3s diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Akka.Cluster.Tools.Tests.csproj b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Akka.Cluster.Tools.Tests.csproj index 
5ed0131226e..c45633ab96f 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Akka.Cluster.Tools.Tests.csproj +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Akka.Cluster.Tools.Tests.csproj @@ -8,6 +8,7 @@ + diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonLeaseSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonLeaseSpec.cs index cd9ca2522a2..fda2ffca928 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonLeaseSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonLeaseSpec.cs @@ -15,6 +15,7 @@ using Akka.Actor; using Akka.Cluster.Tools.Singleton; using Akka.Configuration; +using Akka.Coordination.Tests; using Akka.Event; using Akka.TestKit; using Akka.TestKit.TestActors; diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt index 8f9b3b5ccab..d0346d6e072 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt @@ -400,11 +400,15 @@ namespace Akka.Cluster.SBR } public sealed class LeaseMajoritySettings { + [System.ObsoleteAttribute()] public LeaseMajoritySettings(string leaseImplementation, System.TimeSpan acquireLeaseDelayForMinority, string role) { } + [System.ObsoleteAttribute()] public LeaseMajoritySettings(string leaseImplementation, System.TimeSpan acquireLeaseDelayForMinority, string role, string leaseName) { } + public LeaseMajoritySettings(string leaseImplementation, System.TimeSpan acquireLeaseDelayForMinority, System.TimeSpan releaseAfter, string role, string leaseName) { } public System.TimeSpan AcquireLeaseDelayForMinority { get; } public string LeaseImplementation { get; } public string LeaseName { get; } + public System.TimeSpan ReleaseAfter { get; } public string Role { get; } public string SafeLeaseName(string systemName) { } } diff --git 
a/src/core/Akka.Cluster.Tests.MultiNode/Akka.Cluster.Tests.MultiNode.csproj b/src/core/Akka.Cluster.Tests.MultiNode/Akka.Cluster.Tests.MultiNode.csproj index 0635ef67d3f..05e13029eda 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/Akka.Cluster.Tests.MultiNode.csproj +++ b/src/core/Akka.Cluster.Tests.MultiNode/Akka.Cluster.Tests.MultiNode.csproj @@ -9,16 +9,17 @@ + - + - +
diff --git a/src/core/Akka.Cluster.Tests.MultiNode/SBR/DownAllIndirectlyConnected5NodeSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/SBR/DownAllIndirectlyConnected5NodeSpec.cs
new file mode 100644
index 00000000000..9847d8b0de0
--- /dev/null
+++ b/src/core/Akka.Cluster.Tests.MultiNode/SBR/DownAllIndirectlyConnected5NodeSpec.cs
@@ -0,0 +1,174 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2021 Lightbend Inc.
+// Copyright (C) 2013-2021 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+using System;
+using System.Linq;
+using Akka.Cluster.TestKit;
+using Akka.Configuration;
+using Akka.Remote.TestKit;
+using Akka.Remote.Transport;
+using Akka.TestKit;
+using FluentAssertions;
+
+namespace Akka.Cluster.Tests.MultiNode.SBR
+{
+    public class DownAllIndirectlyConnected5NodeSpecConfig : MultiNodeConfig
+    {
+        public RoleName Node1 { get; }
+        public RoleName Node2 { get; }
+        public RoleName Node3 { get; }
+        public RoleName Node4 { get; }
+        public RoleName Node5 { get; }
+
+        // Registers the five roles and the HOCON shared by every node: SBR downing provider, keep-majority, stable-after = 6s.
+        public DownAllIndirectlyConnected5NodeSpecConfig()
+        {
+            Node1 = Role("node1");
+            Node2 = Role("node2");
+            Node3 = Role("node3");
+            Node4 = Role("node4");
+            Node5 = Role("node5");
+
+            CommonConfig = ConfigurationFactory.ParseString(@"
+
+                akka {
+                    loglevel = INFO
+                    cluster {
+                        downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider""
+                        split-brain-resolver.active-strategy = keep-majority
+                        split-brain-resolver.stable-after = 6s
+
+                        run-coordinated-shutdown-when-down = off
+                    }
+
+                    actor.provider = cluster
+
+                    test.filter-leeway = 10s
+                }")
+                .WithFallback(MultiNodeLoggingConfig.LoggingConfig)
+                .WithFallback(DebugConfig(true))
+                .WithFallback(MultiNodeClusterSpec.ClusterConfig());
+            // test transport on: the spec below blackholes links through TestConductor
+            TestTransport = true;
+        }
+    }
+
+    public class DownAllIndirectlyConnected5NodeSpec : MultiNodeClusterSpec
+    {
+        private readonly DownAllIndirectlyConnected5NodeSpecConfig _config;
+
+        public DownAllIndirectlyConnected5NodeSpec()
+            : this(new DownAllIndirectlyConnected5NodeSpecConfig())
+        {
+        }
+
+        protected DownAllIndirectlyConnected5NodeSpec(DownAllIndirectlyConnected5NodeSpecConfig config)
+            : base(config, typeof(DownAllIndirectlyConnected5NodeSpec))
+        {
+            _config = config;
+        }
+
+        [MultiNodeFact] // the only multi-node entry point; it runs the scenario method below
+        public void DownAllIndirectlyConnected5NodeSpecTests()
+        {
+            A_5_node_cluster_with_keep_one_indirectly_connected_off_should_down_all_when_indirectly_connected_combined_with_clean_partition();
+        }
+
+        public void A_5_node_cluster_with_keep_one_indirectly_connected_off_should_down_all_when_indirectly_connected_combined_with_clean_partition()
+        {
+            var cluster = Cluster.Get(Sys);
+            // node1 forms the cluster by joining itself; the remaining nodes then join through node1
+            RunOn(() =>
+            {
+                cluster.Join(cluster.SelfAddress);
+            }, _config.Node1);
+            EnterBarrier("node1 joined");
+            RunOn(() =>
+            {
+                cluster.Join(Node(_config.Node1).Address);
+            }, _config.Node2, _config.Node3, _config.Node4, _config.Node5);
+            Within(TimeSpan.FromSeconds(10), () =>
+            {
+                AwaitAssert(() =>
+                {
+                    cluster.State.Members.Count.Should().Be(5);
+                    foreach (var m in cluster.State.Members)
+                    {
+                        m.Status.Should().Be(MemberStatus.Up);
+                    }
+                });
+            });
+            EnterBarrier("Cluster formed");
+
+            RunOn(() =>
+            {
+                // clean partition: blackhole every link between {node1, node2, node3} and {node4, node5}
+                foreach (var x in new[] { _config.Node1, _config.Node2, _config.Node3 })
+                {
+                    foreach (var y in new[] { _config.Node4, _config.Node5 })
+                    {
+                        TestConductor.Blackhole(x, y, ThrottleTransportAdapter.Direction.Both).Wait();
+                    }
+                }
+            }, _config.Node1);
+            EnterBarrier("blackholed-clean-partition");
+            // then blackhole node2 <-> node3 as well; no blackhole is placed between node1 and either of them
+            RunOn(() =>
+            {
+                TestConductor.Blackhole(_config.Node2, _config.Node3, ThrottleTransportAdapter.Direction.Both).Wait();
+            }, _config.Node1);
+            EnterBarrier("blackholed-indirectly-connected");
+            // each node polls (up to 10s) until its cluster state lists exactly the unreachable members it is expected to see
+            Within(TimeSpan.FromSeconds(10), () =>
+            {
+                AwaitAssert(() =>
+                {
+                    RunOn(() =>
+                    {
+                        cluster.State.Unreachable.Select(i => i.Address).Should().BeEquivalentTo(new[] { _config.Node2, _config.Node3, _config.Node4, _config.Node5 }.Select(i => Node(i).Address));
+                    }, _config.Node1);
+                    RunOn(() =>
+                    {
+                        cluster.State.Unreachable.Select(i => i.Address).Should().BeEquivalentTo(new[] { _config.Node3, _config.Node4, _config.Node5 }.Select(i => Node(i).Address));
+                    }, _config.Node2);
+                    RunOn(() =>
+                    {
+                        cluster.State.Unreachable.Select(i => i.Address).Should().BeEquivalentTo(new[] { _config.Node2, _config.Node4, _config.Node5 }.Select(i => Node(i).Address));
+                    }, _config.Node3);
+                    RunOn(() =>
+                    {
+                        cluster.State.Unreachable.Select(i => i.Address).Should().BeEquivalentTo(new[] { _config.Node1, _config.Node2, _config.Node3 }.Select(i => Node(i).Address));
+                    }, _config.Node4, _config.Node5);
+                });
+            });
+            EnterBarrier("unreachable");
+            // node1 polls (up to 15s) until it is the only member left in its cluster state and that member is Up
+            RunOn(() =>
+            {
+                Within(TimeSpan.FromSeconds(15), () =>
+                {
+                    AwaitAssert(() =>
+                    {
+                        cluster.State.Members.Select(i => i.Address).Should().BeEquivalentTo(Node(_config.Node1).Address);
+                        foreach (var m in cluster.State.Members)
+                        {
+                            m.Status.Should().Be(MemberStatus.Up);
+                        }
+                    });
+                });
+            }, _config.Node1);
+
+            RunOn(() =>
+            {
+                // downed: every other node waits (up to 15s) for its own Cluster to report IsTerminated
+                AwaitCondition(() => cluster.IsTerminated, max: TimeSpan.FromSeconds(15));
+            }, _config.Node2, _config.Node3, _config.Node4, _config.Node5);
+
+            EnterBarrier("done");
+        }
+    }
+}
diff --git a/src/core/Akka.Cluster.Tests.MultiNode/SBR/DownAllUnstable5NodeSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/SBR/DownAllUnstable5NodeSpec.cs
new file mode 100644
index 00000000000..e4babe4eefa
--- /dev/null
+++ b/src/core/Akka.Cluster.Tests.MultiNode/SBR/DownAllUnstable5NodeSpec.cs
@@ -0,0 +1,177 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C)
2009-2021 Lightbend Inc.
+// Copyright (C) 2013-2021 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+using System;
+using System.Linq;
+using System.Threading;
+using Akka.Cluster.TestKit;
+using Akka.Configuration;
+using Akka.Remote.TestKit;
+using Akka.Remote.Transport;
+using Akka.TestKit;
+using FluentAssertions;
+
+namespace Akka.Cluster.Tests.MultiNode.SBR
+{
+    public class DownAllUnstable5NodeSpecConfig : MultiNodeConfig
+    {
+        public RoleName Node1 { get; }
+        public RoleName Node2 { get; }
+        public RoleName Node3 { get; }
+        public RoleName Node4 { get; }
+        public RoleName Node5 { get; }
+
+        // Registers the five roles and the HOCON shared by every node: keep-majority SBR with stable-after = 10s and down-all-when-unstable = 7s.
+        public DownAllUnstable5NodeSpecConfig()
+        {
+            Node1 = Role("node1");
+            Node2 = Role("node2");
+            Node3 = Role("node3");
+            Node4 = Role("node4");
+            Node5 = Role("node5");
+            // NOTE: Akka.NET keeps its TCP transport settings under akka.remote.dot-netty.tcp, so the connection timeout must be set there
+            CommonConfig = ConfigurationFactory.ParseString(@"
+                akka {
+                    loglevel = INFO
+                    cluster {
+                        downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider""
+                        failure-detector.acceptable-heartbeat-pause = 3s
+                        split-brain-resolver.active-strategy = keep-majority
+                        split-brain-resolver.stable-after = 10s
+                        split-brain-resolver.down-all-when-unstable = 7s
+
+                        run-coordinated-shutdown-when-down = off
+                    }
+
+                    # quicker reconnect
+                    remote.retry-gate-closed-for = 1s
+                    remote.dot-netty.tcp.connection-timeout = 3 s
+
+                    actor.provider = cluster
+
+                    test.filter-leeway = 10s
+                }")
+                .WithFallback(MultiNodeLoggingConfig.LoggingConfig)
+                .WithFallback(DebugConfig(true))
+                .WithFallback(MultiNodeClusterSpec.ClusterConfig());
+            // test transport on: the spec below blackholes and re-opens links through TestConductor
+            TestTransport = true;
+        }
+    }
+
+    public class DownAllUnstable5NodeSpec : MultiNodeClusterSpec
+    {
+        private readonly DownAllUnstable5NodeSpecConfig _config;
+
+        public DownAllUnstable5NodeSpec()
+            : this(new DownAllUnstable5NodeSpecConfig())
+        {
+        }
+
+        protected DownAllUnstable5NodeSpec(DownAllUnstable5NodeSpecConfig config)
+            : base(config, typeof(DownAllUnstable5NodeSpec))
+        {
+            _config = config;
+        }
+
+        [MultiNodeFact] // the only multi-node entry point; it runs the scenario method below
+        public void DownAllUnstable5NodeSpecTests()
+        {
+            A_5_node_cluster_with_down_all_when_unstable_should_down_all_when_instability_continues();
+        }
+
+        public void A_5_node_cluster_with_down_all_when_unstable_should_down_all_when_instability_continues()
+        {
+            var cluster = Cluster.Get(Sys);
+            // node1 forms the cluster by joining itself; the remaining nodes then join through node1
+            RunOn(() =>
+            {
+                cluster.Join(cluster.SelfAddress);
+            }, _config.Node1);
+            EnterBarrier("node1 joined");
+            RunOn(() =>
+            {
+                cluster.Join(Node(_config.Node1).Address);
+            }, _config.Node2, _config.Node3, _config.Node4, _config.Node5);
+            Within(TimeSpan.FromSeconds(10), () =>
+            {
+                AwaitAssert(() =>
+                {
+                    cluster.State.Members.Count.Should().Be(5);
+                    foreach (var m in cluster.State.Members)
+                    {
+                        m.Status.Should().Be(MemberStatus.Up);
+                    }
+                });
+            });
+
+            EnterBarrier("Cluster formed");
+
+            // acceptable-heartbeat-pause = 3s
+            // stable-after = 10s
+            // down-all-when-unstable = 7s
+            // clean partition: blackhole every link between {node1, node2, node3} and {node4, node5}
+            RunOn(() =>
+            {
+                foreach (var x in new[] { _config.Node1, _config.Node2, _config.Node3 })
+                {
+                    foreach (var y in new[] { _config.Node4, _config.Node5 })
+                    {
+                        TestConductor.Blackhole(x, y, ThrottleTransportAdapter.Direction.Both).Wait();
+                    }
+                }
+            }, _config.Node1);
+            EnterBarrier("blackholed-clean-partition");
+            // each side polls (up to 10s) until it sees exactly the other side as unreachable
+            Within(TimeSpan.FromSeconds(10), () =>
+            {
+                AwaitAssert(() =>
+                {
+                    RunOn(() =>
+                    {
+                        cluster.State.Unreachable.Select(i => i.Address).Should().BeEquivalentTo(new[] { _config.Node4, _config.Node5 }.Select(i => Node(i).Address));
+                    }, _config.Node1, _config.Node2, _config.Node3);
+                    RunOn(() =>
+                    {
+                        cluster.State.Unreachable.Select(i => i.Address).Should().BeEquivalentTo(new[] { _config.Node1, _config.Node2, _config.Node3 }.Select(i => Node(i).Address));
+                    }, _config.Node4, _config.Node5);
+                });
+            });
+            EnterBarrier("unreachable-clean-partition");
+
+            // no decision yet: 2s later every node must still see all five members as Up (stable-after is 10s)
+            Thread.Sleep(2000);
+            cluster.State.Members.Count.Should().Be(5);
+            foreach (var m in cluster.State.Members)
+            {
+                m.Status.Should().Be(MemberStatus.Up);
+            }
+            // keep the membership unstable: additionally blackhole node2 <-> node3 ...
+            RunOn(() =>
+            {
+                TestConductor.Blackhole(_config.Node2, _config.Node3, ThrottleTransportAdapter.Direction.Both).Wait();
+            }, _config.Node1);
+            EnterBarrier("blackhole-2");
+            // then it takes about 5 seconds for failure detector to observe that
+            Thread.Sleep(7000);
+            // ... and re-open that link again after the 7s pause
+            RunOn(() =>
+            {
+                TestConductor.PassThrough(_config.Node2, _config.Node3, ThrottleTransportAdapter.Direction.Both).Wait();
+            }, _config.Node1);
+            EnterBarrier("passThrough-2");
+
+            // now it should have been unstable for more than 17 seconds
+
+            // all downed: every node waits (up to 15s) for its own Cluster to report IsTerminated
+            AwaitCondition(() => cluster.IsTerminated, max: TimeSpan.FromSeconds(15));
+
+            EnterBarrier("done");
+        }
+    }
+}
+
diff --git a/src/core/Akka.Cluster.Tests.MultiNode/SBR/IndirectlyConnected3NodeSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/SBR/IndirectlyConnected3NodeSpec.cs
new file mode 100644
index 00000000000..05005fbec41
--- /dev/null
+++ b/src/core/Akka.Cluster.Tests.MultiNode/SBR/IndirectlyConnected3NodeSpec.cs
@@ -0,0 +1,151 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2021 Lightbend Inc.
+// Copyright (C) 2013-2021 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+using System;
+using System.Linq;
+using Akka.Cluster.TestKit;
+using Akka.Configuration;
+using Akka.Remote.TestKit;
+using Akka.Remote.Transport;
+using Akka.TestKit;
+using FluentAssertions;
+
+namespace Akka.Cluster.Tests.MultiNode.SBR
+{
+    public class IndirectlyConnected3NodeSpecConfig : MultiNodeConfig
+    {
+        public RoleName Node1 { get; }
+        public RoleName Node2 { get; }
+        public RoleName Node3 { get; }
+
+        public IndirectlyConnected3NodeSpecConfig()
+        {
+            Node1 = Role("node1");
+            Node2 = Role("node2");
+            Node3 = Role("node3");
+            // HOCON shared by all three nodes: SBR downing provider, keep-majority strategy, stable-after = 6s
+            CommonConfig = ConfigurationFactory.ParseString(@"
+                akka {
+                    loglevel = INFO
+                    cluster {
+                        downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider""
+                        split-brain-resolver.active-strategy = keep-majority
+                        split-brain-resolver.stable-after = 6s
+
+                        run-coordinated-shutdown-when-down = off
+                    }
+
+                    actor.provider = cluster
+
+                    test.filter-leeway = 10s
+                }")
+                .WithFallback(MultiNodeLoggingConfig.LoggingConfig)
+                .WithFallback(DebugConfig(true))
+                .WithFallback(MultiNodeClusterSpec.ClusterConfig());
+            // test transport on: the spec below blackholes a link through TestConductor
+            TestTransport = true;
+        }
+    }
+
+    public class IndirectlyConnected3NodeSpec : MultiNodeClusterSpec
+    {
+        private readonly IndirectlyConnected3NodeSpecConfig _config;
+
+        public IndirectlyConnected3NodeSpec()
+            : this(new IndirectlyConnected3NodeSpecConfig())
+        {
+        }
+
+        protected IndirectlyConnected3NodeSpec(IndirectlyConnected3NodeSpecConfig config)
+            : base(config, typeof(IndirectlyConnected3NodeSpec))
+        {
+            _config = config;
+        }
+
+        [MultiNodeFact] // the only multi-node entry point; it runs the scenario method below
+        public void IndirectlyConnected3NodeSpecTests()
+        {
+            A_3_node_cluster_should_avoid_a_split_brain_when_two_unreachable_but_can_talk_via_third();
+        }
+
+        public void A_3_node_cluster_should_avoid_a_split_brain_when_two_unreachable_but_can_talk_via_third()
+        {
+            var cluster = Cluster.Get(Sys);
+            // node1 forms the cluster by joining itself; node2 and node3 then join through node1
+            RunOn(() =>
+            {
+                cluster.Join(cluster.SelfAddress);
+            }, _config.Node1);
+            EnterBarrier("node1 joined");
+            RunOn(() =>
+            {
+                cluster.Join(Node(_config.Node1).Address);
+            }, _config.Node2, _config.Node3);
+            Within(TimeSpan.FromSeconds(10), () =>
+            {
+                AwaitAssert(() =>
+                {
+                    cluster.State.Members.Count.Should().Be(3);
+                    foreach (var m in cluster.State.Members)
+                    {
+                        m.Status.Should().Be(MemberStatus.Up);
+                    }
+                });
+            });
+            EnterBarrier("Cluster formed");
+            // blackhole only node2 <-> node3; no blackhole is placed between node1 and either of them
+            RunOn(() =>
+            {
+                TestConductor.Blackhole(_config.Node2, _config.Node3, ThrottleTransportAdapter.Direction.Both).Wait();
+            }, _config.Node1);
+            EnterBarrier("Blackholed");
+            // each node polls (up to 10s) until its cluster state lists exactly the unreachable members it is expected to see
+            Within(TimeSpan.FromSeconds(10), () =>
+            {
+                AwaitAssert(() =>
+                {
+                    RunOn(() =>
+                    {
+                        cluster.State.Unreachable.Select(i => i.Address).Should().BeEquivalentTo(Node(_config.Node2).Address);
+                    }, _config.Node3);
+                    RunOn(() =>
+                    {
+                        cluster.State.Unreachable.Select(i => i.Address).Should().BeEquivalentTo(Node(_config.Node3).Address);
+                    }, _config.Node2);
+                    RunOn(() =>
+                    {
+                        cluster.State.Unreachable.Select(i => i.Address).Should().BeEquivalentTo(new[] { _config.Node3, _config.Node2 }.Select(i => Node(i).Address));
+                    }, _config.Node1);
+                });
+            });
+            EnterBarrier("unreachable");
+            // node1 polls (up to 15s) until it is the only member left in its cluster state and that member is Up
+            RunOn(() =>
+            {
+                Within(TimeSpan.FromSeconds(15), () =>
+                {
+                    AwaitAssert(() =>
+                    {
+                        cluster.State.Members.Select(i => i.Address).Should().BeEquivalentTo(Node(_config.Node1).Address);
+                        foreach (var m in cluster.State.Members)
+                        {
+                            m.Status.Should().Be(MemberStatus.Up);
+                        }
+                    });
+                });
+            }, _config.Node1);
+
+            RunOn(() =>
+            {
+                // downed: node2 and node3 wait (up to 15s) for their own Cluster to report IsTerminated
+                AwaitCondition(() => cluster.IsTerminated, max: TimeSpan.FromSeconds(15));
+            }, _config.Node2, _config.Node3);
+
+            EnterBarrier("done");
+        }
+    }
+}
diff --git a/src/core/Akka.Cluster.Tests.MultiNode/SBR/IndirectlyConnected5NodeSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/SBR/IndirectlyConnected5NodeSpec.cs
new file mode 100644
index 00000000000..03e1de074a1
--- /dev/null
+++
b/src/core/Akka.Cluster.Tests.MultiNode/SBR/IndirectlyConnected5NodeSpec.cs
@@ -0,0 +1,174 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2021 Lightbend Inc.
+// Copyright (C) 2013-2021 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+using System;
+using System.Linq;
+using Akka.Cluster.TestKit;
+using Akka.Configuration;
+using Akka.Remote.TestKit;
+using Akka.Remote.Transport;
+using Akka.TestKit;
+using FluentAssertions;
+
+namespace Akka.Cluster.Tests.MultiNode.SBR
+{
+    public class IndirectlyConnected5NodeSpecConfig : MultiNodeConfig
+    {
+        public RoleName Node1 { get; }
+        public RoleName Node2 { get; }
+        public RoleName Node3 { get; }
+        public RoleName Node4 { get; }
+        public RoleName Node5 { get; }
+
+        // Registers the five roles and the HOCON shared by every node: SBR downing provider, keep-majority, stable-after = 6s.
+        public IndirectlyConnected5NodeSpecConfig()
+        {
+            Node1 = Role("node1");
+            Node2 = Role("node2");
+            Node3 = Role("node3");
+            Node4 = Role("node4");
+            Node5 = Role("node5");
+
+            CommonConfig = ConfigurationFactory.ParseString(@"
+                akka {
+                    loglevel = INFO
+                    cluster {
+                        downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider""
+                        split-brain-resolver.active-strategy = keep-majority
+                        split-brain-resolver.stable-after = 6s
+
+                        run-coordinated-shutdown-when-down = off
+                    }
+
+                    actor.provider = cluster
+
+                    test.filter-leeway = 10s
+                }")
+                .WithFallback(MultiNodeLoggingConfig.LoggingConfig)
+                .WithFallback(DebugConfig(true))
+                .WithFallback(MultiNodeClusterSpec.ClusterConfig());
+            // test transport on: the spec below blackholes links through TestConductor
+            TestTransport = true;
+        }
+    }
+
+    public class IndirectlyConnected5NodeSpec : MultiNodeClusterSpec
+    {
+        private readonly IndirectlyConnected5NodeSpecConfig _config;
+
+        public IndirectlyConnected5NodeSpec()
+            : this(new IndirectlyConnected5NodeSpecConfig())
+        {
+        }
+
+        protected IndirectlyConnected5NodeSpec(IndirectlyConnected5NodeSpecConfig config)
+            : base(config, typeof(IndirectlyConnected5NodeSpec))
+        {
+            _config = config;
+        }
+
+        [MultiNodeFact] // the only multi-node entry point; it runs the scenario method below
+        public void IndirectlyConnected5NodeSpecTests()
+        {
+            A_5_node_cluster_should_avoid_a_split_brain_when_indirectly_connected_combined_with_clean_partition();
+        }
+
+        public void A_5_node_cluster_should_avoid_a_split_brain_when_indirectly_connected_combined_with_clean_partition()
+        {
+            var cluster = Cluster.Get(Sys);
+            // node1 forms the cluster by joining itself; the remaining nodes then join through node1
+            RunOn(() =>
+            {
+                cluster.Join(cluster.SelfAddress);
+            }, _config.Node1);
+            EnterBarrier("node1 joined");
+            RunOn(() =>
+            {
+                cluster.Join(Node(_config.Node1).Address);
+            }, _config.Node2, _config.Node3, _config.Node4, _config.Node5);
+            Within(TimeSpan.FromSeconds(10), () =>
+            {
+                AwaitAssert(() =>
+                {
+                    cluster.State.Members.Count.Should().Be(5);
+                    foreach (var m in cluster.State.Members)
+                    {
+                        m.Status.Should().Be(MemberStatus.Up);
+                    }
+                });
+            });
+            EnterBarrier("Cluster formed");
+            // clean partition: blackhole every link between {node1, node2, node3} and {node4, node5}
+            RunOn(() =>
+            {
+                foreach (var x in new[] { _config.Node1, _config.Node2, _config.Node3 })
+                {
+                    foreach (var y in new[] { _config.Node4, _config.Node5 })
+                    {
+                        TestConductor.Blackhole(x, y, ThrottleTransportAdapter.Direction.Both).Wait();
+                    }
+                }
+
+            }, _config.Node1);
+            EnterBarrier("blackholed-clean-partition");
+            // then blackhole node2 <-> node3 as well; no blackhole is placed between node1 and either of them
+            RunOn(() =>
+            {
+                TestConductor.Blackhole(_config.Node2, _config.Node3, ThrottleTransportAdapter.Direction.Both).Wait();
+            }, _config.Node1);
+            EnterBarrier("blackholed-indirectly-connected");
+            // each node polls (up to 10s) until its cluster state lists exactly the unreachable members it is expected to see
+            Within(TimeSpan.FromSeconds(10), () =>
+            {
+                AwaitAssert(() =>
+                {
+                    RunOn(() =>
+                    {
+                        cluster.State.Unreachable.Select(i => i.Address).Should().BeEquivalentTo(new[] { _config.Node2, _config.Node3, _config.Node4, _config.Node5 }.Select(i => Node(i).Address));
+                    }, _config.Node1);
+                    RunOn(() =>
+                    {
+                        cluster.State.Unreachable.Select(i => i.Address).Should().BeEquivalentTo(new[] { _config.Node3, _config.Node4, _config.Node5 }.Select(i => Node(i).Address));
+                    }, _config.Node2);
+                    RunOn(() =>
+                    {
+                        cluster.State.Unreachable.Select(i => i.Address).Should().BeEquivalentTo(new[] { _config.Node2, _config.Node4, _config.Node5 }.Select(i => Node(i).Address));
+                    }, _config.Node3);
+                    RunOn(() =>
+                    {
+                        cluster.State.Unreachable.Select(i => i.Address).Should().BeEquivalentTo(new[] { _config.Node1, _config.Node2, _config.Node3 }.Select(i => Node(i).Address));
+                    }, _config.Node4, _config.Node5);
+                });
+            });
+            EnterBarrier("unreachable");
+            // node1 polls (up to 15s) until it is the only member left in its cluster state and that member is Up
+            RunOn(() =>
+            {
+                Within(TimeSpan.FromSeconds(15), () =>
+                {
+                    AwaitAssert(() =>
+                    {
+                        cluster.State.Members.Select(i => i.Address).Should().BeEquivalentTo(Node(_config.Node1).Address);
+                        foreach (var m in cluster.State.Members)
+                        {
+                            m.Status.Should().Be(MemberStatus.Up);
+                        }
+                    });
+                });
+            }, _config.Node1);
+
+            RunOn(() =>
+            {
+                // downed: every other node waits (up to 15s) for its own Cluster to report IsTerminated
+                AwaitCondition(() => cluster.IsTerminated, max: TimeSpan.FromSeconds(15));
+            }, _config.Node2, _config.Node3, _config.Node4, _config.Node5);
+
+            EnterBarrier("done");
+        }
+    }
+}
+
diff --git a/src/core/Akka.Cluster.Tests.MultiNode/SBR/LeaseMajority5NodeSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/SBR/LeaseMajority5NodeSpec.cs
new file mode 100644
index 00000000000..11a4113d960
--- /dev/null
+++ b/src/core/Akka.Cluster.Tests.MultiNode/SBR/LeaseMajority5NodeSpec.cs
@@ -0,0 +1,269 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2021 Lightbend Inc.
+// Copyright (C) 2013-2021 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Akka.Actor;
+using Akka.Cluster.TestKit;
+using Akka.Configuration;
+using Akka.Coordination.Tests;
+using Akka.Remote.TestKit;
+using Akka.Remote.Transport;
+using Akka.TestKit;
+using FluentAssertions;
+
+namespace Akka.Cluster.Tests.MultiNode.SBR
+{
+    public class LeaseMajority5NodeSpecConfig : MultiNodeConfig
+    {
+        public RoleName Node1 { get; }
+        public RoleName Node2 { get; }
+        public RoleName Node3 { get; }
+        public RoleName Node4 { get; }
+        public RoleName Node5 { get; }
+
+        // Registers the five roles and the HOCON shared by every node: lease-majority SBR backed by the "test-lease" implementation.
+        public LeaseMajority5NodeSpecConfig()
+        {
+            Node1 = Role("node1");
+            Node2 = Role("node2");
+            Node3 = Role("node3");
+            Node4 = Role("node4");
+            Node5 = Role("node5");
+
+            CommonConfig = ConfigurationFactory.ParseString(@"
+
+                akka {
+                    loglevel = INFO
+                    cluster {
+                        gossip-interval = 200 ms
+                        leader-actions-interval = 200 ms
+                        periodic-tasks-initial-delay = 300 ms
+                        failure-detector.heartbeat-interval = 500 ms
+
+                        downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider""
+                        split-brain-resolver {
+                            active-strategy = lease-majority
+                            stable-after = 1.5s
+                            lease-majority {
+                                lease-implementation = test-lease
+                                acquire-lease-delay-for-minority = 1s
+                                release-after = 3s
+                            }
+                        }
+
+                        run-coordinated-shutdown-when-down = off
+                    }
+
+                    actor.provider = cluster
+
+                    test.filter-leeway = 10s
+                }
+
+                test-lease {
+                    lease-class = ""Akka.Coordination.Tests.TestLease, Akka.Coordination.Tests""
+                    heartbeat-interval = 1s
+                    heartbeat-timeout = 120s
+                    lease-operation-timeout = 3s
+                }")
+                .WithFallback(MultiNodeLoggingConfig.LoggingConfig)
+                .WithFallback(DebugConfig(true))
+                .WithFallback(MultiNodeClusterSpec.ClusterConfig());
+            // test transport on: the spec below blackholes links through TestConductor
+            TestTransport = true;
+        }
+    }
+
+    public class LeaseMajority5NodeSpec : MultiNodeClusterSpec
+    {
+        private readonly LeaseMajority5NodeSpecConfig _config;
+        private const string testLeaseName = "LeaseMajority5NodeSpec-akka-sbr";
+
+        public LeaseMajority5NodeSpec()
+            : this(new LeaseMajority5NodeSpecConfig())
+        {
+        }
+
+        protected LeaseMajority5NodeSpec(LeaseMajority5NodeSpecConfig config)
+            : base(config, typeof(LeaseMajority5NodeSpec))
+        {
+            _config = config;
+        }
+
+        /// <summary>
+        /// Sort the roles in the address order used by the cluster node ring. Not referenced in this spec: SortByAddress orders by Member.AddressOrdering directly.
+        /// </summary>
+        private class ClusterOrdering : IComparer<RoleName>
+        {
+            private readonly Func<RoleName, ActorPath> _node;
+
+            public ClusterOrdering(Func<RoleName, ActorPath> node)
+            {
+                _node = node;
+            }
+
+            public int Compare(RoleName x, RoleName y)
+            {
+                return Member.AddressOrdering.Compare(_node(x).Address, _node(y).Address);
+            }
+        }
+        // Orders the given roles by their node address using Member.AddressOrdering.
+        List<RoleName> SortByAddress(RoleName[] roles)
+        {
+            return roles.OrderBy(r => Node(r).Address, Member.AddressOrdering).ToList();
+        }
+        // First role in address order; the lease-probe expectations below run only on this node of each side.
+        RoleName Leader(params RoleName[] roles) => SortByAddress(roles).First();
+
+        // The only multi-node entry point; the three steps share one cluster and must run in this order.
+        [MultiNodeFact]
+        public void LeaseMajority5NodeSpecTests()
+        {
+            LeaseMajority_in_a_5_node_cluster_should_setup_cluster();
+            LeaseMajority_in_a_5_node_cluster_should_keep_the_side_that_can_acquire_the_lease();
+            LeaseMajority_in_a_5_node_cluster_should_keep_the_side_that_can_acquire_the_lease_round_2();
+        }
+
+        public void LeaseMajority_in_a_5_node_cluster_should_setup_cluster()
+        {
+            RunOn(() =>
+            {
+                Cluster.Join(Cluster.SelfAddress);
+            }, _config.Node1);
+            EnterBarrier("node1 joined");
+            RunOn(() =>
+            {
+                Cluster.Join(Node(_config.Node1).Address);
+            }, _config.Node2, _config.Node3, _config.Node4, _config.Node5);
+            Within(TimeSpan.FromSeconds(10), () =>
+            {
+                AwaitAssert(() =>
+                {
+                    Cluster.State.Members.Count.Should().Be(5);
+                    foreach (var m in Cluster.State.Members)
+                    {
+                        m.Status.Should().Be(MemberStatus.Up);
+                    }
+                });
+            });
+            EnterBarrier("Cluster formed");
+        }
+
+        public void LeaseMajority_in_a_5_node_cluster_should_keep_the_side_that_can_acquire_the_lease()
+        {
+            var lease = TestLeaseExt.Get(Sys).GetTestLease(testLeaseName);
+            var leaseProbe = lease.Probe;
+            // nodes 1-3 are set up so that acquiring the lease succeeds, nodes 4-5 so that it fails
+            RunOn(() =>
+            {
+                lease.SetNextAcquireResult(Task.FromResult(true));
+            }, _config.Node1, _config.Node2, _config.Node3);
+            RunOn(() =>
+            {
+                lease.SetNextAcquireResult(Task.FromResult(false));
+            }, _config.Node4, _config.Node5);
+            EnterBarrier("lease-in-place");
+            RunOn(() =>
+            {
+                foreach (var x in new[] { _config.Node1, _config.Node2, _config.Node3 })
+                {
+                    foreach (var y in new[] { _config.Node4, _config.Node5 })
+                    {
+                        TestConductor.Blackhole(x, y, ThrottleTransportAdapter.Direction.Both).Wait();
+                    }
+                }
+            }, _config.Node1);
+            EnterBarrier("blackholed-clean-partition");
+            // the side holding the lease polls (up to 20s) until only its three members remain
+            RunOn(() =>
+            {
+                Within(TimeSpan.FromSeconds(20), () =>
+                {
+                    AwaitAssert(() =>
+                    {
+                        Cluster.State.Members.Count.Should().Be(3);
+                    });
+                });
+                RunOn(() =>
+                {
+                    leaseProbe.ExpectMsg<TestLease.AcquireReq>();
+                    // after 2 * stable-after (config: stable-after = 1.5s, release-after = 3s)
+                    leaseProbe.ExpectMsg<TestLease.ReleaseReq>(TimeSpan.FromSeconds(14));
+                }, Leader(_config.Node1, _config.Node2, _config.Node3));
+            }, _config.Node1, _config.Node2, _config.Node3);
+            RunOn(() =>
+            {
+                Within(TimeSpan.FromSeconds(20), () =>
+                {
+                    AwaitAssert(() =>
+                    {
+                        Cluster.IsTerminated.Should().BeTrue();
+                    });
+                    RunOn(() =>
+                    {
+                        leaseProbe.ExpectMsg<TestLease.AcquireReq>();
+                    }, Leader(_config.Node4, _config.Node5));
+                });
+            }, _config.Node4, _config.Node5);
+            EnterBarrier("downed-and-removed");
+            leaseProbe.ExpectNoMsg(TimeSpan.FromSeconds(1));
+
+            EnterBarrier("done-1");
+        }
+
+        public void LeaseMajority_in_a_5_node_cluster_should_keep_the_side_that_can_acquire_the_lease_round_2()
+        {
+            var lease = TestLeaseExt.Get(Sys).GetTestLease(testLeaseName);
+            // second round on the three remaining members: only node1 is set up to acquire the lease
+            RunOn(() =>
+            {
+                lease.SetNextAcquireResult(Task.FromResult(true));
+            }, _config.Node1);
+            RunOn(() =>
+            {
+                lease.SetNextAcquireResult(Task.FromResult(false));
+            }, _config.Node2, _config.Node3);
+            EnterBarrier("lease-in-place-2");
+            RunOn(() =>
+            {
+                foreach (var x in new[] { _config.Node1 })
+                {
+                    foreach (var y in new[] { _config.Node2, _config.Node3 })
+                    {
+                        TestConductor.Blackhole(x, y, ThrottleTransportAdapter.Direction.Both).Wait();
+                    }
+                }
+            }, _config.Node1);
+            EnterBarrier("blackholed-clean-partition-2");
+            // node1 polls (up to 20s) until it is the only member; node2 and node3 poll until their Cluster has terminated
+            RunOn(() =>
+            {
+                Within(TimeSpan.FromSeconds(20), () =>
+                {
+                    AwaitAssert(() =>
+                    {
+                        Cluster.State.Members.Count.Should().Be(1);
+                    });
+                });
+            }, _config.Node1);
+            RunOn(() =>
+            {
+                Within(TimeSpan.FromSeconds(20), () =>
+                {
+                    AwaitAssert(() =>
+                    {
+                        Cluster.IsTerminated.Should().BeTrue();
+                    });
+                });
+            }, _config.Node2, _config.Node3);
+
+            EnterBarrier("done-2");
+        }
+    }
+}
diff --git a/src/core/Akka.Cluster.Tests/Akka.Cluster.Tests.csproj b/src/core/Akka.Cluster.Tests/Akka.Cluster.Tests.csproj index ed7152f6bb1..386a60ed705 100644 --- a/src/core/Akka.Cluster.Tests/Akka.Cluster.Tests.csproj +++ b/src/core/Akka.Cluster.Tests/Akka.Cluster.Tests.csproj @@ -8,8 +8,8 @@ - + diff --git a/src/core/Akka.Cluster.Tests/SBR/SplitBrainResolverSpec.cs b/src/core/Akka.Cluster.Tests/SBR/SplitBrainResolverSpec.cs index ca120bdcd37..db48d40be2b 100644 --- a/src/core/Akka.Cluster.Tests/SBR/SplitBrainResolverSpec.cs +++ b/src/core/Akka.Cluster.Tests/SBR/SplitBrainResolverSpec.cs @@ -20,11 +20,11 @@ using System.Collections.Immutable; using FluentAssertions; using Akka.Configuration; -using Akka.Cluster.Tools.Tests; using Akka.Remote; using System.Text.RegularExpressions; using System.Threading; using Akka.Util; +using Akka.Coordination.Tests; namespace Akka.Cluster.Tests.SBR { @@ -150,86 +150,28 @@ public SplitBrainResolverSpec(ITestOutputHelper output) : base(Config, output) { testLeaseSettings = new LeaseSettings("akka-sbr", "test", new TimeoutSettings(TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(2), TimeSpan.FromSeconds(3)), ConfigurationFactory.Empty); - - #region TestAddresses - - var addressA = new Address("akka.tcp", "sys", "a", 2552); - MemberA = - new Member( - new UniqueAddress(addressA, 0), - 5, - MemberStatus.Up, - new[] { "role3" }, - AppVersion.Zero); - MemberB = - new Member( - new UniqueAddress(new Address(addressA.Protocol, addressA.System, "b", addressA.Port), 0), - 4, - MemberStatus.Up, - 
new[] { "role1", "role3" }, - AppVersion.Zero); - MemberC = - new Member( - new UniqueAddress(new Address(addressA.Protocol, addressA.System, "c", addressA.Port), 0), - 3, - MemberStatus.Up, - new[] { "role2" }, - AppVersion.Zero); - MemberD = - new Member( - new UniqueAddress(new Address(addressA.Protocol, addressA.System, "d", addressA.Port), 0), - 2, - MemberStatus.Up, - new[] { "role1", "role2", "role3" }, - AppVersion.Zero); - MemberE = - new Member( - new UniqueAddress(new Address(addressA.Protocol, addressA.System, "e", addressA.Port), 0), - 1, - MemberStatus.Up, - new string[] { }, - AppVersion.Zero); - MemberF = - new Member( - new UniqueAddress(new Address(addressA.Protocol, addressA.System, "f", addressA.Port), 0), - 5, - MemberStatus.Up, - new string[] { }, - AppVersion.Zero); - MemberG = - new Member( - new UniqueAddress(new Address(addressA.Protocol, addressA.System, "g", addressA.Port), 0), - 6, - MemberStatus.Up, - new string[] { }, - AppVersion.Zero); - - MemberAWeaklyUp = new Member(MemberA.UniqueAddress, int.MaxValue, MemberStatus.WeaklyUp, MemberA.Roles, AppVersion.Zero); - MemberBWeaklyUp = new Member(MemberB.UniqueAddress, int.MaxValue, MemberStatus.WeaklyUp, MemberB.Roles, AppVersion.Zero); - - #endregion TestAddresses } #region TestAddresses - public Member MemberA { get; } - public Member MemberB { get; } - public Member MemberC { get; } - public Member MemberD { get; } - public Member MemberE { get; } - public Member MemberF { get; } - public Member MemberG { get; } + public Member MemberA => TestAddresses.MemberA; + public Member MemberB => TestAddresses.MemberB; + public Member MemberC => TestAddresses.MemberC; + public Member MemberD => TestAddresses.MemberD; + public Member MemberE => TestAddresses.MemberE; + public Member MemberF => TestAddresses.MemberF; + public Member MemberG => TestAddresses.MemberG; - public Member MemberAWeaklyUp { get; } - public Member MemberBWeaklyUp { get; } + public Member MemberAWeaklyUp => 
TestAddresses.MemberAWeaklyUp; + public Member MemberBWeaklyUp => TestAddresses.MemberBWeaklyUp; - public Member Joining(Member m) => Member.Create(m.UniqueAddress, m.Roles, AppVersion.Zero); + public Member Joining(Member m) => TestAddresses.Joining(m); - public Member Leaving(Member m) => m.Copy(MemberStatus.Leaving); + public Member Leaving(Member m) => TestAddresses.Leaving(m); - public Member Exiting(Member m) => Leaving(m).Copy(MemberStatus.Exiting); + public Member Exiting(Member m) => TestAddresses.Exiting(m); - public Member Downed(Member m) => m.Copy(MemberStatus.Down); + public Member Downed(Member m) => TestAddresses.Downed(m); #endregion TestAddresses @@ -1176,7 +1118,7 @@ public LeaseMajoritySetup(SplitBrainResolverSpec owner, string role = null) public override DowningStrategy CreateStrategy() { - return new LeaseMajority(Role, TestLease, AcquireLeaseDelayForMinority); + return new LeaseMajority(Role, TestLease, AcquireLeaseDelayForMinority, releaseAfter: TimeSpan.FromSeconds(10)); } } @@ -1191,7 +1133,7 @@ public void LeaseMajority_must_decide_AcquireLeaseAndDownUnreachable_and_DownRea var decision1 = strategy1.Decide(); decision1.Should().Be(new AcquireLeaseAndDownUnreachable(TimeSpan.Zero)); strategy1.NodesToDown(decision1).Should().BeEquivalentTo(setup.Side2Nodes); - var reverseDecision1 = strategy1.ReverseDecision(decision1); + var reverseDecision1 = strategy1.ReverseDecision((IAcquireLeaseDecision)decision1); reverseDecision1.Should().BeOfType(); strategy1.NodesToDown(reverseDecision1).Should().BeEquivalentTo(setup.Side1Nodes); @@ -1199,7 +1141,7 @@ public void LeaseMajority_must_decide_AcquireLeaseAndDownUnreachable_and_DownRea var decision2 = strategy2.Decide(); decision2.Should().Be(new AcquireLeaseAndDownUnreachable(setup.AcquireLeaseDelayForMinority)); strategy2.NodesToDown(decision2).Should().BeEquivalentTo(setup.Side1Nodes); - var reverseDecision2 = strategy2.ReverseDecision(decision2); + var reverseDecision2 = 
strategy2.ReverseDecision((IAcquireLeaseDecision)decision2); reverseDecision2.Should().BeOfType(); strategy2.NodesToDown(reverseDecision2).Should().BeEquivalentTo(setup.Side2Nodes); } @@ -1236,7 +1178,7 @@ public void LeaseMajority_must_down_indirectly_connected_A_B__C___C() var decision1 = strategy1.Decide(); decision1.Should().Be(new AcquireLeaseAndDownIndirectlyConnected(TimeSpan.Zero)); strategy1.NodesToDown(decision1).Should().BeEquivalentTo(new[] { MemberA.UniqueAddress, MemberB.UniqueAddress }); - var reverseDecision1 = strategy1.ReverseDecision(decision1); + var reverseDecision1 = strategy1.ReverseDecision((IAcquireLeaseDecision)decision1); reverseDecision1.Should().BeOfType(); strategy1.NodesToDown(reverseDecision1).Should().BeEquivalentTo(setup.Side1Nodes); } @@ -1258,7 +1200,7 @@ public void LeaseMajority_must_down_indirectly_connected_when_combined_with_clea var decision1 = strategy1.Decide(); decision1.Should().Be(new AcquireLeaseAndDownIndirectlyConnected(TimeSpan.Zero)); strategy1.NodesToDown(decision1).Should().BeEquivalentTo(new[] { MemberB, MemberC, MemberD, MemberE }.Select(m => m.UniqueAddress)); - var reverseDecision1 = strategy1.ReverseDecision(decision1); + var reverseDecision1 = strategy1.ReverseDecision((IAcquireLeaseDecision)decision1); reverseDecision1.Should().BeOfType(); strategy1.NodesToDown(reverseDecision1).Should().BeEquivalentTo(setup.Side1Nodes); @@ -1271,7 +1213,7 @@ public void LeaseMajority_must_down_indirectly_connected_when_combined_with_clea var decision2 = strategy2.Decide(); decision2.Should().Be(new AcquireLeaseAndDownUnreachable(setup.AcquireLeaseDelayForMinority)); strategy2.NodesToDown(decision2).Should().BeEquivalentTo(setup.Side1Nodes); - var reverseDecision2 = strategy2.ReverseDecision(decision2); + var reverseDecision2 = strategy2.ReverseDecision((IAcquireLeaseDecision)decision2); reverseDecision2.Should().BeOfType(); strategy2.NodesToDown(reverseDecision2).Should().BeEquivalentTo(setup.Side2Nodes); @@ -1284,7 
+1226,7 @@ public void LeaseMajority_must_down_indirectly_connected_when_combined_with_clea var decision3 = strategy3.Decide(); decision3.Should().Be(new AcquireLeaseAndDownIndirectlyConnected(TimeSpan.Zero)); strategy3.NodesToDown(decision3).Should().BeEquivalentTo(setup.Side1Nodes); - var reverseDecision3 = strategy3.ReverseDecision(decision3); + var reverseDecision3 = strategy3.ReverseDecision((IAcquireLeaseDecision)decision3); reverseDecision3.Should().BeOfType(); strategy3.NodesToDown(reverseDecision3).Should().BeEquivalentTo(new[] { MemberB, MemberC, MemberD, MemberE }.Select(m => m.UniqueAddress)); } @@ -1502,7 +1444,7 @@ public SetupLeaseMajority( : base( owner, stableAfter, - new LeaseMajority(role, testLease, acquireLeaseDelayForMinority: TimeSpan.FromMilliseconds(20)), + new LeaseMajority(role, testLease, acquireLeaseDelayForMinority: TimeSpan.FromMilliseconds(20), releaseAfter: TimeSpan.FromSeconds(10)), selfUniqueAddress, downAllWhenUnstable, tickInterval) diff --git a/src/core/Akka.Cluster.Tests/SBR/TestAddresses.cs b/src/core/Akka.Cluster.Tests/SBR/TestAddresses.cs new file mode 100644 index 00000000000..2039d6b3ead --- /dev/null +++ b/src/core/Akka.Cluster.Tests/SBR/TestAddresses.cs @@ -0,0 +1,91 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. 
+// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using Akka.Actor; +using Akka.Util; + +namespace Akka.Cluster.Tests.SBR +{ + public static class TestAddresses + { + public static Member MemberA { get; } + public static Member MemberB { get; } + public static Member MemberC { get; } + public static Member MemberD { get; } + public static Member MemberE { get; } + public static Member MemberF { get; } + public static Member MemberG { get; } + + public static Member MemberAWeaklyUp { get; } + public static Member MemberBWeaklyUp { get; } + + static TestAddresses() + { + var addressA = new Address("akka.tcp", "sys", "a", 2552); + MemberA = + new Member( + new UniqueAddress(addressA, 0), + 5, + MemberStatus.Up, + new[] { "role3" }, + AppVersion.Zero); + MemberB = + new Member( + new UniqueAddress(new Address(addressA.Protocol, addressA.System, "b", addressA.Port), 0), + 4, + MemberStatus.Up, + new[] { "role1", "role3" }, + AppVersion.Zero); + MemberC = + new Member( + new UniqueAddress(new Address(addressA.Protocol, addressA.System, "c", addressA.Port), 0), + 3, + MemberStatus.Up, + new[] { "role2" }, + AppVersion.Zero); + MemberD = + new Member( + new UniqueAddress(new Address(addressA.Protocol, addressA.System, "d", addressA.Port), 0), + 2, + MemberStatus.Up, + new[] { "role1", "role2", "role3" }, + AppVersion.Zero); + MemberE = + new Member( + new UniqueAddress(new Address(addressA.Protocol, addressA.System, "e", addressA.Port), 0), + 1, + MemberStatus.Up, + new string[] { }, + AppVersion.Zero); + MemberF = + new Member( + new UniqueAddress(new Address(addressA.Protocol, addressA.System, "f", addressA.Port), 0), + 5, + MemberStatus.Up, + new string[] { }, + AppVersion.Zero); + MemberG = + new Member( + new UniqueAddress(new Address(addressA.Protocol, addressA.System, "g", addressA.Port), 0), + 6, + MemberStatus.Up, + new string[] { }, + AppVersion.Zero); + + MemberAWeaklyUp = new 
Member(MemberA.UniqueAddress, int.MaxValue, MemberStatus.WeaklyUp, MemberA.Roles, AppVersion.Zero); + MemberBWeaklyUp = new Member(MemberB.UniqueAddress, int.MaxValue, MemberStatus.WeaklyUp, MemberB.Roles, AppVersion.Zero); + } + + public static Member Joining(Member m) => Member.Create(m.UniqueAddress, m.Roles, AppVersion.Zero); + + public static Member Leaving(Member m) => m.Copy(MemberStatus.Leaving); + + public static Member Exiting(Member m) => Leaving(m).Copy(MemberStatus.Exiting); + + public static Member Downed(Member m) => m.Copy(MemberStatus.Down); + } +} diff --git a/src/core/Akka.Cluster/ClusterDaemon.cs b/src/core/Akka.Cluster/ClusterDaemon.cs index 14c93b5f088..f8aabea0d24 100644 --- a/src/core/Akka.Cluster/ClusterDaemon.cs +++ b/src/core/Akka.Cluster/ClusterDaemon.cs @@ -1964,7 +1964,7 @@ public ReceiveGossipType ReceiveGossip(GossipEnvelope envelope) { _cluster.FailureDetector.Remove(node.Address); } - + } _log.Debug("Cluster Node [{0}] - Receiving gossip from [{1}]", _cluster.SelfAddress, from); @@ -2085,7 +2085,7 @@ public void SendGossip() { // If it's time to try to gossip to some nodes with a different view // gossip to a random alive member with preference to a member with older gossip version - preferredGossipTarget = ImmutableList.CreateRange(localGossip.Members.Where(m => !localGossip.SeenByNode(m.UniqueAddress) + preferredGossipTarget = ImmutableList.CreateRange(localGossip.Members.Where(m => !localGossip.SeenByNode(m.UniqueAddress) && _membershipState.ValidNodeForGossip(m.UniqueAddress)).Select(m => m.UniqueAddress)); } else @@ -2639,7 +2639,7 @@ public void AssertLatestGossip() public void PublishMembershipState() { if (_cluster.Settings.VerboseGossipReceivedLogging) - _log.Debug("Cluster Node [{0}] - New gossip published [{0}]", SelfUniqueAddress, _membershipState.LatestGossip); + _log.Debug("Cluster Node [{0}] - New gossip published [{1}]", SelfUniqueAddress, _membershipState.LatestGossip); _publisher.Tell(new 
InternalClusterAction.PublishChanges(_membershipState)); if (_cluster.Settings.PublishStatsInterval == TimeSpan.Zero) diff --git a/src/core/Akka.Cluster/Configuration/Cluster.conf b/src/core/Akka.Cluster/Configuration/Cluster.conf index 96b9fa3ddd3..ad7557168fa 100644 --- a/src/core/Akka.Cluster/Configuration/Cluster.conf +++ b/src/core/Akka.Cluster/Configuration/Cluster.conf @@ -393,6 +393,9 @@ akka.cluster.split-brain-resolver.lease-majority { # as an best effort to try to keep the majority side. acquire-lease-delay-for-minority = 2s + # Release the lease after this duration. + release-after = 40s + # If the 'role' is defined the majority/minority is based only on members with that 'role'. role = "" } diff --git a/src/core/Akka.Cluster/SBR/DowningStrategy.cs b/src/core/Akka.Cluster/SBR/DowningStrategy.cs index 2eb64b472f5..2509e328e8a 100644 --- a/src/core/Akka.Cluster/SBR/DowningStrategy.cs +++ b/src/core/Akka.Cluster/SBR/DowningStrategy.cs @@ -417,24 +417,14 @@ public ImmutableHashSet NodesToDown(IDecision decision = null) throw new InvalidOperationException(); } - public IDecision ReverseDecision(IDecision decision) + public IDecision ReverseDecision(IAcquireLeaseDecision decision) { switch (decision) { - case DownUnreachable _: - return DownReachable.Instance; case AcquireLeaseAndDownUnreachable _: return DownReachable.Instance; - case DownReachable _: - return DownUnreachable.Instance; - case DownAll _: - return DownAll.Instance; - case DownIndirectlyConnected _: - return ReverseDownIndirectlyConnected.Instance; case AcquireLeaseAndDownIndirectlyConnected _: return ReverseDownIndirectlyConnected.Instance; - case ReverseDownIndirectlyConnected _: - return DownIndirectlyConnected.Instance; } throw new InvalidOperationException(); @@ -684,14 +674,16 @@ public override IDecision Decide() /// internal class LeaseMajority : DowningStrategy { - public LeaseMajority(string role, Lease lease, TimeSpan acquireLeaseDelayForMinority) + public LeaseMajority(string role, 
Lease lease, TimeSpan acquireLeaseDelayForMinority, TimeSpan releaseAfter) { Role = role; Lease = lease; AcquireLeaseDelayForMinority = acquireLeaseDelayForMinority; + ReleaseAfter = releaseAfter; } public TimeSpan AcquireLeaseDelayForMinority { get; } + public TimeSpan ReleaseAfter { get; } private TimeSpan AcquireLeaseDelay { diff --git a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs index 611b4c8e036..feb711f139c 100644 --- a/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs +++ b/src/core/Akka.Cluster/SBR/SplitBrainResolver.cs @@ -74,8 +74,6 @@ public override void Down(UniqueAddress node, IDecision decision) /// internal abstract class SplitBrainResolverBase : ActorBase, IWithUnboundedStash, IWithTimers { - private readonly TimeSpan releaseLeaseAfter; - // would be better as constructor parameter, but don't want to break Cinnamon instrumentation private readonly SplitBrainResolverSettings settings; private ILoggingAdapter _log; @@ -95,7 +93,6 @@ protected SplitBrainResolverBase(TimeSpan stableAfter, DowningStrategy strategy) Strategy = strategy; settings = new SplitBrainResolverSettings(Context.System.Settings.Config); - releaseLeaseAfter = stableAfter + stableAfter; // ReSharper disable once VirtualMemberCallInConstructor Timers.StartPeriodicTimer(Tick.Instance, Tick.Instance, TickInterval); @@ -103,6 +100,8 @@ protected SplitBrainResolverBase(TimeSpan stableAfter, DowningStrategy strategy) ResetStableDeadline(); } + private TimeSpan ReleaseLeaseAfter => (Strategy is LeaseMajority lm) ? 
lm.ReleaseAfter : throw new InvalidOperationException($"Unexpected use of releaseLeaseAfter for strategy [{Strategy?.GetType().Name}]"); + public TimeSpan StableAfter { get; } public DowningStrategy Strategy { get; } @@ -374,7 +373,7 @@ private void OnAcquireLease() { Log.Debug("SBR trying to acquire lease"); //implicit val ec: ExecutionContext = internalDispatcher - + Strategy.Lease?.Acquire().ContinueWith(r => { if (r.IsFaulted) @@ -384,7 +383,7 @@ private void OnAcquireLease() .PipeTo(Self); } - public Receive WaitingForLease(IDecision decision) + public Receive WaitingForLease(IAcquireLeaseDecision decision) { bool ReceiveLease(object message) { @@ -408,7 +407,7 @@ bool ReceiveLease(object message) default: if (downedNodes.IsEmpty) releaseLeaseCondition = - new ReleaseLeaseCondition.WhenTimeElapsed(Deadline.Now + releaseLeaseAfter); + new ReleaseLeaseCondition.WhenTimeElapsed(Deadline.Now + ReleaseLeaseAfter); else releaseLeaseCondition = new ReleaseLeaseCondition.WhenMembersRemoved(downedNodes); @@ -427,7 +426,7 @@ bool ReceiveLease(object message) } Stash.UnstashAll(); - Context.Become(ReceiveLease); + Context.Become(Receive); return true; case ReleaseLeaseResult lr: @@ -642,7 +641,7 @@ public void Remove(Member m) if (remainingDownedNodes.IsEmpty) releaseLeaseCondition = - new ReleaseLeaseCondition.WhenTimeElapsed(Deadline.Now + releaseLeaseAfter); + new ReleaseLeaseCondition.WhenTimeElapsed(Deadline.Now + ReleaseLeaseAfter); else releaseLeaseCondition = new ReleaseLeaseCondition.WhenMembersRemoved(remainingDownedNodes); diff --git a/src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs b/src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs index 91fa80a64cd..4ad2253249d 100644 --- a/src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs +++ b/src/core/Akka.Cluster/SBR/SplitBrainResolverProvider.cs @@ -73,7 +73,7 @@ public Props DowningActorProps var leaseName = lms.SafeLeaseName(system.Name); var lease = LeaseProvider.Get(system).GetLease(leaseName, 
lms.LeaseImplementation, leaseOwnerName); - strategy = new LeaseMajority(lms.Role, lease, lms.AcquireLeaseDelayForMinority); + strategy = new LeaseMajority(lms.Role, lease, lms.AcquireLeaseDelayForMinority, lms.ReleaseAfter); break; default: throw new InvalidOperationException(); diff --git a/src/core/Akka.Cluster/SBR/SplitBrainResolverSettings.cs b/src/core/Akka.Cluster/SBR/SplitBrainResolverSettings.cs index 127550f49e7..1b8fc1880cf 100644 --- a/src/core/Akka.Cluster/SBR/SplitBrainResolverSettings.cs +++ b/src/core/Akka.Cluster/SBR/SplitBrainResolverSettings.cs @@ -117,7 +117,9 @@ string Role(Config c) if (string.IsNullOrEmpty(leaseName)) leaseName = null; - return new LeaseMajoritySettings(leaseImplementation, acquireLeaseDelayForMinority, Role(c), leaseName); + var releaseAfter = c.GetTimeSpan("release-after"); + + return new LeaseMajoritySettings(leaseImplementation, acquireLeaseDelayForMinority, releaseAfter, Role(c), leaseName); }); } @@ -164,15 +166,23 @@ public KeepOldestSettings(bool downIfAlone, string role) public sealed class LeaseMajoritySettings { + [Obsolete] public LeaseMajoritySettings(string leaseImplementation, TimeSpan acquireLeaseDelayForMinority, string role) : this(leaseImplementation, acquireLeaseDelayForMinority, role, null) { } + [Obsolete] public LeaseMajoritySettings(string leaseImplementation, TimeSpan acquireLeaseDelayForMinority, string role, string leaseName) + : this(leaseImplementation, acquireLeaseDelayForMinority, TimeSpan.FromSeconds(40), role, leaseName) + { + } + + public LeaseMajoritySettings(string leaseImplementation, TimeSpan acquireLeaseDelayForMinority, TimeSpan releaseAfter, string role, string leaseName) { LeaseImplementation = leaseImplementation; AcquireLeaseDelayForMinority = acquireLeaseDelayForMinority; + ReleaseAfter = releaseAfter; Role = role; LeaseName = leaseName; } @@ -186,8 +196,10 @@ public string SafeLeaseName(string systemName) public TimeSpan AcquireLeaseDelayForMinority { get; } + public TimeSpan 
ReleaseAfter { get; } + public string Role { get; } - + public string LeaseName { get; } } } diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/TestLease.cs b/src/core/Akka.Coordination.Tests/TestLease.cs similarity index 95% rename from src/contrib/cluster/Akka.Cluster.Tools.Tests/TestLease.cs rename to src/core/Akka.Coordination.Tests/TestLease.cs index 929d162fc4b..5c0c13a0223 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/TestLease.cs +++ b/src/core/Akka.Coordination.Tests/TestLease.cs @@ -7,20 +7,56 @@ using System; using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Text; using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; -using Akka.Coordination; using Akka.Event; using Akka.TestKit; using Akka.TestKit.Xunit2; using Akka.Util; -namespace Akka.Cluster.Tools.Tests +namespace Akka.Coordination.Tests { + public class TestLeaseExtExtensionProvider : ExtensionIdProvider + { + public override TestLeaseExt CreateExtension(ExtendedActorSystem system) + { + var extension = new TestLeaseExt(system); + return extension; + } + } + + public class TestLeaseExt : IExtension + { + public static TestLeaseExt Get(ActorSystem system) + { + return system.WithExtension(); + } + + private readonly ExtendedActorSystem _system; + private readonly ConcurrentDictionary testLeases = new ConcurrentDictionary(); + + public TestLeaseExt(ExtendedActorSystem system) + { + _system = system; + _system.Settings.InjectTopLevelFallback(LeaseProvider.DefaultConfig()); + } + + public TestLease GetTestLease(string name) + { + if (!testLeases.TryGetValue(name, out var lease)) + { + throw new InvalidOperationException($"Test lease {name} has not been set yet. 
Current leases {string.Join(",", testLeases.Keys)}"); + } + return lease; + } + + public void SetTestLease(string name, TestLease lease) + { + testLeases[name] = lease; + } + } + public class TestLease : Lease { public sealed class AcquireReq : IEquatable @@ -75,7 +111,7 @@ public static Config Configuration { get { return ConfigurationFactory.ParseString(@" test-lease { - lease-class = ""Akka.Cluster.Tools.Tests.TestLease, Akka.Cluster.Tools.Tests"" + lease-class = ""Akka.Coordination.Tests.TestLease, Akka.Coordination.Tests"" } "); } } @@ -128,44 +164,4 @@ public override Task Acquire(Action leaseLostCallback) return Acquire(); } } - - public class TestLeaseExtExtensionProvider : ExtensionIdProvider - { - public override TestLeaseExt CreateExtension(ExtendedActorSystem system) - { - var extension = new TestLeaseExt(system); - return extension; - } - } - - public class TestLeaseExt : IExtension - { - public static TestLeaseExt Get(ActorSystem system) - { - return system.WithExtension(); - } - - private readonly ExtendedActorSystem _system; - private readonly ConcurrentDictionary testLeases = new ConcurrentDictionary(); - - public TestLeaseExt(ExtendedActorSystem system) - { - _system = system; - _system.Settings.InjectTopLevelFallback(LeaseProvider.DefaultConfig()); - } - - public TestLease GetTestLease(string name) - { - if (!testLeases.TryGetValue(name, out var lease)) - { - throw new InvalidOperationException($"Test lease {name} has not been set yet. 
Current leases {string.Join(",", testLeases.Keys)}"); - } - return lease; - } - - public void SetTestLease(string name, TestLease lease) - { - testLeases[name] = lease; - } - } } diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/TestLease.cs b/src/core/Akka.Coordination.Tests/TestLeaseActor.cs similarity index 86% rename from src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/TestLease.cs rename to src/core/Akka.Coordination.Tests/TestLeaseActor.cs index 65d2fb84758..f06845e92ee 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/TestLease.cs +++ b/src/core/Akka.Coordination.Tests/TestLeaseActor.cs @@ -15,15 +15,15 @@ using Akka.Event; using Akka.Util; -namespace Akka.Cluster.Tools.Tests.MultiNode +namespace Akka.Coordination.Tests { - internal class TestLeaseActor : ActorBase + public class TestLeaseActor : ActorBase { - internal interface ILeaseRequest + public interface ILeaseRequest { } - internal sealed class Acquire : ILeaseRequest, IEquatable + public sealed class Acquire : ILeaseRequest, IEquatable { public string Owner { get; } @@ -47,7 +47,7 @@ public bool Equals(Acquire other) public override string ToString() => $"Acquire({Owner})"; } - internal sealed class Release : ILeaseRequest, IEquatable + public sealed class Release : ILeaseRequest, IEquatable { public string Owner { get; } @@ -71,7 +71,7 @@ public bool Equals(Release other) public override string ToString() => $"Release({Owner})"; } - internal sealed class Create : ILeaseRequest, IEquatable + public sealed class Create : ILeaseRequest, IEquatable { public string LeaseName { get; } public string OwnerName { get; } @@ -105,7 +105,7 @@ public override int GetHashCode() public override string ToString() => $"Create({LeaseName}, {OwnerName})"; } - internal sealed class GetRequests + public sealed class GetRequests { public static readonly GetRequests Instance = new GetRequests(); private GetRequests() @@ -113,7 +113,7 @@ private GetRequests() } } - internal 
sealed class LeaseRequests + public sealed class LeaseRequests { public List Requests { get; } @@ -126,7 +126,7 @@ public LeaseRequests(List requests) } - internal sealed class ActionRequest // boolean of Failure + public sealed class ActionRequest // boolean of Failure { public ILeaseRequest Request { get; } public bool Result { get; } @@ -143,7 +143,7 @@ public ActionRequest(ILeaseRequest request, bool result) public static Props Props => Props.Create(() => new TestLeaseActor()); private ILoggingAdapter _log = Context.GetLogger(); - private readonly List<(IActorRef, ILeaseRequest)> requests = new List<(IActorRef, ILeaseRequest)>(); + private readonly List<(IActorRef, ILeaseRequest)> _requests = new List<(IActorRef, ILeaseRequest)>(); public TestLeaseActor() { @@ -159,23 +159,23 @@ protected override bool Receive(object message) case ILeaseRequest request: _log.Info("Lease request {0} from {1}", request, Sender); - requests.Insert(0, (Sender, request)); + _requests.Insert(0, (Sender, request)); return true; case GetRequests _: - Sender.Tell(new LeaseRequests(requests.Select(i => i.Item2).ToList())); + Sender.Tell(new LeaseRequests(_requests.Select(i => i.Item2).ToList())); return true; case ActionRequest ar: - var r = requests.FirstOrDefault(i => i.Item2.Equals(ar.Request)); + var r = _requests.FirstOrDefault(i => i.Item2.Equals(ar.Request)); if (r.Item1 != null) { _log.Info("Actioning request {0} to {1}", r.Item2, ar.Result); r.Item1.Tell(ar.Result); - requests.RemoveAll(i => i.Item2.Equals(ar.Request)); + _requests.RemoveAll(i => i.Item2.Equals(ar.Request)); } else - throw new InvalidOperationException($"unknown request to action: {ar.Request}. Requests: { string.Join(", ", requests.Select(i => $"([{i.Item1}],[{i.Item2}])"))}"); + throw new InvalidOperationException($"unknown request to action: {ar.Request}. 
Requests: { string.Join(", ", _requests.Select(i => $"([{i.Item1}],[{i.Item2}])"))}"); return true; } return false; @@ -208,7 +208,7 @@ public TestLeaseActorClientExt(ExtendedActorSystem system) _system = system; } - internal IActorRef GetLeaseActor() + public IActorRef GetLeaseActor() { var lease = leaseActor.Value; if (lease == null) @@ -216,13 +216,13 @@ internal IActorRef GetLeaseActor() return lease; } - internal void SetActorLease(IActorRef client) + public void SetActorLease(IActorRef client) { leaseActor.GetAndSet(client); } } - internal class TestLeaseActorClient : Lease + public class TestLeaseActorClient : Lease { private ILoggingAdapter _log;