Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster singleton should consider Member AppVersion during hand over. #6065

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterSingletonRestart2Spec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tools.Tests.Singleton
{
public class ClusterSingletonRestart3Spec : AkkaSpec
{
private readonly ActorSystem _sys1;
private readonly ActorSystem _sys2;
private readonly ActorSystem _sys3;

public ClusterSingletonRestart3Spec(ITestOutputHelper output) : base(@"
akka.loglevel = DEBUG
akka.actor.provider = ""cluster""
akka.cluster.app-version = ""1.0.0""
akka.cluster.auto-down-unreachable-after = 2s
akka.cluster.singleton.min-number-of-hand-over-retries = 5
akka.cluster.singleton.consider-app-version = true
akka.remote {
dot-netty.tcp {
hostname = ""127.0.0.1""
port = 0
}
}", output)
{
_sys1 = Sys;
_sys2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
InitializeLogger(_sys2);
_sys3 = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString("akka.cluster.app-version = \"1.0.2\"")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

.WithFallback(Sys.Settings.Config));
InitializeLogger(_sys3);
}

public void Join(ActorSystem from, ActorSystem to)
{
from.ActorOf(ClusterSingletonManager.Props(Props.Create(() => new Singleton()),
PoisonPill.Instance,
ClusterSingletonManagerSettings.Create(from)), "echo");


Within(TimeSpan.FromSeconds(45), () =>
{
AwaitAssert(() =>
{
Cluster.Get(from).Join(Cluster.Get(to).SelfAddress);
Cluster.Get(from).State.Members.Select(x => x.UniqueAddress).Should().Contain(Cluster.Get(from).SelfUniqueAddress);
Cluster.Get(from)
.State.Members.Select(x => x.Status)
.ToImmutableHashSet()
.Should()
.Equal(ImmutableHashSet<MemberStatus>.Empty.Add(MemberStatus.Up));
});
});
}

[Fact]
public void Singleton_should_consider_AppVersion_when_handing_over()
{
Join(_sys1, _sys1);
Join(_sys2, _sys1);

var proxy2 = _sys2.ActorOf(
ClusterSingletonProxy.Props("user/echo", ClusterSingletonProxySettings.Create(_sys2)), "proxy2");

Within(TimeSpan.FromSeconds(5), () =>
{
AwaitAssert(() =>
{
var probe = CreateTestProbe(_sys2);
proxy2.Tell("poke", probe.Ref);
var singleton = probe.ExpectMsg<Member>(TimeSpan.FromSeconds(1));
singleton.Should().Be(Cluster.Get(_sys1).SelfMember);
singleton.AppVersion.Version.Should().Be("1.0.0");
});
});

// A new node with higher AppVersion joins the cluster
Join(_sys3, _sys1);

// Old node with the singleton instance left the cluster
Cluster.Get(_sys1).Leave(Cluster.Get(_sys1).SelfAddress);

// let it stabilize
Task.Delay(TimeSpan.FromSeconds(5)).Wait();

Within(TimeSpan.FromSeconds(10), () =>
{
AwaitAssert(() =>
{
var probe = CreateTestProbe(_sys2);
proxy2.Tell("poke", probe.Ref);

// note that _sys3 has a higher app-version, so the singleton should start there
var singleton = probe.ExpectMsg<Member>(TimeSpan.FromSeconds(1));
singleton.Should().Be(Cluster.Get(_sys3).SelfMember);
singleton.AppVersion.Version.Should().Be("1.0.2");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

});
});
}

protected override async Task AfterAllAsync()
{
await base.AfterAllAsync();
await ShutdownAsync(_sys2);
await ShutdownAsync(_sys3);
}

public class Singleton : ReceiveActor
{
public Singleton()
{
ReceiveAny(o =>
{
Sender.Tell(Cluster.Get(Context.System).SelfMember);
});
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// -----------------------------------------------------------------------
// <copyright file="MemberAgeOrderingSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Util;
using FluentAssertions;
using Xunit;

namespace Akka.Cluster.Tools.Tests.Singleton
{
public class MemberAgeOrderingSpec
{
[Fact(DisplayName = "MemberAgeOrdering should sort based on UpNumber")]
public void SortByUpNumberTest()
{
var members = new SortedSet<Member>(MemberAgeOrdering.DescendingWithAppVersion)
{
Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 3),
Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1),
Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9),
};

var seq = members.ToList();
seq.Count.Should().Be(3);
seq[0].Should().Be(Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1));
seq[1].Should().Be(Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 3));
seq[2].Should().Be(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 9));
}

[Fact(DisplayName = "MemberAgeOrdering should sort based on Address if UpNumber is the same")]
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
public void SortByAddressTest()
{
var members = new SortedSet<Member>(MemberAgeOrdering.DescendingWithAppVersion)
{
Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 1),
Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1),
Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 1),
};

var seq = members.ToList();
seq.Count.Should().Be(3);
seq[0].Should().Be(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 1));
seq[1].Should().Be(Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 1));
seq[2].Should().Be(Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1));
}

