Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] possible fix for #2492 #2498

Merged
merged 3 commits into from
Feb 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 1 addition & 20 deletions src/core/Akka.Cluster.Tests/MemberOrderingSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
using System.Collections.Immutable;
using System.Linq;
using Akka.Actor;
using System;
using Akka.Util.Internal.Collections;
using FluentAssertions;
using Xunit;

Expand Down Expand Up @@ -202,24 +202,5 @@ public void LeaderOrdering_must_order_members_with_status_joining_exiting_down_l
shuffled.Sort(Member.LeaderStatusOrdering).Should().BeEquivalentTo(expected);
}
}

static class ListExtensions
{
public static List<T> Shuffle<T>(this List<T> @this)
{
var list = new List<T>(@this);
var rng = new Random();
var n = list.Count;
while (n > 1)
{
n--;
var k = rng.Next(n + 1);
var value = list[k];
list[k] = list[n];
list[n] = value;
}
return list;
}
}
}

59 changes: 39 additions & 20 deletions src/core/Akka.Cluster/ClusterDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Akka.Remote;
using Akka.Util;
using Akka.Util.Internal;
using Akka.Util.Internal.Collections;

namespace Akka.Cluster
{
Expand Down Expand Up @@ -935,7 +936,7 @@ internal class ClusterCoreDaemon : UntypedActor, IRequiresMessageQueue<IUnbounde
/// TBD
/// </summary>
protected readonly UniqueAddress SelfUniqueAddress;
private const int NumberOfGossipsBeforeShutdownWhenLeaderExits = 3;
private const int NumberOfGossipsBeforeShutdownWhenLeaderExits = 5;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the purpose of increasing this value? Not questioning the decision to do it, but just want to understand the reason for the change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I get it now - increases the number of random nodes we might gossip to when the leader exits.

private const int MaxGossipsBeforeShuttingDownMyself = 5;

private readonly VectorClock.Node _vclockNode;
Expand Down Expand Up @@ -1125,6 +1126,7 @@ private void TryingToJoin(object message, Address joinWith, Deadline deadline)
else if (message is InternalClusterAction.JoinSeedNodes)
{
var js = message as InternalClusterAction.JoinSeedNodes;
BecomeUninitialized();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

JoinSeedNodes(js.SeedNodes);
}
else if (message is InternalClusterAction.ISubscriptionMessage)
Expand Down Expand Up @@ -1534,6 +1536,8 @@ public void Leaving(Address address)

_log.Info("Marked address [{0}] as [{1}]", address, MemberStatus.Leaving);
Publish(_latestGossip);
// immediate gossip to speed up the leaving process
SendGossip();
}
}

Expand Down Expand Up @@ -1734,6 +1738,11 @@ public ReceiveGossipType ReceiveGossip(GossipEnvelope envelope)
gossipType = ReceiveGossipType.Newer;
break;
default:
// conflicting versions, merge
// We can see that a removal was done when it is not in one of the gossips has status
// Down or Exiting in the other gossip.
// Perform the same pruning (clear of VectorClock) as the leader did when removing a member.
// Removal of member itself is handled in merge (pickHighestPriority)
var prunedLocalGossip = localGossip.Members.Aggregate(localGossip, (g, m) =>
{
if (Gossip.RemoveUnreachableWithMemberStatus.Contains(m.Status) && !remoteGossip.Members.Contains(m))
Expand Down Expand Up @@ -1845,6 +1854,24 @@ public bool IsGossipSpeedupNeeded()
return _latestGossip.Overview.Seen.Count < _latestGossip.Members.Count / 2;
}


/// <summary>
/// Sends full gossip to `n` other random members.
/// </summary>
private void SendGossipRandom(int n)
{
if (!IsSingletonCluster && n > 0)
{
var localGossip = _latestGossip;
var possibleTargets =
localGossip.Members.Where(m => ValidNodeForGossip(m.UniqueAddress))
.Select(m => m.UniqueAddress)
.ToList();
var randomTargets = possibleTargets.Count <= n ? possibleTargets : possibleTargets.Shuffle().Slice(0, n);
randomTargets.ForEach(GossipTo);
}
}

/// <summary>
/// Initiates a new round of gossip.
/// </summary>
Expand Down Expand Up @@ -1910,15 +1937,12 @@ public double AdjustedGossipDifferentViewProbability
// don't go lower than 1/10 of the configured GossipDifferentViewProbability
var minP = _cluster.Settings.GossipDifferentViewProbability / 10;
if (size >= high) return minP;
else
{
// linear reduction of the probability with increasing number of nodes
// from ReduceGossipDifferentViewProbability at ReduceGossipDifferentViewProbability nodes
// to ReduceGossipDifferentViewProbability / 10 at ReduceGossipDifferentViewProbability * 3 nodes
// i.e. default from 0.8 at 400 nodes, to 0.08 at 1600 nodes
var k = (minP - _cluster.Settings.GossipDifferentViewProbability) / (high - low);
return _cluster.Settings.GossipDifferentViewProbability + (size - low) * k;
}
// linear reduction of the probability with increasing number of nodes
// from ReduceGossipDifferentViewProbability at ReduceGossipDifferentViewProbability nodes
// to ReduceGossipDifferentViewProbability / 10 at ReduceGossipDifferentViewProbability * 3 nodes
// i.e. default from 0.8 at 400 nodes, to 0.08 at 1600 nodes
var k = (minP - _cluster.Settings.GossipDifferentViewProbability) / (high - low);
return _cluster.Settings.GossipDifferentViewProbability + (size - low) * k;
}
}

