Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Member status: Weakly up #3099

Merged
merged 7 commits into from
Dec 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,10 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
{
if (IsMatchingRole(up.Member)) _nodes.Add(up.Member.Address);
});
Receive<ClusterEvent.MemberWeaklyUp>(weaklyUp =>
{
if (IsMatchingRole(weaklyUp.Member)) _nodes.Add(weaklyUp.Member.Address);
});
Receive<ClusterEvent.MemberLeft>(left =>
{
if (IsMatchingRole(left.Member))
Expand Down
9 changes: 9 additions & 0 deletions src/contrib/cluster/Akka.DistributedData/Replicator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ private void NormalReceive()
Receive<Subscribe>(s => ReceiveSubscribe(s.Key, s.Subscriber));
Receive<Unsubscribe>(u => ReceiveUnsubscribe(u.Key, u.Subscriber));
Receive<Terminated>(t => ReceiveTerminated(t.ActorRef));
Receive<ClusterEvent.MemberWeaklyUp>(m => ReceiveMemberWeaklyUp(m.Member));
Receive<ClusterEvent.MemberUp>(m => ReceiveMemberUp(m.Member));
Receive<ClusterEvent.MemberRemoved>(m => ReceiveMemberRemoved(m.Member));
Receive<ClusterEvent.IMemberEvent>(_ => { });
Expand Down Expand Up @@ -1193,6 +1194,14 @@ private void ReceiveTerminated(IActorRef terminated)
}
}

/// <summary>
/// Handles a member carried by a <c>ClusterEvent.MemberWeaklyUp</c> event by adding its
/// address to <c>_weaklyUpNodes</c>.
/// </summary>
/// <param name="m">The member that was reported as weakly up.</param>
private void ReceiveMemberWeaklyUp(Member m)
{
    // Members rejected by MatchingRole are ignored entirely.
    if (!MatchingRole(m))
    {
        return;
    }

    // The local node's own address is never recorded.
    if (m.Address != _selfAddress)
    {
        _weaklyUpNodes = _weaklyUpNodes.Add(m.Address);
    }
}