[Fact(DisplayName = "MemberAgeOrdering should prefer AppVersion over UpNumber")]
public void SortByAppVersionTest()
{
var members = new SortedSet<Member>(MemberAgeOrdering.DescendingWithAppVersion)
{
Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 3, appVersion: AppVersion.Create("1.0.0")),
Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1, appVersion: AppVersion.Create("1.0.0")),
Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 2, appVersion: AppVersion.Create("1.0.0")),
Create(Address.Parse("akka://sys@darkstar:1114"), upNumber: 7, appVersion: AppVersion.Create("1.0.2")),
Create(Address.Parse("akka://sys@darkstar:1115"), upNumber: 8, appVersion: AppVersion.Create("1.0.2")),
Create(Address.Parse("akka://sys@darkstar:1116"), upNumber: 6, appVersion: AppVersion.Create("1.0.2")),
};

var seq = members.ToList();
seq.Count.Should().Be(6);
seq[0].Should().Be(Create(Address.Parse("akka://sys@darkstar:1116"), upNumber: 6, appVersion: AppVersion.Create("1.0.2")));
seq[1].Should().Be(Create(Address.Parse("akka://sys@darkstar:1114"), upNumber: 7, appVersion: AppVersion.Create("1.0.2")));
seq[2].Should().Be(Create(Address.Parse("akka://sys@darkstar:1115"), upNumber: 8, appVersion: AppVersion.Create("1.0.2")));
seq[3].Should().Be(Create(Address.Parse("akka://sys@darkstar:1113"), upNumber: 1, appVersion: AppVersion.Create("1.0.0")));
seq[4].Should().Be(Create(Address.Parse("akka://sys@darkstar:1111"), upNumber: 2, appVersion: AppVersion.Create("1.0.0")));
seq[5].Should().Be(Create(Address.Parse("akka://sys@darkstar:1112"), upNumber: 3, appVersion: AppVersion.Create("1.0.0")));
}

public static Member Create(
Address address,
MemberStatus status = MemberStatus.Up,
ImmutableHashSet<string> roles = null,
int uid = 0,
int upNumber = 0,
AppVersion appVersion = null)
{
return Member.Create(new UniqueAddress(address, uid), upNumber, status, roles ?? ImmutableHashSet<string>.Empty, appVersion ?? AppVersion.Zero);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,9 @@ private void InitializeFSM()
{
case StartOldestChangedBuffer _:
{
_oldestChangedBuffer = Context.ActorOf(Actor.Props.Create<OldestChangedBuffer>(_settings.Role).WithDispatcher(Context.Props.Dispatcher));
_oldestChangedBuffer = Context.ActorOf(
Actor.Props.Create(() => new OldestChangedBuffer(_settings.Role, _settings.ConsiderAppVersion))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

.WithDispatcher(Context.Props.Dispatcher));
GetNextOldestChanged();
return Stay();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public static ClusterSingletonManagerSettings Create(Config config)
role: RoleOption(config.GetString("role")),
removalMargin: TimeSpan.Zero, // defaults to ClusterSettings.DownRemovalMargin
handOverRetryInterval: config.GetTimeSpan("hand-over-retry-interval"),
leaseSettings: lease);
leaseSettings: lease,
considerAppVersion: config.GetBoolean("consider-app-version"));
}

private static string RoleOption(string role)
Expand Down Expand Up @@ -91,6 +92,14 @@ private static string RoleOption(string role)
/// LeaseSettings for acquiring before creating the singleton actor
/// </summary>
public LeaseUsageSettings LeaseSettings { get; }


/// <summary>
/// Should <see cref="Member.AppVersion"/> be considered when the cluster singleton instance is being moved to another node.
/// When set to false, singleton instance will always be created on oldest member.
/// When set to true, singleton instance will be created on the oldest member with the highest <see cref="Member.AppVersion"/> number.
/// </summary>
public bool ConsiderAppVersion { get; }

/// <summary>
/// Creates a new instance of the <see cref="ClusterSingletonManagerSettings"/>.
Expand All @@ -114,9 +123,19 @@ private static string RoleOption(string role)
/// over has started or the previous oldest member is removed from the cluster
/// (+ <paramref name="removalMargin"/>).
/// </param>
/// <param name="considerAppVersion">
/// Should <see cref="Member.AppVersion"/> be considered when the cluster singleton instance is being moved to another node.
/// When set to false, singleton instance will always be created on oldest member.
/// When set to true, singleton instance will be created on the oldest member with the highest <see cref="Member.AppVersion"/> number.
/// </param>
/// <exception cref="ArgumentException">TBD</exception>
public ClusterSingletonManagerSettings(string singletonName, string role, TimeSpan removalMargin, TimeSpan handOverRetryInterval)
: this(singletonName, role, removalMargin, handOverRetryInterval, null)
public ClusterSingletonManagerSettings(
string singletonName,
string role,
TimeSpan removalMargin,
TimeSpan handOverRetryInterval,
bool considerAppVersion)
: this(singletonName, role, removalMargin, handOverRetryInterval, null, considerAppVersion)
{
}

