From 94ee8e9d03e435cd9d5ab3c78789bae573eb92de Mon Sep 17 00:00:00 2001 From: Maxim Salamatko Date: Thu, 2 Feb 2017 09:34:38 +0400 Subject: [PATCH 1/3] possible fix for #2492 --- src/core/Akka.Cluster/ClusterDaemon.cs | 27 +++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/core/Akka.Cluster/ClusterDaemon.cs b/src/core/Akka.Cluster/ClusterDaemon.cs index a54efeb9e34..9bb00018e30 100644 --- a/src/core/Akka.Cluster/ClusterDaemon.cs +++ b/src/core/Akka.Cluster/ClusterDaemon.cs @@ -935,7 +935,7 @@ internal class ClusterCoreDaemon : UntypedActor, IRequiresMessageQueue protected readonly UniqueAddress SelfUniqueAddress; - private const int NumberOfGossipsBeforeShutdownWhenLeaderExits = 3; + private const int NumberOfGossipsBeforeShutdownWhenLeaderExits = 5; private const int MaxGossipsBeforeShuttingDownMyself = 5; private readonly VectorClock.Node _vclockNode; @@ -1125,6 +1125,7 @@ private void TryingToJoin(object message, Address joinWith, Deadline deadline) else if (message is InternalClusterAction.JoinSeedNodes) { var js = message as InternalClusterAction.JoinSeedNodes; + BecomeUninitialized(); JoinSeedNodes(js.SeedNodes); } else if (message is InternalClusterAction.ISubscriptionMessage) @@ -1534,6 +1535,8 @@ public void Leaving(Address address) _log.Info("Marked address [{0}] as [{1}]", address, MemberStatus.Leaving); Publish(_latestGossip); + // immediate gossip to speed up the leaving process + SendGossip(); } } @@ -1734,6 +1737,11 @@ public ReceiveGossipType ReceiveGossip(GossipEnvelope envelope) gossipType = ReceiveGossipType.Newer; break; default: + // conflicting versions, merge + // We can see that a removal was done when it is not in one of the gossips has status + // Down or Exiting in the other gossip. + // Perform the same pruning (clear of VectorClock) as the leader did when removing a member. 
+ // Removal of member itself is handled in merge (pickHighestPriority) var prunedLocalGossip = localGossip.Members.Aggregate(localGossip, (g, m) => { if (Gossip.RemoveUnreachableWithMemberStatus.Contains(m.Status) && !remoteGossip.Members.Contains(m)) @@ -1910,15 +1918,12 @@ public double AdjustedGossipDifferentViewProbability // don't go lower than 1/10 of the configured GossipDifferentViewProbability var minP = _cluster.Settings.GossipDifferentViewProbability / 10; if (size >= high) return minP; - else - { - // linear reduction of the probability with increasing number of nodes - // from ReduceGossipDifferentViewProbability at ReduceGossipDifferentViewProbability nodes - // to ReduceGossipDifferentViewProbability / 10 at ReduceGossipDifferentViewProbability * 3 nodes - // i.e. default from 0.8 at 400 nodes, to 0.08 at 1600 nodes - var k = (minP - _cluster.Settings.GossipDifferentViewProbability) / (high - low); - return _cluster.Settings.GossipDifferentViewProbability + (size - low) * k; - } + // linear reduction of the probability with increasing number of nodes + // from ReduceGossipDifferentViewProbability at ReduceGossipDifferentViewProbability nodes + // to ReduceGossipDifferentViewProbability / 10 at ReduceGossipDifferentViewProbability * 3 nodes + // i.e. default from 0.8 at 400 nodes, to 0.08 at 1600 nodes + var k = (minP - _cluster.Settings.GossipDifferentViewProbability) / (high - low); + return _cluster.Settings.GossipDifferentViewProbability + (size - low) * k; } } @@ -2164,7 +2169,7 @@ public void ReapUnreachableMembers() _cluster.SelfAddress, nonExiting.Select(m => m.ToString()).Aggregate((a, b) => a + ", " + b), string.Join(",", _cluster.SelfRoles)); if (!exiting.IsEmpty) - _log.Warning("Marking exiting node(s) as UNREACHABLE [{0}]. This is expected and they will be removed.", + _log.Warning("Cluster Node [{0}] - Marking exiting node(s) as UNREACHABLE [{1}]. 
This is expected and they will be removed.", _cluster.SelfAddress, exiting.Select(m => m.ToString()).Aggregate((a, b) => a + ", " + b)); if (!newlyDetectedReachableMembers.IsEmpty) From 3efe1f3073b0427b3ceaec2536364af7f5589f7c Mon Sep 17 00:00:00 2001 From: Maxim Salamatko Date: Thu, 2 Feb 2017 19:19:22 +0400 Subject: [PATCH 2/3] added random gossip --- .../CoreAPISpec.ApproveCore.approved.txt | 4 +++ .../Akka.Cluster.Tests/MemberOrderingSpec.cs | 21 +------------ src/core/Akka.Cluster/ClusterDaemon.cs | 29 +++++++++++------ src/core/Akka/Akka.csproj | 1 + .../Internal/Collections/ListExtensions.cs | 31 +++++++++++++++++++ 5 files changed, 57 insertions(+), 29 deletions(-) create mode 100644 src/core/Akka/Util/Internal/Collections/ListExtensions.cs diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 707925068dc..b1a8d9ad418 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -5145,6 +5145,10 @@ namespace Akka.Util.Internal.Collections public T Next() { } public System.Collections.Generic.IEnumerable ToVector() { } } + public class static ListExtensions + { + public static System.Collections.Generic.List Shuffle(this System.Collections.Generic.List @this) { } + } } namespace Akka.Util.Reflection { diff --git a/src/core/Akka.Cluster.Tests/MemberOrderingSpec.cs b/src/core/Akka.Cluster.Tests/MemberOrderingSpec.cs index aa28a18b764..9cd610884a4 100644 --- a/src/core/Akka.Cluster.Tests/MemberOrderingSpec.cs +++ b/src/core/Akka.Cluster.Tests/MemberOrderingSpec.cs @@ -9,7 +9,7 @@ using System.Collections.Immutable; using System.Linq; using Akka.Actor; -using System; +using Akka.Util.Internal.Collections; using FluentAssertions; using Xunit; @@ -202,24 +202,5 @@ public void LeaderOrdering_must_order_members_with_status_joining_exiting_down_l 
shuffled.Sort(Member.LeaderStatusOrdering).Should().BeEquivalentTo(expected); } } - - static class ListExtensions - { - public static List Shuffle(this List @this) - { - var list = new List(@this); - var rng = new Random(); - var n = list.Count; - while (n > 1) - { - n--; - var k = rng.Next(n + 1); - var value = list[k]; - list[k] = list[n]; - list[n] = value; - } - return list; - } - } } diff --git a/src/core/Akka.Cluster/ClusterDaemon.cs b/src/core/Akka.Cluster/ClusterDaemon.cs index 9bb00018e30..ddcfbd3a520 100644 --- a/src/core/Akka.Cluster/ClusterDaemon.cs +++ b/src/core/Akka.Cluster/ClusterDaemon.cs @@ -16,6 +16,7 @@ using Akka.Remote; using Akka.Util; using Akka.Util.Internal; +using Akka.Util.Internal.Collections; namespace Akka.Cluster { @@ -1853,6 +1854,21 @@ public bool IsGossipSpeedupNeeded() return _latestGossip.Overview.Seen.Count < _latestGossip.Members.Count / 2; } + + /// + /// Sends full gossip to `n` other random members. + /// + private void SendGossipRandom(int n) + { + if (!IsSingletonCluster && n > 0) + { + var localGossip = _latestGossip; + var possibleTargets = new List(localGossip.Members.Where(m => ValidNodeForGossip(m.UniqueAddress)).Select(m => m.UniqueAddress)); + var randomTargets = possibleTargets.Count <= n ? possibleTargets : possibleTargets.Shuffle().Slice(0, n); + randomTargets.ForEach(GossipTo); + } + } + /// /// Initiates a new round of gossip. 
/// @@ -1977,11 +1993,9 @@ private void ShutdownSelfWhenDown() // the reason for not shutting down immediately is to give the gossip a chance to spread // the downing information to other downed nodes, so that they can shutdown themselves _log.Info("Shutting down myself"); - downed - .Where(n => !unreachable.Contains(n) || n == SelfUniqueAddress) - .Take(MaxGossipsBeforeShuttingDownMyself) - .ForEach(GossipTo); - + // not crucial to send gossip, but may speedup removal since fallback to failure detection is not needed + // if other downed know that this node has seen the version + SendGossipRandom(MaxGossipsBeforeShuttingDownMyself); Shutdown(); } } @@ -2110,10 +2124,7 @@ public void LeaderActionsOnConvergence() // for downing. However, if those final gossip messages never arrive it is // alright to require the downing, because that is probably caused by a // network failure anyway. - for (var i = 1; i <= NumberOfGossipsBeforeShutdownWhenLeaderExits; i++) - { - SendGossip(); - } + SendGossipRandom(NumberOfGossipsBeforeShutdownWhenLeaderExits); Shutdown(); } } diff --git a/src/core/Akka/Akka.csproj b/src/core/Akka/Akka.csproj index e4e7d515a06..7fff3de1cea 100644 --- a/src/core/Akka/Akka.csproj +++ b/src/core/Akka/Akka.csproj @@ -277,6 +277,7 @@ + diff --git a/src/core/Akka/Util/Internal/Collections/ListExtensions.cs b/src/core/Akka/Util/Internal/Collections/ListExtensions.cs new file mode 100644 index 00000000000..8ab9f6dc9a4 --- /dev/null +++ b/src/core/Akka/Util/Internal/Collections/ListExtensions.cs @@ -0,0 +1,31 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2017 Lightbend Inc. 
+// Copyright (C) 2013-2017 Akka.NET project +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; + +namespace Akka.Util.Internal.Collections +{ + public static class ListExtensions + { + public static List Shuffle(this List @this) + { + var list = new List(@this); + var rng = new Random(); + var n = list.Count; + while (n > 1) + { + n--; + var k = rng.Next(n + 1); + var value = list[k]; + list[k] = list[n]; + list[n] = value; + } + return list; + } + } +} \ No newline at end of file From d9e89f6307cc1ddfc16f1eac078217c1959e074c Mon Sep 17 00:00:00 2001 From: Maxim Salamatko Date: Mon, 6 Feb 2017 09:44:40 +0400 Subject: [PATCH 3/3] cleanup --- .../CoreAPISpec.ApproveCore.approved.txt | 4 ---- src/core/Akka.Cluster/ClusterDaemon.cs | 5 ++++- .../Util/Internal/Collections/ListExtensions.cs | 17 ++++++++--------- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index b1a8d9ad418..707925068dc 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -5145,10 +5145,6 @@ namespace Akka.Util.Internal.Collections public T Next() { } public System.Collections.Generic.IEnumerable ToVector() { } } - public class static ListExtensions - { - public static System.Collections.Generic.List Shuffle(this System.Collections.Generic.List @this) { } - } } namespace Akka.Util.Reflection { diff --git a/src/core/Akka.Cluster/ClusterDaemon.cs b/src/core/Akka.Cluster/ClusterDaemon.cs index ddcfbd3a520..7fc5b09595a 100644 --- a/src/core/Akka.Cluster/ClusterDaemon.cs +++ b/src/core/Akka.Cluster/ClusterDaemon.cs @@ -1863,7 +1863,10 @@ private void SendGossipRandom(int n) if (!IsSingletonCluster && n > 0) { var localGossip = _latestGossip; - var possibleTargets = new 
List(localGossip.Members.Where(m => ValidNodeForGossip(m.UniqueAddress)).Select(m => m.UniqueAddress)); + var possibleTargets = + localGossip.Members.Where(m => ValidNodeForGossip(m.UniqueAddress)) + .Select(m => m.UniqueAddress) + .ToList(); var randomTargets = possibleTargets.Count <= n ? possibleTargets : possibleTargets.Shuffle().Slice(0, n); randomTargets.ForEach(GossipTo); } diff --git a/src/core/Akka/Util/Internal/Collections/ListExtensions.cs b/src/core/Akka/Util/Internal/Collections/ListExtensions.cs index 8ab9f6dc9a4..55fc6136779 100644 --- a/src/core/Akka/Util/Internal/Collections/ListExtensions.cs +++ b/src/core/Akka/Util/Internal/Collections/ListExtensions.cs @@ -10,20 +10,19 @@ namespace Akka.Util.Internal.Collections { - public static class ListExtensions + internal static class ListExtensions { public static List Shuffle(this List @this) { var list = new List(@this); - var rng = new Random(); - var n = list.Count; - while (n > 1) + var r = ThreadLocalRandom.Current; + for (int i = list.Count - 1; i > 0; i--) { - n--; - var k = rng.Next(n + 1); - var value = list[k]; - list[k] = list[n]; - list[n] = value; + int index = r.Next(i + 1); + // swap list[i] with a uniformly chosen element of list[0..i]; Next's upper bound is exclusive, so i + 1 lets index == i (unbiased Fisher-Yates) + var tmp = list[index]; + list[index] = list[i]; + list[i] = tmp; } return list; }