private void ReceiveMemberUp(Member m)
{
if (MatchingRole(m) && m.Address != _selfAddress)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ namespace Akka.Cluster
{
public MemberUp(Akka.Cluster.Member member) { }
}
public sealed class MemberWeaklyUp : Akka.Cluster.ClusterEvent.MemberStatusChange
{
public MemberWeaklyUp(Akka.Cluster.Member member) { }
}
public abstract class ReachabilityEvent : Akka.Cluster.ClusterEvent.IClusterDomainEvent, Akka.Cluster.ClusterEvent.IReachabilityEvent
{
protected ReachabilityEvent(Akka.Cluster.Member member) { }
Expand Down Expand Up @@ -159,6 +163,7 @@ namespace Akka.Cluster
public sealed class ClusterSettings
{
public ClusterSettings(Akka.Configuration.Config config, string systemName) { }
public bool AllowWeaklyUpMembers { get; }
public System.Nullable<System.TimeSpan> AutoDownUnreachableAfter { get; }
public System.Type DowningProviderType { get; }
[System.ObsoleteAttribute("Use Cluster.DowningProvider.DownRemovalMargin [1.1.2]")]
Expand Down Expand Up @@ -224,6 +229,7 @@ namespace Akka.Cluster
Exiting = 3,
Down = 4,
Removed = 5,
WeaklyUp = 6,
}
public sealed class NoDowning : Akka.Cluster.IDowningProvider
{
Expand Down
4 changes: 3 additions & 1 deletion src/core/Akka.Cluster.Tests.MultiNode/ConvergenceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public ConvergenceSpecConfig(bool failureDetectorPuppet)
CommonConfig = ConfigurationFactory.ParseString(@"akka.cluster.publish-stats-interval = 25s")
.WithFallback(MultiNodeLoggingConfig.LoggingConfig)
.WithFallback(DebugConfig(true))
.WithFallback(@"akka.cluster.failure-detector.threshold = 4")
.WithFallback(@"
akka.cluster.failure-detector.threshold = 4
akka.cluster.allow-weakly-up-members = off")
.WithFallback(MultiNodeClusterSpec.ClusterConfig(failureDetectorPuppet));
}
}
Expand Down
161 changes: 161 additions & 0 deletions src/core/Akka.Cluster.Tests.MultiNode/MemberWeaklyUpSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
//-----------------------------------------------------------------------
// <copyright file="MemberWeaklyUpSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Immutable;
using System.Linq;
using Akka.Cluster.TestKit;
using Akka.Configuration;
using Akka.Remote.TestKit;
using Akka.Remote.Transport;
using Akka.TestKit;

namespace Akka.Cluster.Tests.MultiNode
{
/// <summary>
/// Multi-node configuration used by <see cref="MemberWeaklyUpSpec"/>. It declares five
/// roles, turns <c>akka.cluster.allow-weakly-up-members</c> on and enables the test
/// transport.
/// </summary>
public class MemberWeaklyUpConfig : MultiNodeConfig
{
    /// <summary>Role registered under the name "first".</summary>
    public RoleName First { get; }

    /// <summary>Role registered under the name "second".</summary>
    public RoleName Second { get; }

    /// <summary>Role registered under the name "third".</summary>
    public RoleName Third { get; }

    /// <summary>Role registered under the name "fourth".</summary>
    public RoleName Fourth { get; }

    /// <summary>Role registered under the name "fifth".</summary>
    public RoleName Fifth { get; }

    /// <summary>
    /// Registers the five roles and assembles the common configuration: debug config
    /// (off), then the spec-specific overrides, then the shared cluster config as fallback.
    /// </summary>
    public MemberWeaklyUpConfig()
    {
        First = Role("first");
        Second = Role("second");
        Third = Role("third");
        Fourth = Role("fourth");
        Fifth = Role("fifth");

        // Spec-specific settings take precedence over the shared cluster defaults below.
        var specConfig = DebugConfig(on: false)
            .WithFallback(ConfigurationFactory.ParseString(@"
akka.remote.retry-gate-closed-for = 3s
akka.cluster.allow-weakly-up-members = on"));

        CommonConfig = specConfig.WithFallback(MultiNodeClusterSpec.ClusterConfig());

        TestTransport = true;
    }
}

/// <summary>
/// Multi-node spec for the <see cref="MemberStatus.WeaklyUp"/> member status. It forms a
/// cluster of first/third/fourth, black-holes the links between the two sides
/// (first, second) and (third, fourth, fifth), joins one new node on each side and expects
/// it to be seen as <see cref="MemberStatus.WeaklyUp"/>, then restores the links and waits
/// for all nodes to be reachable and five members to be up.
/// </summary>
public class MemberWeaklyUpSpec : MultiNodeClusterSpec
{
    // Upper bound for a single TestConductor network-manipulation request.
    private static readonly TimeSpan ConductorTimeout = TimeSpan.FromSeconds(3);

    private readonly MemberWeaklyUpConfig _config;
    private readonly ImmutableArray<RoleName> _side1;
    private readonly ImmutableArray<RoleName> _side2;

    /// <summary>Creates the spec with a fresh <see cref="MemberWeaklyUpConfig"/>.</summary>
    public MemberWeaklyUpSpec() : this(new MemberWeaklyUpConfig())
    {
    }

    private MemberWeaklyUpSpec(MemberWeaklyUpConfig config) : base(config, typeof(MemberWeaklyUpSpec))
    {
        _config = config;
        _side1 = ImmutableArray.CreateRange(new[] { config.First, config.Second });
        _side2 = ImmutableArray.CreateRange(new[] { config.Third, config.Fourth, config.Fifth });
        MuteMarkingAsUnreachable();
    }

    /// <summary>Runs the four scenario steps in order.</summary>
    [MultiNodeFact]
    public void Spec()
    {
        A_cluster_of_3_members_should_reach_initial_convergence();
        A_cluster_of_3_members_should_detect_network_partition_and_mark_nodes_on_the_other_side_as_unreachable();
        A_cluster_of_3_members_should_accept_joining_on_each_side_and_set_status_to_WeaklyUp();
        A_cluster_of_3_members_should_change_status_to_Up_after_healed_network_partition();
    }

    /// <summary>Brings up the initial cluster consisting of first, third and fourth.</summary>
    public void A_cluster_of_3_members_should_reach_initial_convergence()
    {
        AwaitClusterUp(_config.First, _config.Third, _config.Fourth);
        EnterBarrier("after-1");
    }

    /// <summary>
    /// Black-holes every link between the two sides and checks that each side sees the
    /// already-joined members of the other side as unreachable.
    /// </summary>
    public void A_cluster_of_3_members_should_detect_network_partition_and_mark_nodes_on_the_other_side_as_unreachable()
    {
        Within(TimeSpan.FromSeconds(20), () =>
        {
            RunOn(() =>
            {
                // split the cluster in two parts (first, second) / (third, fourth, fifth)
                foreach (var role1 in _side1)
                {
                    foreach (var role2 in _side2)
                    {
                        // Task.Wait(TimeSpan) signals a timeout only through its return value,
                        // so the result is asserted; otherwise a stalled request would go
                        // unnoticed and the spec would continue without a full partition.
                        TestConductor.Blackhole(role1, role2, ThrottleTransportAdapter.Direction.Both)
                            .Wait(ConductorTimeout)
                            .ShouldBe(true);
                    }
                }
            }, _config.First);

            EnterBarrier("after-split");

            // Second and fifth have not joined yet, so only the initial members can show up here.
            RunOn(() =>
            {
                AwaitAssert(() =>
                    ClusterView.UnreachableMembers
                        .Select(m => m.Address).ToImmutableHashSet()
                        .ShouldBe(ImmutableHashSet.CreateRange(new[] { GetAddress(_config.Third), GetAddress(_config.Fourth) })));
            }, _config.First);

            RunOn(() =>
            {
                AwaitAssert(() =>
                    ClusterView.UnreachableMembers
                        .Select(m => m.Address).ToImmutableHashSet()
                        .ShouldBe(ImmutableHashSet.CreateRange(new[] { GetAddress(_config.First) })));
            }, _config.Third, _config.Fourth);

            EnterBarrier("after-2");
        });
    }

    /// <summary>
    /// Joins second via first and fifth via fourth while the partition is in place, then
    /// checks on each side that four members are known and the local joiner is WeaklyUp.
    /// </summary>
    public void A_cluster_of_3_members_should_accept_joining_on_each_side_and_set_status_to_WeaklyUp()
    {
        Within(TimeSpan.FromSeconds(20), () =>
        {
            RunOn(() => Cluster.Get(Sys).Join(GetAddress(_config.First)), _config.Second);
            RunOn(() => Cluster.Get(Sys).Join(GetAddress(_config.Fourth)), _config.Fifth);

            EnterBarrier("joined");

            RunOn(() => AwaitAssert(() =>
            {
                ClusterView.Members.Count.ShouldBe(4);
                ClusterView.Members.Any(m => m.Address == GetAddress(_config.Second) && m.Status == MemberStatus.WeaklyUp).ShouldBe(true);
            }), _side1.ToArray());

            RunOn(() => AwaitAssert(() =>
            {
                ClusterView.Members.Count.ShouldBe(4);
                ClusterView.Members.Any(m => m.Address == GetAddress(_config.Fifth) && m.Status == MemberStatus.WeaklyUp).ShouldBe(true);
            }), _side2.ToArray());

            EnterBarrier("after-3");
        });
    }

    /// <summary>
    /// Restores every link between the two sides, then waits for all nodes to be reachable
    /// and for five members to be up.
    /// </summary>
    public void A_cluster_of_3_members_should_change_status_to_Up_after_healed_network_partition()
    {
        Within(TimeSpan.FromSeconds(20), () =>
        {
            RunOn(() =>
            {
                foreach (var role1 in _side1)
                {
                    foreach (var role2 in _side2)
                    {
                        // Same reasoning as for the black-hole step: a timed-out request must fail the spec.
                        TestConductor.PassThrough(role1, role2, ThrottleTransportAdapter.Direction.Both)
                            .Wait(ConductorTimeout)
                            .ShouldBe(true);
                    }
                }
            }, _config.First);

            EnterBarrier("after-passThrough");

            AwaitAllReachable();
            AwaitMembersUp(5);

            EnterBarrier("after-4");
        });
    }
}
}
51 changes: 51 additions & 0 deletions src/core/Akka.Cluster.Tests.MultiNode/MinMembersBeforeUpSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

namespace Akka.Cluster.Tests.MultiNode
{
#region Member.Up

public class MinMembersBeforeUpSpecConfig : MultiNodeConfig
{
public readonly RoleName First;
Expand Down Expand Up @@ -83,6 +85,55 @@ public void Cluster_leader_must_wait_with_moving_members_to_up_until_minimum_num
}
}

#endregion

#region Member.WeaklyUp

/// <summary>
/// Three-role multi-node configuration that sets <c>akka.cluster.min-nr-of-members = 3</c>
/// and <c>akka.cluster.allow-weakly-up-members = on</c>, falling back to
/// <c>MultiNodeClusterSpec.ClusterConfigWithFailureDetectorPuppet()</c> for everything else.
/// </summary>
public class MinMembersBeforeUpWithWeaklyUpSpecConfig : MultiNodeConfig
{
    /// <summary>Role registered under the name "first".</summary>
    public readonly RoleName First;

    /// <summary>Role registered under the name "second".</summary>
    public readonly RoleName Second;

    /// <summary>Role registered under the name "third".</summary>
    public readonly RoleName Third;

    /// <summary>Registers the three roles and builds the common configuration.</summary>
    public MinMembersBeforeUpWithWeaklyUpSpecConfig()
    {
        First = Role("first");
        Second = Role("second");
        Third = Role("third");

        // Spec-specific overrides; they win over the fallback supplied below.
        var weaklyUpSettings = ConfigurationFactory.ParseString(@"
akka.cluster.min-nr-of-members = 3
akka.cluster.allow-weakly-up-members = on
");

        CommonConfig = weaklyUpSettings.WithFallback(
            MultiNodeClusterSpec.ClusterConfigWithFailureDetectorPuppet());
    }
}
// Concrete, body-less subclasses of the abstract MinMembersBeforeUpWithWeaklyUpSpec.
// NOTE(review): presumably one class per participating node (first/second/third) so the
// multi-node test runner can launch each of them - confirm against the runner's conventions.
public class MinMembersBeforeUpWithWeaklyUpNode1 : MinMembersBeforeUpWithWeaklyUpSpec { }
public class MinMembersBeforeUpWithWeaklyUpNode2 : MinMembersBeforeUpWithWeaklyUpSpec { }
public class MinMembersBeforeUpWithWeaklyUpNode3 : MinMembersBeforeUpWithWeaklyUpSpec { }

/// <summary>
/// Variant of the min-members-before-up spec that runs the shared
/// <c>TestWaitMovingMembersToUp</c> scenario with the configuration from
/// <see cref="MinMembersBeforeUpWithWeaklyUpSpecConfig"/>.
/// </summary>
public abstract class MinMembersBeforeUpWithWeaklyUpSpec : MinMembersBeforeUpBase
{
    /// <summary>Creates the spec with a fresh default configuration.</summary>
    protected MinMembersBeforeUpWithWeaklyUpSpec()
        : this(new MinMembersBeforeUpWithWeaklyUpSpecConfig())
    {
    }

    /// <summary>
    /// Creates the spec from <paramref name="config"/> and copies its three roles into the
    /// <c>First</c>, <c>Second</c> and <c>Third</c> members used by the base class.
    /// </summary>
    /// <param name="config">The multi-node configuration supplying the roles.</param>
    protected MinMembersBeforeUpWithWeaklyUpSpec(MinMembersBeforeUpWithWeaklyUpSpecConfig config)
        : base(config, typeof(MinMembersBeforeUpWithWeaklyUpSpec))
    {
        First = config.First;
        Second = config.Second;
        Third = config.Third;
    }

    /// <summary>Delegates to the shared scenario implemented by the base class.</summary>
    [MultiNodeFact]
    public void Cluster_leader_must_wait_with_moving_members_to_up_until_minimum_number_of_members_have_joined_with_WeaklyUp_enabled()
        => TestWaitMovingMembersToUp();
}

#endregion

public class MinMembersOfRoleBeforeUpSpec : MinMembersBeforeUpBase
{
public MinMembersOfRoleBeforeUpSpec() : this(new MinMembersOfRoleBeforeUpSpecConfig())
Expand Down
4 changes: 3 additions & 1 deletion src/core/Akka.Cluster.Tests.MultiNode/RestartNode3Spec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ public RestartNode3SpecConfig()
Third = Role("third");

CommonConfig = DebugConfig(false)
.WithFallback(ConfigurationFactory.ParseString("akka.cluster.auto-down-unreachable-after = off"))
.WithFallback(ConfigurationFactory.ParseString(@"
akka.cluster.auto-down-unreachable-after = off
akka.cluster.allow-weakly-up-members = off"))
.WithFallback(MultiNodeClusterSpec.ClusterConfig());

TestTransport = true;
Expand Down
41 changes: 39 additions & 2 deletions src/core/Akka.Cluster/ClusterDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2120,6 +2120,10 @@ public void LeaderActions()
else
{
_leaderActionCounter += 1;

if (_cluster.Settings.AllowWeaklyUpMembers && _leaderActionCounter >= 3)
MoveJoiningToWeaklyUp();

if (_leaderActionCounter == firstNotice || _leaderActionCounter % periodicNotice == 0)
{
_log.Info(
Expand All @@ -2138,6 +2142,39 @@ public void LeaderActions()
ShutdownSelfWhenDown();
}

/// <summary>
/// Moves every reachable <see cref="MemberStatus.Joining"/> member to
/// <see cref="MemberStatus.WeaklyUp"/>, provided the minimum number of members is
/// fulfilled, then stores and publishes the resulting gossip. Does nothing when no member
/// qualifies.
/// </summary>
private void MoveJoiningToWeaklyUp()
{
    var localGossip = _latestGossip;
    var localMembers = localGossip.Members;
    var enoughMembers = IsMinNrOfMembersFulfilled();

    // Evaluated against the same gossip snapshot the member set was taken from.
    bool IsJoiningToWeaklyUp(Member m) => m.Status == MemberStatus.Joining
                                          && enoughMembers
                                          && localGossip.ReachabilityExcludingDownedObservers.Value.IsReachable(m.UniqueAddress);

    var changedMembers = localMembers
        .Where(IsJoiningToWeaklyUp)
        .Select(m => m.Copy(MemberStatus.WeaklyUp))
        .ToImmutableSortedSet();

    if (changedMembers.IsEmpty)
    {
        return;
    }

    // Replace changed members.
    // A plain changedMembers.Union(localMembers) is not safe here: ImmutableSortedSet.Union
    // gives no guarantee which of two equal elements is kept (it may add the smaller set to
    // the larger one), so the stale Joining entries could win over the WeaklyUp copies.
    // This is the problem flagged in review as the same one seen in #2584 (tracked in #3274).
    // Removing the stale entries first makes the outcome independent of the set sizes.
    var newMembers = localMembers.Except(changedMembers).Union(changedMembers);
    var newGossip = localGossip.Copy(members: newMembers);
    UpdateLatestGossip(newGossip);

    // log status change
    foreach (var m in changedMembers)
    {
        _log.Info("Leader is moving node [{0}] to [{1}]", m.Address, m.Status);
    }

    Publish(newGossip);
    if (_cluster.Settings.PublishStatsInterval == TimeSpan.Zero)
    {
        PublishInternalStats();
    }
}

private void ShutdownSelfWhenDown()
{
if (_latestGossip.GetMember(SelfUniqueAddress).Status == MemberStatus.Down)
Expand Down Expand Up @@ -2197,7 +2234,7 @@ public void LeaderActionsOnConvergence()
var localSeen = localOverview.Seen;

bool enoughMembers = IsMinNrOfMembersFulfilled();
Func<Member, bool> isJoiningUp = m => m.Status == MemberStatus.Joining && enoughMembers;
bool IsJoiningUp(Member m) => (m.Status == MemberStatus.Joining || m.Status == MemberStatus.WeaklyUp) && enoughMembers;

var removedUnreachable =
localOverview.Reachability.AllUnreachableOrTerminated.Select(localGossip.GetMember)
Expand All @@ -2211,7 +2248,7 @@ public void LeaderActionsOnConvergence()
var upNumber = 0;
var changedMembers = localMembers.Select(m =>
{
if (isJoiningUp(m))
if (IsJoiningUp(m))
{
// Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
// and minimum number of nodes have joined the cluster
Expand Down
Loading