Expand All @@ -143,8 +162,19 @@ public ClusterSingletonManagerSettings(string singletonName, string role, TimeSp
/// (+ <paramref name="removalMargin"/>).
/// </param>
/// <param name="leaseSettings">LeaseSettings for acquiring before creating the singleton actor</param>
/// <param name="considerAppVersion">
/// Should <see cref="Member.AppVersion"/> be considered when the cluster singleton instance is being moved to another node.
/// When set to false, singleton instance will always be created on oldest member.
/// When set to true, singleton instance will be created on the oldest member with the highest <see cref="Member.AppVersion"/> number.
/// </param>
/// <exception cref="ArgumentException">TBD</exception>
public ClusterSingletonManagerSettings(string singletonName, string role, TimeSpan removalMargin, TimeSpan handOverRetryInterval, LeaseUsageSettings leaseSettings)
public ClusterSingletonManagerSettings(
string singletonName,
string role,
TimeSpan removalMargin,
TimeSpan handOverRetryInterval,
LeaseUsageSettings leaseSettings,
bool considerAppVersion)
{
if (string.IsNullOrWhiteSpace(singletonName))
throw new ArgumentNullException(nameof(singletonName));
Expand All @@ -158,6 +188,7 @@ public ClusterSingletonManagerSettings(string singletonName, string role, TimeSp
RemovalMargin = removalMargin;
HandOverRetryInterval = handOverRetryInterval;
LeaseSettings = leaseSettings;
ConsiderAppVersion = considerAppVersion;
}

/// <summary>
Expand Down Expand Up @@ -210,15 +241,21 @@ public ClusterSingletonManagerSettings WithLeaseSettings(LeaseUsageSettings leas
return Copy(leaseSettings: leaseSettings);
}

private ClusterSingletonManagerSettings Copy(string singletonName = null, Option<string> role = default, TimeSpan? removalMargin = null,
TimeSpan? handOverRetryInterval = null, Option<LeaseUsageSettings> leaseSettings = default)
private ClusterSingletonManagerSettings Copy(
string singletonName = null,
Option<string> role = default,
TimeSpan? removalMargin = null,
TimeSpan? handOverRetryInterval = null,
Option<LeaseUsageSettings> leaseSettings = default,
bool? considerAppVersion = null)
{
return new ClusterSingletonManagerSettings(
singletonName: singletonName ?? SingletonName,
role: role.HasValue ? role.Value : Role,
removalMargin: removalMargin ?? RemovalMargin,
handOverRetryInterval: handOverRetryInterval ?? HandOverRetryInterval,
leaseSettings: leaseSettings.HasValue ? leaseSettings.Value : LeaseSettings
leaseSettings: leaseSettings.HasValue ? leaseSettings.Value : LeaseSettings,
considerAppVersion: considerAppVersion ?? ConsiderAppVersion
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public static Props Props(string singletonManagerPath, ClusterSingletonProxySett
.WithDeploy(Deploy.Local);
}

private readonly MemberAgeOrdering _memberAgeComparer;
private readonly ClusterSingletonProxySettings _settings;
private readonly Cluster _cluster = Cluster.Get(Context.System);
private readonly Queue<KeyValuePair<object, IActorRef>> _buffer = new Queue<KeyValuePair<object, IActorRef>>(); // queue seems to fit better
Expand All @@ -84,7 +85,7 @@ public static Props Props(string singletonManagerPath, ClusterSingletonProxySett
private string _identityId;
private IActorRef _singleton = null;
private ICancelable _identityTimer = null;
private ImmutableSortedSet<Member> _membersByAge = ImmutableSortedSet<Member>.Empty.WithComparer(MemberAgeOrdering.Descending);
private ImmutableSortedSet<Member> _membersByAge;
private ILoggingAdapter _log;

/// <summary>
Expand All @@ -98,6 +99,11 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS
_singletonPath = (singletonManagerPath + "/" + settings.SingletonName).Split('/');
_identityId = CreateIdentifyId(_identityCounter);

_memberAgeComparer = settings.ConsiderAppVersion
? MemberAgeOrdering.DescendingWithAppVersion
: MemberAgeOrdering.Descending;
_membersByAge = ImmutableSortedSet<Member>.Empty.WithComparer(_memberAgeComparer);

Receive<ClusterEvent.CurrentClusterState>(s => HandleInitial(s));
Receive<ClusterEvent.MemberUp>(m => Add(m.Member));
Receive<ClusterEvent.MemberExited>(m => Remove(m.Member));
Expand Down Expand Up @@ -197,7 +203,7 @@ private void HandleInitial(ClusterEvent.CurrentClusterState state)
TrackChanges(() =>
_membersByAge = state.Members
.Where(m => m.Status == MemberStatus.Up && MatchingRole(m))
.ToImmutableSortedSet(MemberAgeOrdering.Descending));
.ToImmutableSortedSet(_memberAgeComparer));
}

// Discard old singleton ActorRef and send a periodic message to self to identify the singleton.
Expand Down
Loading