Expand Down Expand Up @@ -1972,11 +1996,9 @@ private void ShutdownSelfWhenDown()
// the reason for not shutting down immediately is to give the gossip a chance to spread
// the downing information to other downed nodes, so that they can shutdown themselves
_log.Info("Shutting down myself");
downed
.Where(n => !unreachable.Contains(n) || n == SelfUniqueAddress)
.Take(MaxGossipsBeforeShuttingDownMyself)
.ForEach(GossipTo);

// not crucial to send gossip, but may speedup removal since fallback to failure detection is not needed
// if other downed know that this node has seen the version
SendGossipRandom(MaxGossipsBeforeShuttingDownMyself);
Shutdown();
}
}
Expand Down Expand Up @@ -2105,10 +2127,7 @@ public void LeaderActionsOnConvergence()
// for downing. However, if those final gossip messages never arrive it is
// alright to require the downing, because that is probably caused by a
// network failure anyway.
for (var i = 1; i <= NumberOfGossipsBeforeShutdownWhenLeaderExits; i++)
{
SendGossip();
}
SendGossipRandom(NumberOfGossipsBeforeShutdownWhenLeaderExits);
Shutdown();
}
}
Expand Down Expand Up @@ -2164,7 +2183,7 @@ public void ReapUnreachableMembers()
_cluster.SelfAddress, nonExiting.Select(m => m.ToString()).Aggregate((a, b) => a + ", " + b), string.Join(",", _cluster.SelfRoles));

if (!exiting.IsEmpty)
_log.Warning("Marking exiting node(s) as UNREACHABLE [{0}]. This is expected and they will be removed.",
_log.Warning("Cluster Node [{0}] - Marking exiting node(s) as UNREACHABLE [{1}]. This is expected and they will be removed.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aaronontheweb Can we do this one as INFO if it is a normal exit?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maxcherednik @maxim-s yep, I think that's a good idea.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Akka (JVM) does not have "Cluster Node" parameter in this log entry. Maybe we could just remove the parameter, instead of extending the log message?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine

_cluster.SelfAddress, exiting.Select(m => m.ToString()).Aggregate((a, b) => a + ", " + b));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could simplify this code and use something like this

_log.Info("Marking exiting node(s) as UNREACHABLE [{0}]. This is expected and they will be removed.",  string.Join(", ", exiting));


if (!newlyDetectedReachableMembers.IsEmpty)
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka/Akka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@
<Compile Include="Util\Internal\AtomicState.cs" />
<Compile Include="Util\Internal\Collections\EnumeratorExtensions.cs" />
<Compile Include="Util\Internal\Collections\Iterator.cs" />
<Compile Include="Util\Internal\Collections\ListExtensions.cs" />
<Compile Include="Util\Internal\DictionaryExtensions.cs" />
<Compile Include="Util\Internal\ImmutabilityUtils.cs" />
<Compile Include="Util\Internal\InterlockedSpin.cs" />
Expand Down
30 changes: 30 additions & 0 deletions src/core/Akka/Util/Internal/Collections/ListExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//-----------------------------------------------------------------------
// <copyright file="ListExtensions.cs" company="Akka.NET Project">
// Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2017 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;

namespace Akka.Util.Internal.Collections
{
internal static class ListExtensions
{
public static List<T> Shuffle<T>(this List<T> @this)
{
var list = new List<T>(@this);
var r = ThreadLocalRandom.Current;
for (int i = list.Count - 1; i > 0; i--)
{
int index = r.Next(i);
//swap
var tmp = list[index];
list[index] = list[i];
list[i] = tmp;
}
return list;
}
}
}