From 68e327faefed0504b8c85749a7c1aadfcef53ed2 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 1 Apr 2021 17:04:04 -0500 Subject: [PATCH 1/7] implementation of Akka.Cluster.Tests.MultiNode.StressSpec --- .../Akka.Cluster.Tests.MultiNode.csproj | 1 + .../Bugfix4353Specs.cs | 2 +- .../StressSpec.cs | 1249 +++++++++++++++++ 3 files changed, 1251 insertions(+), 1 deletion(-) create mode 100644 src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs diff --git a/src/core/Akka.Cluster.Tests.MultiNode/Akka.Cluster.Tests.MultiNode.csproj b/src/core/Akka.Cluster.Tests.MultiNode/Akka.Cluster.Tests.MultiNode.csproj index ffc6127b3c5..e1488e9f55b 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/Akka.Cluster.Tests.MultiNode.csproj +++ b/src/core/Akka.Cluster.Tests.MultiNode/Akka.Cluster.Tests.MultiNode.csproj @@ -4,6 +4,7 @@ Akka.Cluster.Tests.MultiNode $(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion) + latest diff --git a/src/core/Akka.Cluster.Tests.MultiNode/Bugfix4353Specs.cs b/src/core/Akka.Cluster.Tests.MultiNode/Bugfix4353Specs.cs index 63b41428a2e..347fd141ac1 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/Bugfix4353Specs.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/Bugfix4353Specs.cs @@ -36,7 +36,7 @@ protected Bugfix4353Spec(Bugfix4353SpecsConfig config) : base(config, typeof(Bug } [MultiNodeFact] - public void Bugfix4353Spec_Cluster_of_3_must_reach_cnovergence() + public void Bugfix4353Spec_Cluster_of_3_must_reach_convergence() { AwaitClusterUp(First, Second, Third); EnterBarrier("after-1"); diff --git a/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs new file mode 100644 index 00000000000..0b271778319 --- /dev/null +++ b/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs @@ -0,0 +1,1249 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Diagnostics; +using System.Linq; +using System.Runtime.InteropServices; +using System.Text; +using System.Threading; +using Akka.Actor; +using Akka.Cluster.TestKit; +using Akka.Configuration; +using Akka.Event; +using Akka.Remote; +using Akka.Remote.TestKit; +using Akka.Remote.Transport; +using Akka.TestKit; +using Akka.TestKit.Internal; +using Akka.TestKit.Internal.StringMatcher; +using Akka.TestKit.TestEvent; +using Akka.Util; +using FluentAssertions; +using Google.Protobuf.WellKnownTypes; +using Environment = System.Environment; + +namespace Akka.Cluster.Tests.MultiNode +{ + public class StressSpecConfig : MultiNodeConfig + { + public int TotalNumberOfNodes => Environment.GetEnvironmentVariable("MultiNode.Akka.Cluster.Stress.NrOfNodes") switch + { + string e when string.IsNullOrEmpty(e) => 13, + string val => int.Parse(val), + _ => 13 + }; + + public StressSpecConfig() + { + foreach (var i in Enumerable.Range(1, TotalNumberOfNodes)) + Role("node-" + i); + + CommonConfig = ConfigurationFactory.ParseString(@" + akka.test.cluster-stress-spec { + infolog = off + # scale the nr-of-nodes* settings with this factor + nr-of-nodes-factor = 1 + # not scaled + nr-of-seed-nodes = 3 + nr-of-nodes-joining-to-seed-initially = 2 + nr-of-nodes-joining-one-by-one-small = 2 + nr-of-nodes-joining-one-by-one-large = 2 + nr-of-nodes-joining-to-one = 2 + nr-of-nodes-leaving-one-by-one-small = 1 + nr-of-nodes-leaving-one-by-one-large = 1 + nr-of-nodes-leaving = 2 + nr-of-nodes-shutdown-one-by-one-small = 1 + nr-of-nodes-shutdown-one-by-one-large = 1 + nr-of-nodes-partition = 2 + nr-of-nodes-shutdown = 2 + nr-of-nodes-join-remove = 2 + # not scaled + # scale the *-duration settings with this factor + duration-factor = 1 + join-remove-duration = 90s + idle-gossip-duration = 10s + expected-test-duration = 600s + # scale convergence within timeouts with this factor + convergence-within-factor = 1.0 + } + akka.actor.provider = cluster + akka.cluster { + failure-detector.acceptable-heartbeat-pause = 3s + downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider, Akka.Cluster"" + split-brain-resolver { + active-strategy = keep-majority #TODO: remove this once it's been made default + stable-after = 10s + } + publish-stats-interval = 1s + } + akka.loggers = [""Akka.TestKit.TestEventListener, Akka.TestKit""] + akka.loglevel = INFO + akka.remote.log-remote-lifecycle-events = off + akka.actor.default-dispatcher.fork-join-executor { + parallelism - min = 8 + parallelism - max = 8 + } + "); + + TestTransport = true; + } + + public class Settings + { + private readonly Config _testConfig; + + public Settings(Config config, int totalNumberOfNodes) + { + TotalNumberOfNodes = totalNumberOfNodes; + _testConfig = config.GetConfig("akka.test.cluster-stress-spec"); + Infolog = _testConfig.GetBoolean("infolog"); + NFactor = _testConfig.GetInt("nr-of-nodes-factor"); + NumberOfSeedNodes = _testConfig.GetInt("nr-of-seed-nodes"); + NumberOfNodesJoiningToSeedNodesInitially = + _testConfig.GetInt("nr-of-nodes-joining-to-seed-initially") * NFactor; + NumberOfNodesJoiningOneByOneSmall = _testConfig.GetInt("nr-of-nodes-joining-one-by-one-small") * NFactor; + NumberOfNodesJoiningOneByOneLarge = _testConfig.GetInt("nr-of-nodes-joining-one-by-one-large") * NFactor; + NumberOfNodesJoiningToOneNode = _testConfig.GetInt("nr-of-nodes-joining-to-one") * NFactor; + // remaining will join to seed nodes + NumberOfNodesJoiningToSeedNodes = (totalNumberOfNodes - NumberOfSeedNodes - + NumberOfNodesJoiningToSeedNodesInitially - + NumberOfNodesJoiningOneByOneSmall - + NumberOfNodesJoiningOneByOneLarge - NumberOfNodesJoiningToOneNode); + if (NumberOfNodesJoiningToSeedNodes < 0) + throw new ArgumentOutOfRangeException("nr-of-nodes-joining-*", + $"too many configured nr-of-nodes-joining-*, total should be <= {totalNumberOfNodes}"); + + NumberOfNodesLeavingOneByOneSmall = _testConfig.GetInt("nr-of-nodes-leaving-one-by-one-small") * NFactor; + NumberOfNodesLeavingOneByOneLarge = _testConfig.GetInt("nr-of-nodes-leaving-one-by-one-large") * NFactor; + NumberOfNodesLeaving = _testConfig.GetInt("nr-of-nodes-leaving") * NFactor; + NumberOfNodesShutdownOneByOneSmall = _testConfig.GetInt("nr-of-nodes-shutdown-one-by-one-small") * NFactor; + NumberOfNodesShutdownOneByOneLarge = _testConfig.GetInt("nr-of-nodes-shutdown-one-by-one-large") * NFactor; + NumberOfNodesShutdown = _testConfig.GetInt("nr-of-nodes-shutdown") * NFactor; + NumberOfNodesPartition = _testConfig.GetInt("nr-of-nodes-partition") * NFactor; + NumberOfNodesJoinRemove = _testConfig.GetInt("nr-of-nodes-join-remove"); // not scaled by nodes factor + + DFactor = _testConfig.GetInt("duration-factor"); + JoinRemoveDuration = TimeSpan.FromMilliseconds(_testConfig.GetTimeSpan("join-remove-duration").TotalMilliseconds * DFactor); + IdleGossipDuration = TimeSpan.FromMilliseconds(_testConfig.GetTimeSpan("idle-gossip-duration").TotalMilliseconds * DFactor); + ExpectedTestDuration = TimeSpan.FromMilliseconds(_testConfig.GetTimeSpan("expected-test-duration").TotalMilliseconds * DFactor); + ConvergenceWithinFactor = _testConfig.GetDouble("convergence-within-factor"); + + if (NumberOfSeedNodes + NumberOfNodesJoiningToSeedNodesInitially + NumberOfNodesJoiningOneByOneSmall + + NumberOfNodesJoiningOneByOneLarge + NumberOfNodesJoiningToOneNode + + NumberOfNodesJoiningToSeedNodes > totalNumberOfNodes) + { + throw new ArgumentOutOfRangeException("nr-of-nodes-joining-*", + $"specified number of joining nodes <= {totalNumberOfNodes}"); + } + + // don't shutdown the 3 nodes hosting the master actors + if (NumberOfNodesLeavingOneByOneSmall + NumberOfNodesLeavingOneByOneLarge + NumberOfNodesLeaving + + NumberOfNodesShutdownOneByOneSmall + NumberOfNodesShutdownOneByOneLarge + NumberOfNodesShutdown > + totalNumberOfNodes - 3) + { + throw new ArgumentOutOfRangeException("nr-of-nodes-leaving-*", + $"specified number of leaving/shutdown nodes <= {totalNumberOfNodes - 3}"); + } + + if (NumberOfNodesJoinRemove > totalNumberOfNodes) + { + throw new ArgumentOutOfRangeException("nr-of-nodes-join-remove*", + $"nr-of-nodes-join-remove should be <= {totalNumberOfNodes}"); + } + } + + public int TotalNumberOfNodes { get; } + + public bool Infolog { get; } + public int NFactor { get; } + + public int NumberOfSeedNodes { get; } + + public int NumberOfNodesJoiningToSeedNodesInitially { get; } + + public int NumberOfNodesJoiningOneByOneSmall { get; } + + public int NumberOfNodesJoiningOneByOneLarge { get; } + + public int NumberOfNodesJoiningToOneNode { get; } + + public int NumberOfNodesJoiningToSeedNodes { get; } + + public int NumberOfNodesLeavingOneByOneSmall { get; } + + public int NumberOfNodesLeavingOneByOneLarge { get; } + + public int NumberOfNodesLeaving { get; } + + public int NumberOfNodesShutdownOneByOneSmall { get; } + + public int NumberOfNodesShutdownOneByOneLarge { get; } + + public int NumberOfNodesShutdown { get; } + + public int NumberOfNodesPartition { get; } + + public int NumberOfNodesJoinRemove { get; } + + public int DFactor { get; } + + public TimeSpan JoinRemoveDuration { get; } + + public TimeSpan IdleGossipDuration { get; } + + public TimeSpan ExpectedTestDuration { get; } + + public double ConvergenceWithinFactor { get; } + + public override string ToString() + { + return _testConfig.WithFallback($"nrOfNodes={TotalNumberOfNodes}").Root.ToString(2); + } + } + } + + internal sealed class ClusterResult + { + public ClusterResult(Address address, TimeSpan duration, GossipStats clusterStats) + { + Address = address; + Duration = duration; + ClusterStats = clusterStats; + } + + public Address Address { get; } + public TimeSpan Duration { get; } + public GossipStats ClusterStats { get; } + } + + internal sealed class AggregatedClusterResult + { + public AggregatedClusterResult(string title, TimeSpan duration, GossipStats clusterStats) + { + Title = title; + Duration = duration; + ClusterStats = clusterStats; + } + + public string Title { get; } + + public TimeSpan Duration { get; } + + public GossipStats ClusterStats { get; } + } + + /// + /// Central aggregator of cluster statistics and metrics. + /// + /// Reports the result via log periodically and when all + /// expected results has been collected. It shuts down + /// itself when expected results has been collected. + /// + internal class ClusterResultAggregator : ReceiveActor + { + private readonly string _title; + private readonly int _expectedResults; + private readonly StressSpecConfig.Settings _settings; + + private readonly ILoggingAdapter _log = Context.GetLogger(); + + private Option _reportTo = Option.None; + private ImmutableList _results = ImmutableList.Empty; + private ImmutableSortedDictionary> _phiValuesObservedByNode = + ImmutableSortedDictionary>.Empty.WithComparers(Member.AddressOrdering); + private ImmutableSortedDictionary _clusterStatsObservedByNode = + ImmutableSortedDictionary.Empty.WithComparers(Member.AddressOrdering); + + public static readonly string FormatPhiHeader = "[Monitor]\t[Subject]\t[count]\t[count phi > 1.0]\t[max phi]"; + + public string FormatPhiLine(Address monitor, Address subject, PhiValue phi) + { + return $"{monitor}\t{subject}\t{phi.Count}\t{phi.CountAboveOne}\t{phi.Max:F2}"; + } + + public string FormatPhi() + { + if (_phiValuesObservedByNode.IsEmpty) return string.Empty; + else + { + //var lines = _phiValuesObservedByNode.Select( + // x => x.Value.SelectMany(y => FormatPhiLine(x.Key, y.Address, y))); + var lines = (from mon in _phiValuesObservedByNode from phi in mon.Value select FormatPhiLine(mon.Key, phi.Address, phi)); + return FormatPhiHeader + Environment.NewLine + string.Join(Environment.NewLine, lines); + } + } + + public TimeSpan MaxDuration => _results.Max(x => x.Duration); + + public GossipStats TotalGossipStats => + _results.Aggregate(new GossipStats(), (stats, result) => stats += result.ClusterStats); + + public string FormatStats() + { + string F(ClusterEvent.CurrentInternalStats stats) + { + return + $"CurrentClusterStats({stats.GossipStats.ReceivedGossipCount}, {stats.GossipStats.MergeCount}, " + + $"{stats.GossipStats.SameCount}, {stats.GossipStats.NewerCount}, {stats.GossipStats.OlderCount}," + + $"{stats.SeenBy.VersionSize}, {stats.SeenBy.SeenLatest})"; + } + + return string.Join(Environment.NewLine, "ClusterStats(gossip, merge, same, newer, older, vclockSize, seenLatest)" + + Environment.NewLine + + _clusterStatsObservedByNode.Select(x => $"{x.Key}\t{F(x.Value)}")); + } + + public ClusterResultAggregator(string title, int expectedResults, StressSpecConfig.Settings settings) + { + _title = title; + _expectedResults = expectedResults; + _settings = settings; + + Receive(phi => + { + _phiValuesObservedByNode = _phiValuesObservedByNode.SetItem(phi.Address, phi.PhiValues); + }); + + Receive(stats => + { + _clusterStatsObservedByNode = _clusterStatsObservedByNode.SetItem(stats.Address, stats.Stats); + }); + + Receive(_ => + { + if (_settings.Infolog) + { + _log.Info("[{0}] in progress" + Environment.NewLine + "{1}" + Environment.NewLine + "{2}", _title, + FormatPhi(), FormatStats()); + } + }); + + Receive(r => + { + _results = _results.Add(r); + if (_results.Count == _expectedResults) + { + var aggregated = new AggregatedClusterResult(_title, MaxDuration, TotalGossipStats); + if (_settings.Infolog) + { + _log.Info("[{0}] completed in [{1}] ms" + Environment.NewLine + "{2}" + + Environment.NewLine + "{3}" + Environment.NewLine + "{4}", _title, aggregated.Duration.TotalMilliseconds, + aggregated.ClusterStats, FormatPhi(), FormatStats()); + } + _reportTo.OnSuccess(r => r.Tell(aggregated)); + Context.Stop(Self); + } + }); + + Receive(_ => { }); + Receive(re => + { + _reportTo = re.Ref; + }); + } + } + + /// + /// Keeps cluster statistics and metrics reported by . + /// + /// Logs the list of historical results when a new is received. + /// + internal class ClusterResultHistory : ReceiveActor + { + private ILoggingAdapter _log = Context.GetLogger(); + private ImmutableList _history = ImmutableList.Empty; + + public ClusterResultHistory() + { + Receive(result => + { + _history = _history.Add(result); + }); + } + + public static readonly string FormatHistoryHeader = "[Title]\t[Duration (ms)]\t[GossipStats(gossip, merge, same, newer, older)]"; + + public string FormatHistoryLine(AggregatedClusterResult result) + { + return $"{result.Title}\t{result.Duration.TotalMilliseconds}\t{result.ClusterStats}"; + } + + public string FormatHistory => FormatHistoryHeader + Environment.NewLine + + string.Join(Environment.NewLine, _history.Select(x => FormatHistoryLine(x))); + } + + /// + /// Collect phi values of the failure detector and report to the central + /// + internal class PhiObserver : ReceiveActor + { + private readonly Cluster _cluster = Cluster.Get(Context.System); + private readonly ILoggingAdapter _log = Context.GetLogger(); + private ImmutableDictionary _phiByNode = ImmutableDictionary.Empty; + + private Option _reportTo = Option.None; + private HashSet
_nodes = new HashSet
(); + + private ICancelable _checkPhiTask = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable( + TimeSpan.FromSeconds(1), + TimeSpan.FromSeconds(1), Context.Self, PhiTick.Instance, ActorRefs.NoSender); + + private double Phi(Address address) + { + return _cluster.FailureDetector switch + { + DefaultFailureDetectorRegistry
reg => (reg.GetFailureDetector(address)) switch + { + PhiAccrualFailureDetector fd => fd.CurrentPhi, + _ => 0.0d + }, + _ => 0.0d + }; + } + + private PhiValue PhiByNodeDefault(Address address) + { + if (!_phiByNode.ContainsKey(address)) + { + // populate default value + _phiByNode = _phiByNode.Add(address, new PhiValue(address, 0, 0, 0.0d)); + } + + return _phiByNode[address]; + } + + public PhiObserver() + { + Receive(_ => + { + foreach (var node in _nodes) + { + var previous = PhiByNodeDefault(node); + var p = Phi(node); + if (p > 0 || _cluster.FailureDetector.IsMonitoring(node)) + { + var aboveOne = !double.IsInfinity(p) && p > 1.0d ? 1 : 0; + _phiByNode = _phiByNode.SetItem(node, new PhiValue(node, + previous.CountAboveOne + aboveOne, + previous.Count + 1, + Math.Max(previous.Max, p))); + } + } + + var phiSet = _phiByNode.Values.ToImmutableSortedSet(); + _reportTo.OnSuccess(r => r.Tell(new PhiResult(_cluster.SelfAddress, phiSet))); + }); + + Receive(state => + { + _nodes = new HashSet
(state.Members.Select(x => x.Address)); + }); + + Receive(m => + { + _nodes.Add(m.Member.Address); + }); + + Receive(r => + { + _reportTo.OnSuccess(o => Context.Unwatch(o)); + _reportTo = r.Ref; + _reportTo.OnSuccess(n => Context.Watch(n)); + }); + + Receive(t => + { + if (_reportTo.HasValue) + _reportTo = Option.None; + }); + + Receive(_ => + { + _phiByNode = ImmutableDictionary.Empty; + _nodes.Clear(); + _cluster.Unsubscribe(Self); + _cluster.Subscribe(Self, typeof(ClusterEvent.IMemberEvent)); + }); + } + + protected override void PreStart() + { + _cluster.Subscribe(Self, typeof(ClusterEvent.IMemberEvent)); + } + + protected override void PostStop() + { + _cluster.Unsubscribe(Self); + _checkPhiTask.Cancel(); + base.PostStop(); + } + + public ITimerScheduler Timers { get; set; } + } + + internal readonly struct PhiValue : IComparable + { + public PhiValue(Address address, int countAboveOne, int count, double max) + { + Address = address; + CountAboveOne = countAboveOne; + Count = count; + Max = max; + } + + public Address Address { get; } + public int CountAboveOne { get; } + public int Count { get; } + public double Max { get; } + + public int CompareTo(PhiValue other) + { + return Member.AddressOrdering.Compare(Address, other.Address); + } + } + + internal readonly struct PhiResult + { + public PhiResult(Address address, ImmutableSortedSet phiValues) + { + Address = address; + PhiValues = phiValues; + } + + public Address Address { get; } + + public ImmutableSortedSet PhiValues { get; } + } + + internal class StatsObserver : ReceiveActor + { + private readonly Cluster _cluster = Cluster.Get(Context.System); + private Option _reportTo = Option.None; + private Option _startStats = Option.None; + + protected override void PreStart() + { + _cluster.Subscribe(Self, typeof(ClusterEvent.CurrentInternalStats)); + } + + protected override void PostStop() + { + _cluster.Unsubscribe(Self); + } + + public StatsObserver() + { + Receive(stats => + { + var gossipStats = stats.GossipStats; + var vclockStats = stats.SeenBy; + + GossipStats MatchStats() + { + if (!_startStats.HasValue) + { + _startStats = gossipStats; + return gossipStats; + } + + return _startStats.Value - gossipStats; + } + + var diff = MatchStats(); + var res = new StatsResult(_cluster.SelfAddress, new ClusterEvent.CurrentInternalStats(diff, vclockStats)); + _reportTo.OnSuccess(a => a.Tell(res)); + }); + + Receive(r => + { + _reportTo.OnSuccess(o => Context.Unwatch(o)); + _reportTo = r.Ref; + _reportTo.OnSuccess(n => Context.Watch(n)); + }); + + Receive(t => + { + if (_reportTo.HasValue) + _reportTo = Option.None; + }); + + Receive(_ => + { + _startStats = Option.None; + }); + + // nothing interesting here + Receive(_ => { }); + } + } + + /// + /// Used for remote death watch testing + /// + internal class Watchee : ActorBase + { + protected override bool Receive(object message) + { + return true; + } + } + + internal sealed class Begin + { + public static readonly Begin Instance = new Begin(); + private Begin() { } + } + + internal sealed class End + { + public static readonly End Instance = new End(); + private End() { } + } + + internal sealed class RetryTick + { + public static readonly RetryTick Instance = new RetryTick(); + private RetryTick() { } + } + + internal sealed class ReportTick + { + public static readonly ReportTick Instance = new ReportTick(); + private ReportTick() { } + } + + internal sealed class PhiTick + { + public static readonly PhiTick Instance = new PhiTick(); + private PhiTick() { } + } + + internal sealed class ReportTo + { + public ReportTo(Option @ref) + { + Ref = @ref; + } + + public Option Ref { get; } + } + + internal sealed class StatsResult + { + public StatsResult(Address address, ClusterEvent.CurrentInternalStats stats) + { + Address = address; + Stats = stats; + } + + public Address Address { get; } + + public Akka.Cluster.ClusterEvent.CurrentInternalStats Stats { get; } + } + + internal sealed class Reset + { + public static readonly Reset Instance = new Reset(); + private Reset() { } + } + + internal class MeasureDurationUntilDown : ReceiveActor + { + private readonly Cluster _cluster = Cluster.Get(Context.System); + private readonly long _startTime; + private readonly ILoggingAdapter _log = Context.GetLogger(); + public MeasureDurationUntilDown() + { + _startTime = MonotonicClock.GetTicks(); + + Receive(d => + { + var m = d.Member; + if (m.UniqueAddress == _cluster.SelfUniqueAddress) + { + _log.Info("Downed [{0}] after [{1} ms]", _cluster.SelfAddress, TimeSpan.FromTicks(MonotonicClock.GetTicks() - _startTime).TotalMilliseconds); + } + }); + + Receive(_ => { }); + } + + protected override void PreStart() + { + _cluster.Subscribe(Self, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, typeof(ClusterEvent.MemberDowned)); + } + } + + public class StressSpec : MultiNodeClusterSpec + { + public StressSpecConfig.Settings Settings { get; } + public TestProbe IdentifyProbe; + + protected override TimeSpan ShutdownTimeout => Dilated(TimeSpan.FromSeconds(30)); + + public int Step = 0; + public int NbrUsedRoles = 0; + + protected virtual void MuteLog(ActorSystem sys) + { + Sys.EventStream.Publish(new Mute(new ErrorFilter(typeof(ApplicationException), new ContainsString("Simulated exception")))); + MuteDeadLetters(sys, typeof(AggregatedClusterResult), typeof(StatsResult), typeof(PhiResult), typeof(RetryTick)); + } + + public StressSpec() : this(new StressSpecConfig()){ } + + protected StressSpec(StressSpecConfig config) : base(config, typeof(StressSpec)) + { + Settings = new StressSpecConfig.Settings(Sys.Settings.Config, config.TotalNumberOfNodes); + ClusterResultHistory = new Lazy(() => + { + if (Settings.Infolog) + return Sys.ActorOf(Props.Create(() => new ClusterResultHistory()), "resultHistory"); + return Sys.DeadLetters; + }); + + PhiObserver = new Lazy(() => + { + return Sys.ActorOf(Props.Create(() => new PhiObserver()), "phiObserver"); + }); + + StatsObserver = new Lazy(() => + { + return Sys.ActorOf(Props.Create(() => new StatsObserver()), "statsObserver"); + }); + } + + protected override void AtStartup() + { + IdentifyProbe = CreateTestProbe(); + base.AtStartup(); + } + + public string ClrInfo() + { + var sb = new StringBuilder(); + sb.Append("Operating System: ") + .Append(Environment.OSVersion.Platform) + .Append(", ") + .AppendLine(RuntimeInformation.ProcessArchitecture.ToString()) + .Append(", ") + .Append(Environment.OSVersion.VersionString) + .AppendLine(); + + sb.Append("CLR: ") + .Append(RuntimeInformation.FrameworkDescription) + .AppendLine(); + + sb.Append("Processors: ").Append(Environment.ProcessorCount) + .AppendLine() + .Append("Load average: ").Append("can't be easily measured on .NET Core") // TODO: fix + .AppendLine() + .Append("Thread count: ") + .Append(Process.GetCurrentProcess().Threads.Count) + .AppendLine(); + + sb.Append("Memory: ") + .Append(Process.GetCurrentProcess().VirtualMemorySize64 / 1024 / 1024) + .Append("MB [allocated virtual memory]") + .AppendLine() + .Append(" (") + .Append(Process.GetCurrentProcess().WorkingSet64 / 1024 / 1024) + .Append(" - ") + .Append(Process.GetCurrentProcess().PeakWorkingSet64 / 1024 / 1024) + .Append(") MB [working set / peak working set]"); + + sb.AppendLine("Args: ").Append(string.Join(Environment.NewLine, Environment.GetCommandLineArgs())) + .AppendLine(); + + return sb.ToString(); + } + + public ImmutableList SeedNodes => Roles.Take(Settings.NumberOfSeedNodes).ToImmutableList(); + + internal GossipStats LatestGossipStats => Cluster.ReadView.LatestStats.GossipStats; + + public Lazy ClusterResultHistory { get; } + + public Lazy PhiObserver { get; } + + public Lazy StatsObserver { get; } + + public Option ClusterResultAggregator() + { + Sys.ActorSelection(new RootActorPath(GetAddress(Roles.First())) / "user" / ("result" + Step)) + .Tell(new Identify(Step), IdentifyProbe.Ref); + return new Option(IdentifyProbe.ExpectMsg().Subject); + } + + public void CreateResultAggregator(string title, int expectedResults, bool includeInHistory) + { + RunOn(() => + { + var aggregator = Sys.ActorOf( + Props.Create(() => new ClusterResultAggregator(title, expectedResults, Settings)) + .WithDeploy(Deploy.Local), "result" + Step); + + if (includeInHistory && Settings.Infolog) + { + aggregator.Tell(new ReportTo(new Option(ClusterResultHistory.Value))); + } + else + { + aggregator.Tell(new ReportTo(Option.None)); + } + }, + Roles.First()); + EnterBarrier("result-aggregator-created-" + Step); + + RunOn(() => + { + var resultAggregator = ClusterResultAggregator(); + PhiObserver.Value.Tell(new ReportTo(resultAggregator)); + StatsObserver.Value.Tell(Reset.Instance); + StatsObserver.Value.Tell(new ReportTo(resultAggregator)); + }, Roles.Take(NbrUsedRoles).ToArray()); + + } + + public void AwaitClusterResult() + { + RunOn(() => + { + ClusterResultAggregator().OnSuccess(r => + { + Watch(r); + ExpectMsg(t => t.ActorRef.Path == r.Path); + }); + }, Roles.First()); + EnterBarrier("cluster-result-done-" + Step); + } + + public void JoinOneByOne(int numberOfNodes) + { + foreach (var i in Enumerable.Range(0, numberOfNodes)) + { + JoinOne(); + NbrUsedRoles += 1; + Step += 1; + } + } + + public TimeSpan ConvergenceWithin(TimeSpan baseDuration, int nodes) + { + return TimeSpan.FromMilliseconds(baseDuration.TotalMilliseconds * Settings.ConvergenceWithinFactor * nodes); + } + + public void JoinOne() + { + Within(TimeSpan.FromSeconds(5) + ConvergenceWithin(TimeSpan.FromSeconds(2), NbrUsedRoles + 1), () => + { + var currentRoles = Roles.Take(NbrUsedRoles + 1).ToArray(); + var title = $"join one to {NbrUsedRoles} nodes cluster"; + CreateResultAggregator(title, expectedResults: currentRoles.Length, includeInHistory: true); + RunOn(() => + { + ReportResult(() => + { + RunOn(() => + { + Cluster.Join(GetAddress(Roles.First())); + }, currentRoles.Last()); + AwaitMembersUp(currentRoles.Length, timeout: RemainingOrDefault); + return true; + }); + }, currentRoles); + AwaitClusterResult(); + EnterBarrier("join-one-" + Step); + }); + } + + public void JoinSeveral(int numberOfNodes, bool toSeedNodes) + { + string FormatSeedJoin() + { + return toSeedNodes ? "seed nodes" : "one node"; + } + + Within(TimeSpan.FromSeconds(10) + ConvergenceWithin(TimeSpan.FromSeconds(3), NbrUsedRoles + numberOfNodes), + () => + { + var currentRoles = Roles.Take(NbrUsedRoles + numberOfNodes).ToArray(); + var joiningRoles = currentRoles.Skip(NbrUsedRoles).ToArray(); + var title = $"join {numberOfNodes} to {FormatSeedJoin()}, in {NbrUsedRoles} nodes cluster"; + CreateResultAggregator(title, expectedResults: currentRoles.Length, true); + RunOn(() => + { + ReportResult(() => + { + RunOn(() => + { + if (toSeedNodes) + { + Cluster.JoinSeedNodes(SeedNodes.Select(x => GetAddress(x))); + } + else + { + Cluster.Join(GetAddress(Roles.First())); + } + }, joiningRoles); + AwaitMembersUp(currentRoles.Length, timeout: RemainingOrDefault); + return true; + }); + }, currentRoles); + AwaitClusterResult(); + EnterBarrier("join-several-" + Step); + }); + } + + public void RemoveOneByOne(int numberOfNodes, bool shutdown) + { + foreach (var i in Enumerable.Range(0, numberOfNodes)) + { + RemoveOne(shutdown); + NbrUsedRoles -= 1; + Step += 1; + } + } + + public void RemoveOne(bool shutdown) + { + string FormatNodeLeave() + { + return shutdown ? "shutdown" : "remove"; + } + + Within(TimeSpan.FromSeconds(25) + ConvergenceWithin(TimeSpan.FromSeconds(3), NbrUsedRoles - 1), () + => + { + var currentRoles = Roles.Take(NbrUsedRoles - 1).ToArray(); + var title = $"{FormatNodeLeave()} one from {NbrUsedRoles} nodes cluster"; + CreateResultAggregator(title, expectedResults:currentRoles.Length, true); + var removeRole = Roles[NbrUsedRoles - 1]; + var removeAddress = GetAddress(removeRole); + + RunOn(() => + { + var watchee = Sys.ActorOf(Props.Create(() => new Watchee()), "watchee"); + if(!shutdown) + Cluster.Leave(GetAddress(Myself)); + }, removeRole); + + EnterBarrier("watchee-created-" + Step); + + RunOn(() => + { + Sys.ActorSelection(Node(removeRole) / "user" / "watchee").Tell(new Identify("watchee"), IdentifyProbe.Ref); + var watchee = IdentifyProbe.ExpectMsg().Subject; + Watch(watchee); + }, Roles.First()); + EnterBarrier("watchee-established-" + Step); + + RunOn(() => + { + ReportResult(() => + { + RunOn(() => + { + if (shutdown) + { + if (Settings.Infolog) + { + Log.Info("Shutting down [{0}]", removeAddress); + } + + TestConductor.Exit(removeRole, 0).Wait(RemainingOrDefault); + } + }, Roles.First()); + + AwaitMembersUp(currentRoles.Length, timeout: RemainingOrDefault); + AwaitAllReachable(); + return true; + }); + }, currentRoles); + + RunOn(() => + { + var expectedPath = new RootActorPath(removeAddress) / "user" / "watchee"; + ExpectMsg(t => t.ActorRef.Path == expectedPath); + }, Roles.First()); + + EnterBarrier("watch-verified-" + Step); + + AwaitClusterResult(); + EnterBarrier("remove-one-" + Step); + }); + } + + public void RemoveSeveral(int numberOfNodes, bool shutdown) + { + string FormatNodeLeave() + { + return shutdown ? "shutdown" : "remove"; + } + + Within(TimeSpan.FromSeconds(25) + ConvergenceWithin(TimeSpan.FromSeconds(5), NbrUsedRoles - numberOfNodes), + () => + { + var currentRoles = Roles.Take(NbrUsedRoles - numberOfNodes).ToArray(); + var removeRoles = Roles.Skip(currentRoles.Length).Take(numberOfNodes).ToArray(); + var title = $"{FormatNodeLeave()} {numberOfNodes} in {NbrUsedRoles} nodes cluster"; + CreateResultAggregator(title, expectedResults: currentRoles.Length, includeInHistory: true); + + RunOn(() => + { + if (!shutdown) + { + Cluster.Leave(GetAddress(Myself)); + } + }, removeRoles); + + RunOn(() => + { + ReportResult(() => + { + RunOn(() => + { + if (shutdown) + { + foreach (var role in removeRoles) + { + if (Settings.Infolog) + Log.Info("Shutting down [{0}]", GetAddress(role)); + TestConductor.Exit(role, 0).Wait(RemainingOrDefault); + } + } + }, Roles.First()); + AwaitMembersUp(currentRoles.Length, timeout: RemainingOrDefault); + AwaitAllReachable(); + return true; + }); + }, currentRoles); + + AwaitClusterResult(); + EnterBarrier("remove-several-" + Step); + }); + } + + public void PartitionSeveral(int numberOfNodes) + { + Within(TimeSpan.FromSeconds(25) + ConvergenceWithin(TimeSpan.FromSeconds(5), NbrUsedRoles - numberOfNodes), + () => + { + var currentRoles = Roles.Take(NbrUsedRoles - numberOfNodes).ToArray(); + var removeRoles = Roles.Skip(currentRoles.Length).Take(numberOfNodes).ToArray(); + var title = $"partition {numberOfNodes} in {NbrUsedRoles} nodes cluster"; + CreateResultAggregator(title, expectedResults: currentRoles.Length, includeInHistory: true); + + RunOn(() => + { + foreach (var x in currentRoles) + { + foreach (var y in removeRoles) + { + TestConductor.Blackhole(x, y, ThrottleTransportAdapter.Direction.Both); + } + } + }, Roles.First()); + EnterBarrier("partition-several-blackhole"); + + RunOn(() => + { + ReportResult(() => + { + var startTime = MonotonicClock.GetTicks(); + AwaitMembersUp(currentRoles.Length, timeout:RemainingOrDefault); + Sys.Log.Info("Removed [{0}] members after [{0} ms]", + removeRoles.Length, TimeSpan.FromTicks(MonotonicClock.GetTicks() - startTime).TotalMilliseconds); + AwaitAllReachable(); + return true; + }); + }, currentRoles); + + RunOn(() => + { + Sys.ActorOf(Props.Create()); + AwaitAssert(() => + { + Cluster.IsTerminated.Should().BeTrue(); + }); + }, removeRoles); + AwaitClusterResult(); + EnterBarrier("partition-several-" + Step); + }); + } + + public T ReportResult(Func thunk) + { + var startTime = MonotonicClock.GetTicks(); + var startStats = ClusterView.LatestStats.GossipStats; + + var returnValue = thunk(); + + ClusterResultAggregator().OnSuccess(r => + { + r.Tell(new ClusterResult(Cluster.SelfAddress, TimeSpan.FromTicks(MonotonicClock.GetTicks() - startTime), LatestGossipStats - startStats)); + }); + + return returnValue; + } + + public void ExerciseJoinRemove(string title, TimeSpan duration) + { + var activeRoles = Roles.Take(Settings.NumberOfNodesJoinRemove).ToArray(); + var loopDuration = TimeSpan.FromSeconds(10) + + ConvergenceWithin(TimeSpan.FromSeconds(4), NbrUsedRoles + activeRoles.Length); + var rounds = (int)Math.Max(1.0d, (duration - loopDuration).TotalMilliseconds / loopDuration.TotalMilliseconds); + var usedRoles = Roles.Take(NbrUsedRoles).ToArray(); + var usedAddresses = usedRoles.Select(x => GetAddress(x)).ToImmutableHashSet(); + + Option Loop(int counter, Option previousAs, + ImmutableHashSet
allPreviousAddresses) + { + if (counter > rounds) return previousAs; + + var t = title + " round " + counter; + RunOn(() => + { + PhiObserver.Value.Tell(Reset.Instance); + StatsObserver.Value.Tell(Reset.Instance); + }, usedRoles); + CreateResultAggregator(t, expectedResults:NbrUsedRoles, includeInHistory:true); + + var nextAs = Option.None; + var nextAddresses = ImmutableHashSet
.Empty; + Within(loopDuration, () => + { + var (nextAsy, nextAddr) = ReportResult(() => + { + Option nextAs; + + if (activeRoles.Contains(Myself)) + { + previousAs.OnSuccess(s => + { + Shutdown(s); + }); + + var sys = ActorSystem.Create(Sys.Name, Sys.Settings.Config); + MuteLog(sys); + Akka.Cluster.Cluster.Get(sys).JoinSeedNodes(SeedNodes.Select(x => GetAddress(x))); + nextAs = new Option(sys); + } + else + { + nextAs = previousAs; + } + + RunOn(() => + { + AwaitMembersUp(NbrUsedRoles + activeRoles.Length, + canNotBePartOfMemberRing: allPreviousAddresses, + timeout: RemainingOrDefault); + AwaitAllReachable(); + }, usedRoles); + + nextAddresses = ClusterView.Members.Select(x => x.Address).ToImmutableHashSet() + .Except(usedAddresses); + + RunOn(() => + { + nextAddresses.Count.Should().Be(Settings.NumberOfNodesJoinRemove); + }, usedRoles); + + return (nextAs, nextAddresses); + }); + + nextAs = nextAsy; + nextAddresses = nextAddr; + }); + + AwaitClusterResult(); + Step += 1; + return Loop(counter + 1, nextAs, nextAddresses); + } + + Loop(1, Option.None, ImmutableHashSet
.Empty).OnSuccess(aSys => + { + Shutdown(aSys); + }); + + Within(loopDuration, () => + { + RunOn(() => + { + AwaitMembersUp(NbrUsedRoles, timeout: RemainingOrDefault); + AwaitAllReachable(); + PhiObserver.Value.Tell(Reset.Instance); + StatsObserver.Value.Tell(Reset.Instance); + }, usedRoles); + }); + EnterBarrier("join-remove-shutdown-" + Step); + } + + public void IdleGossip(string title) + { + CreateResultAggregator(title, expectedResults: NbrUsedRoles, includeInHistory: true); + ReportResult(() => + { + ClusterView.Members.Count.Should().Be(NbrUsedRoles); + Thread.Sleep(Settings.IdleGossipDuration); + ClusterView.Members.Count.Should().Be(NbrUsedRoles); + return true; + }); + AwaitClusterResult(); + } + + public void IncrementStep() + { + Step += 1; + } + + [MultiNodeFact] + public void Cluster_under_stress() + { + MustLogSettings(); + IncrementStep(); + MustJoinSeedNodes(); + IncrementStep(); + } + + public void MustLogSettings() + { + if (Settings.Infolog) + { + Log.Info("StressSpec CLR:" + Environment.NewLine + ClrInfo()); + RunOn(() => + { + Log.Info("StressSpec settings:" + Environment.NewLine + Settings); + }); + } + EnterBarrier("after-" + Step); + } + + public void MustJoinSeedNodes() + { + Within(TimeSpan.FromSeconds(30), () => + { + var otherNodesJoiningSeedNodes = Roles.Skip(Settings.NumberOfSeedNodes) + .Take(Settings.NumberOfNodesJoiningToSeedNodesInitially).ToArray(); + var size = SeedNodes.Count + otherNodesJoiningSeedNodes.Length; + + CreateResultAggregator("join seed nodes", expectedResults: size, includeInHistory: true); + + RunOn(() => + { + ReportResult(() => + { + Cluster.JoinSeedNodes(SeedNodes.Select(x => GetAddress(x))); + AwaitMembersUp(size, timeout: RemainingOrDefault); + return true; + }); + }, SeedNodes.AddRange(otherNodesJoiningSeedNodes).ToArray()); + + AwaitClusterResult(); + NbrUsedRoles += size; + EnterBarrier("after-" + Step); + }); + } + } +} From 71be0a2b470671097dd814ab6f4160e837b9c503 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 5 Apr 2021 21:19:18 -0500 Subject: [PATCH 2/7] made MuteLog overrideable in Akka.Cluster.Testkit --- src/core/Akka.Cluster.TestKit/MultiNodeClusterSpec.cs | 2 +- src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Cluster.TestKit/MultiNodeClusterSpec.cs b/src/core/Akka.Cluster.TestKit/MultiNodeClusterSpec.cs index e9a6f8e931c..ec7fd773968 100644 --- a/src/core/Akka.Cluster.TestKit/MultiNodeClusterSpec.cs +++ b/src/core/Akka.Cluster.TestKit/MultiNodeClusterSpec.cs @@ -175,7 +175,7 @@ protected override void AfterTermination() //TODO: ExpectedTestDuration? - void MuteLog(ActorSystem sys = null) + public virtual void MuteLog(ActorSystem sys = null) { if (sys == null) sys = Sys; if (!sys.Log.IsDebugEnabled) diff --git a/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs index 0b271778319..d1307e7dbf1 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs @@ -684,8 +684,10 @@ public class StressSpec : MultiNodeClusterSpec public int Step = 0; public int NbrUsedRoles = 0; - protected virtual void MuteLog(ActorSystem sys) + public override void MuteLog(ActorSystem sys = null) { + sys ??= Sys; + base.MuteLog(sys); Sys.EventStream.Publish(new Mute(new ErrorFilter(typeof(ApplicationException), new ContainsString("Simulated exception")))); MuteDeadLetters(sys, typeof(AggregatedClusterResult), typeof(StatsResult), typeof(PhiResult), typeof(RetryTick)); } From 576da6601c3b0271b89bf78d3d264a6857175853 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 6 Apr 2021 10:58:38 -0500 Subject: [PATCH 3/7] if Roles is empty, then don't run the thunk on any nodes Changed this to make it consistent with the JVM --- .../StressSpec.cs | 89 ++++++++++++++++--- src/core/Akka.Remote.TestKit/MultiNodeSpec.cs | 2 - 2 files changed, 79 insertions(+), 12 deletions(-) diff --git a/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs index d1307e7dbf1..1edd32e340a 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs @@ -40,7 +40,7 @@ public StressSpecConfig() CommonConfig = ConfigurationFactory.ParseString(@" akka.test.cluster-stress-spec { - infolog = off + infolog = on # scale the nr-of-nodes* settings with this factor nr-of-nodes-factor = 1 # not scaled @@ -283,14 +283,14 @@ public string FormatStats() string F(ClusterEvent.CurrentInternalStats stats) { return - $"CurrentClusterStats({stats.GossipStats.ReceivedGossipCount}, {stats.GossipStats.MergeCount}, " + - $"{stats.GossipStats.SameCount}, {stats.GossipStats.NewerCount}, {stats.GossipStats.OlderCount}," + - $"{stats.SeenBy.VersionSize}, {stats.SeenBy.SeenLatest})"; + $"CurrentClusterStats({stats.GossipStats?.ReceivedGossipCount}, {stats.GossipStats?.MergeCount}, " + + $"{stats.GossipStats?.SameCount}, {stats.GossipStats?.NewerCount}, {stats.GossipStats?.OlderCount}," + + $"{stats.SeenBy?.VersionSize}, {stats.SeenBy?.SeenLatest})"; } return string.Join(Environment.NewLine, "ClusterStats(gossip, merge, same, newer, older, vclockSize, seenLatest)" + Environment.NewLine + - _clusterStatsObservedByNode.Select(x => $"{x.Key}\t{F(x.Value)}")); + string.Join(Environment.NewLine, _clusterStatsObservedByNode.Select(x => $"{x.Key}\t{F(x.Value)}"))); } public ClusterResultAggregator(string title, int expectedResults, StressSpecConfig.Settings settings) @@ -546,7 +546,7 @@ GossipStats MatchStats() return gossipStats; } - return _startStats.Value - gossipStats; + return gossipStats -_startStats.Value; } var diff = MatchStats(); @@ -727,7 +727,7 @@ public string ClrInfo() sb.Append("Operating System: ") .Append(Environment.OSVersion.Platform) .Append(", ") - .AppendLine(RuntimeInformation.ProcessArchitecture.ToString()) + .Append(RuntimeInformation.ProcessArchitecture.ToString()) .Append(", ") .Append(Environment.OSVersion.VersionString) .AppendLine(); @@ -745,9 +745,6 @@ public string ClrInfo() .AppendLine(); sb.Append("Memory: ") - .Append(Process.GetCurrentProcess().VirtualMemorySize64 / 1024 / 1024) - .Append("MB [allocated virtual memory]") - .AppendLine() .Append(" (") .Append(Process.GetCurrentProcess().WorkingSet64 / 1024 / 1024) .Append(" - ") @@ -1034,6 +1031,8 @@ public void PartitionSeveral(int numberOfNodes) var currentRoles = Roles.Take(NbrUsedRoles - numberOfNodes).ToArray(); var removeRoles = Roles.Skip(currentRoles.Length).Take(numberOfNodes).ToArray(); var title = $"partition {numberOfNodes} in {NbrUsedRoles} nodes cluster"; + Console.WriteLine(title); + Console.WriteLine("[{0}] are blackholing [{1}]", string.Join(",", currentRoles.Select(x => x.ToString())), string.Join(",", removeRoles.Select(x => x.ToString()))); CreateResultAggregator(title, expectedResults: currentRoles.Length, includeInHistory: true); RunOn(() => @@ -1207,6 +1206,22 @@ public void Cluster_under_stress() IncrementStep(); MustJoinSeedNodes(); IncrementStep(); + MustJoinSeedNodesOneByOneToSmallCluster(); + IncrementStep(); + MustJoinSeveralNodesToOneNode(); + IncrementStep(); + MustJoinSeveralNodesToSeedNodes(); + IncrementStep(); + MustJoinNodesOneByOneToLargeCluster(); + IncrementStep(); + MustExerciseJoinRemoveJoinRemove(); + IncrementStep(); + MustGossipWhenIdle(); + IncrementStep(); + MustDownPartitionedNodes(); + IncrementStep(); + MustLeaveNodesOneByOneFromLargeCluster(); + IncrementStep(); } public void MustLogSettings() @@ -1247,5 +1262,59 @@ public void MustJoinSeedNodes() EnterBarrier("after-" + Step); }); } + + public void MustJoinSeedNodesOneByOneToSmallCluster() + { + JoinOneByOne(Settings.NumberOfNodesJoiningOneByOneSmall); + EnterBarrier("after-" + Step); + } + + public void MustJoinSeveralNodesToOneNode() + { + JoinSeveral(Settings.NumberOfNodesJoiningToOneNode, false); + NbrUsedRoles += Settings.NumberOfNodesJoiningToOneNode; + EnterBarrier("after-" + Step); + } + + public void MustJoinSeveralNodesToSeedNodes() + { + if (Settings.NumberOfNodesJoiningToSeedNodes > 0) + { + JoinSeveral(Settings.NumberOfNodesJoiningToSeedNodes, true); + NbrUsedRoles += Settings.NumberOfNodesJoiningToSeedNodes; + } + EnterBarrier("after-" + Step); + } + + public void MustJoinNodesOneByOneToLargeCluster() + { + JoinOneByOne(Settings.NumberOfNodesJoiningOneByOneLarge); + EnterBarrier("after-" + Step); + } + + public void MustExerciseJoinRemoveJoinRemove() + { + ExerciseJoinRemove("exercise join/remove", Settings.JoinRemoveDuration); + EnterBarrier("after-" + Step); + } + + public void MustGossipWhenIdle() + { + IdleGossip("idle gossip"); + EnterBarrier("after-" + Step); + } + + public void MustDownPartitionedNodes() + { + PartitionSeveral(Settings.NumberOfNodesPartition); + NbrUsedRoles += Settings.NumberOfNodesPartition; + EnterBarrier("after-" + Step); + } + + public void MustLeaveNodesOneByOneFromLargeCluster() + { + RemoveOneByOne(Settings.NumberOfNodesLeavingOneByOneLarge, shutdown:false); + EnterBarrier("after-" + Step); + } } } diff --git a/src/core/Akka.Remote.TestKit/MultiNodeSpec.cs b/src/core/Akka.Remote.TestKit/MultiNodeSpec.cs index 7314b6bd259..d6ef4b7cca5 100644 --- a/src/core/Akka.Remote.TestKit/MultiNodeSpec.cs +++ b/src/core/Akka.Remote.TestKit/MultiNodeSpec.cs @@ -508,7 +508,6 @@ public int InitialParticipants /// public void RunOn(Action thunk, params RoleName[] nodes) { - if (nodes.Length == 0) throw new ArgumentException("No node given to run on."); if (IsNode(nodes)) thunk(); } @@ -518,7 +517,6 @@ public void RunOn(Action thunk, params RoleName[] nodes) /// public async Task RunOnAsync(Func thunkAsync, params RoleName[] nodes) { - if (nodes.Length == 0) throw new ArgumentException("No node given to run on."); if (IsNode(nodes)) await thunkAsync(); } From b24e2fe441f20e02079ab67ca31c341c2d1a73f5 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 6 Apr 2021 16:49:03 -0500 Subject: [PATCH 4/7] made it possible to actually enable Cluster.AssertInvariants via environment variable --- src/core/Akka.Cluster/Cluster.cs | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Cluster/Cluster.cs b/src/core/Akka.Cluster/Cluster.cs index 5bef4d02955..16b5ffcb921 100644 --- a/src/core/Akka.Cluster/Cluster.cs +++ b/src/core/Akka.Cluster/Cluster.cs @@ -62,13 +62,29 @@ public static Cluster Get(ActorSystem system) return system.WithExtension(); } + static Cluster() + { + bool GetAssertInvariants() + { + var isOn = Environment.GetEnvironmentVariable("akka.cluster.assert")?.ToLowerInvariant(); + switch (isOn) + { + case "on": + return true; + default: + return false; + } + } + + IsAssertInvariantsEnabled = GetAssertInvariants(); + } + /// /// TBD /// internal static bool IsAssertInvariantsEnabled { - //TODO: Consequences of this? - get { return false; } + get; } /// From c71441eb056f4caa94038f5f926c76316890c6be Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 6 Apr 2021 16:51:56 -0500 Subject: [PATCH 5/7] added assert invariants to build script cleaned up gossip class to assert more invariants --- build.fsx | 2 + .../StressSpec.cs | 96 ++++++++++++++++--- src/core/Akka.Cluster/Gossip.cs | 38 +++++--- 3 files changed, 107 insertions(+), 29 deletions(-) diff --git a/build.fsx b/build.fsx index aaaa423fbc4..e93cfd36e33 100644 --- a/build.fsx +++ b/build.fsx @@ -348,6 +348,7 @@ Target "MultiNodeTests" (fun _ -> Target "MultiNodeTestsNetCore" (fun _ -> if not skipBuild.Value then + setEnvironVar "akka.cluster.assert" "on" // needed to enable assert invariants for Akka.Cluster let multiNodeTestPath = findToolInSubPath "Akka.MultiNodeTestRunner.dll" (currentDirectory @@ "src" @@ "core" @@ "Akka.MultiNodeTestRunner" @@ "bin" @@ "Release" @@ testNetCoreVersion @@ "win10-x64" @@ "publish") let projects = @@ -388,6 +389,7 @@ Target "MultiNodeTestsNetCore" (fun _ -> ) Target "MultiNodeTestsNet" (fun _ -> if not skipBuild.Value then + setEnvironVar "akka.cluster.assert" "on" // needed to enable assert invariants for Akka.Cluster let multiNodeTestPath = findToolInSubPath "Akka.MultiNodeTestRunner.dll" (currentDirectory @@ "src" @@ "core" @@ "Akka.MultiNodeTestRunner" @@ "bin" @@ "Release" @@ testNetVersion @@ "win10-x64" @@ "publish") let projects = diff --git a/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs index 1edd32e340a..cf8825415c1 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs @@ -1,4 +1,11 @@ -using System; +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; using System.Collections.Generic; using System.Collections.Immutable; using System.Diagnostics; @@ -68,8 +75,8 @@ public StressSpecConfig() } akka.actor.provider = cluster akka.cluster { - failure-detector.acceptable-heartbeat-pause = 3s - downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider, Akka.Cluster"" + failure-detector.acceptable-heartbeat-pause = 3s + downing-provider-class = ""Akka.Cluster.SplitBrainResolver, Akka.Cluster"" split-brain-resolver { active-strategy = keep-majority #TODO: remove this once it's been made default stable-after = 10s @@ -266,8 +273,7 @@ public string FormatPhi() if (_phiValuesObservedByNode.IsEmpty) return string.Empty; else { - //var lines = _phiValuesObservedByNode.Select( - // x => x.Value.SelectMany(y => FormatPhiLine(x.Key, y.Address, y))); + var lines = (from mon in _phiValuesObservedByNode from phi in mon.Value select FormatPhiLine(mon.Key, phi.Address, phi)); return FormatPhiHeader + Environment.NewLine + string.Join(Environment.NewLine, lines); } @@ -917,26 +923,36 @@ string FormatNodeLeave() var currentRoles = Roles.Take(NbrUsedRoles - 1).ToArray(); var title = $"{FormatNodeLeave()} one from {NbrUsedRoles} nodes cluster"; CreateResultAggregator(title, expectedResults:currentRoles.Length, true); + var removeRole = Roles[NbrUsedRoles - 1]; var removeAddress = GetAddress(removeRole); - + Console.WriteLine($"Preparing to {FormatNodeLeave()}[{removeAddress}] role [{removeRole.Name}] out of [{Roles.Count}]"); RunOn(() => { var watchee = Sys.ActorOf(Props.Create(() => new Watchee()), "watchee"); - if(!shutdown) - Cluster.Leave(GetAddress(Myself)); + Console.WriteLine("Created watchee [{0}]", watchee); }, removeRole); EnterBarrier("watchee-created-" + Step); RunOn(() => { - Sys.ActorSelection(Node(removeRole) / "user" / "watchee").Tell(new Identify("watchee"), IdentifyProbe.Ref); - var watchee = IdentifyProbe.ExpectMsg().Subject; - Watch(watchee); + AwaitAssert(() => + { + Sys.ActorSelection(new RootActorPath(removeAddress) / "user" / "watchee").Tell(new Identify("watchee"), IdentifyProbe.Ref); + var watchee = IdentifyProbe.ExpectMsg(TimeSpan.FromSeconds(1)).Subject; + Watch(watchee); + }, interval:TimeSpan.FromSeconds(1.25d)); + }, Roles.First()); EnterBarrier("watchee-established-" + Step); + RunOn(() => + { + if (!shutdown) + Cluster.Leave(GetAddress(Myself)); + }, removeRole); + RunOn(() => { ReportResult(() => @@ -950,7 +966,7 @@ string FormatNodeLeave() Log.Info("Shutting down [{0}]", removeAddress); } - TestConductor.Exit(removeRole, 0).Wait(RemainingOrDefault); + TestConductor.Exit(removeRole, 0).Wait(); } }, Roles.First()); @@ -1041,7 +1057,7 @@ public void PartitionSeveral(int numberOfNodes) { foreach (var y in removeRoles) { - TestConductor.Blackhole(x, y, ThrottleTransportAdapter.Direction.Both); + TestConductor.Blackhole(x, y, ThrottleTransportAdapter.Direction.Both).Wait(); } } }, Roles.First()); @@ -1222,6 +1238,17 @@ public void Cluster_under_stress() IncrementStep(); MustLeaveNodesOneByOneFromLargeCluster(); IncrementStep(); + MustShutdownNodesOneByOneFromLargeCluster(); + IncrementStep(); + MustLeaveSeveralNodes(); + IncrementStep(); + MustShutdownSeveralNodes(); + IncrementStep(); + MustShutdownNodesOneByOneFromSmallCluster(); + IncrementStep(); + MustLeaveNodesOneByOneFromSmallCluster(); + IncrementStep(); + MustLogClrInfo(); } public void MustLogSettings() @@ -1307,7 +1334,7 @@ public void MustGossipWhenIdle() public void MustDownPartitionedNodes() { PartitionSeveral(Settings.NumberOfNodesPartition); - NbrUsedRoles += Settings.NumberOfNodesPartition; + NbrUsedRoles -= Settings.NumberOfNodesPartition; EnterBarrier("after-" + Step); } @@ -1316,5 +1343,46 @@ public void MustLeaveNodesOneByOneFromLargeCluster() RemoveOneByOne(Settings.NumberOfNodesLeavingOneByOneLarge, shutdown:false); EnterBarrier("after-" + Step); } + + public void MustShutdownNodesOneByOneFromLargeCluster() + { + RemoveOneByOne(Settings.NumberOfNodesShutdownOneByOneLarge, shutdown: true); + EnterBarrier("after-" + Step); + } + + public void MustLeaveSeveralNodes() + { + RemoveSeveral(Settings.NumberOfNodesLeaving, shutdown: false); + NbrUsedRoles -= Settings.NumberOfNodesLeaving; + EnterBarrier("after-" + Step); + } + + public void MustShutdownSeveralNodes() + { + RemoveSeveral(Settings.NumberOfNodesShutdown, shutdown: true); + NbrUsedRoles -= Settings.NumberOfNodesShutdown; + EnterBarrier("after-" + Step); + } + + public void MustShutdownNodesOneByOneFromSmallCluster() + { + RemoveOneByOne(Settings.NumberOfNodesShutdownOneByOneSmall, true); + EnterBarrier("after-" + Step); + } + + public void MustLeaveNodesOneByOneFromSmallCluster() + { + RemoveOneByOne(Settings.NumberOfNodesLeavingOneByOneSmall, false); + EnterBarrier("after-" + Step); + } + + public void MustLogClrInfo() + { + if (Settings.Infolog) + { + Log.Info("StressSpec CLR: " + Environment.NewLine + "{0}", ClrInfo()); + } + EnterBarrier("after-" + Step); + } } } diff --git a/src/core/Akka.Cluster/Gossip.cs b/src/core/Akka.Cluster/Gossip.cs index fb5bd82be01..bf875df4157 100644 --- a/src/core/Akka.Cluster/Gossip.cs +++ b/src/core/Akka.Cluster/Gossip.cs @@ -156,25 +156,33 @@ public Gossip Copy(ImmutableSortedSet members = null, GossipOverview ove private void AssertInvariants() { - if (_members.Any(m => m.Status == MemberStatus.Removed)) + void IfTrueThrow(bool func, string expected, string actual) { - var members = string.Join(", ", _members.Where(m => m.Status == MemberStatus.Removed).Select(m => m.ToString())); - throw new ArgumentException($"Live members must not have status [Removed], got {members}", nameof(_members)); + if (func) throw new ArgumentException($"{expected}, but found [{actual}]"); } + + IfTrueThrow(_members.Any(m => m.Status == MemberStatus.Removed), + expected: "Live members must not have status [Removed]", + actual: string.Join(", ", + _members.Where(m => m.Status == MemberStatus.Removed).Select(m => m.ToString()))); + + var inReachabilityButNotMember = _overview.Reachability.AllObservers.Except(_members.Select(m => m.UniqueAddress)); - if (!inReachabilityButNotMember.IsEmpty) - { - var inreachability = string.Join(", ", inReachabilityButNotMember.Select(a => a.ToString())); - throw new ArgumentException($"Nodes not part of cluster in reachability table, got {inreachability}", nameof(_overview)); - } + IfTrueThrow(!inReachabilityButNotMember.IsEmpty, + expected: "Nodes not part of cluster in reachability table", + actual: string.Join(", ", inReachabilityButNotMember.Select(a => a.ToString()))); + + var inReachabilityVersionsButNotMember = + _overview.Reachability.Versions.Keys.Except(Members.Select(x => x.UniqueAddress)).ToImmutableHashSet(); + IfTrueThrow(!inReachabilityVersionsButNotMember.IsEmpty, + expected: "Nodes not part of cluster in reachability versions table", + actual: string.Join(", ", inReachabilityVersionsButNotMember.Select(a => a.ToString()))); var seenButNotMember = _overview.Seen.Except(_members.Select(m => m.UniqueAddress)); - if (!seenButNotMember.IsEmpty) - { - var seen = string.Join(", ", seenButNotMember.Select(a => a.ToString())); - throw new ArgumentException($"Nodes not part of cluster have marked the Gossip as seen, got {seen}", nameof(_overview)); - } + IfTrueThrow(!seenButNotMember.IsEmpty, + expected: "Nodes not part of cluster have marked the Gossip as seen", + actual: string.Join(", ", seenButNotMember.Select(a => a.ToString()))); } //TODO: Serializer should ignore @@ -274,7 +282,7 @@ public Gossip Merge(Gossip that) var mergedMembers = EmptyMembers.Union(Member.PickHighestPriority(this._members, that._members)); // 3. merge reachability table by picking records with highest version - var mergedReachability = this._overview.Reachability.Merge(mergedMembers.Select(m => m.UniqueAddress), + var mergedReachability = _overview.Reachability.Merge(mergedMembers.Select(m => m.UniqueAddress), that._overview.Reachability); // 4. Nobody can have seen this new gossip yet @@ -448,7 +456,7 @@ public override string ToString() /// /// Represents the overview of the cluster, holds the cluster convergence table and set with unreachable nodes. /// - class GossipOverview + internal class GossipOverview { readonly ImmutableHashSet _seen; readonly Reachability _reachability; From ddeb9ebd5ce7b7644dc2ea2a689133b2967bacbc Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 6 Apr 2021 20:26:28 -0500 Subject: [PATCH 6/7] ReSharper'd Reachability.cs --- .../StressSpec.cs | 16 +- src/core/Akka.Cluster/ClusterHeartbeat.cs | 25 +- src/core/Akka.Cluster/Member.cs | 2 +- src/core/Akka.Cluster/Reachability.cs | 529 +++++++++--------- .../DefaultFailureDetectorRegistry.cs | 1 + .../Akka.Remote/PhiAccrualFailureDetector.cs | 23 +- 6 files changed, 295 insertions(+), 301 deletions(-) diff --git a/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs index cf8825415c1..4b6172d7124 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs @@ -426,8 +426,22 @@ public PhiObserver() { var previous = PhiByNodeDefault(node); var p = Phi(node); + if (p > 0 || _cluster.FailureDetector.IsMonitoring(node)) { + if (double.IsInfinity(p)) + { + _log.Warning("Detected phi value of infinity for [{0}] - ", node); + var (history, time) = _cluster.FailureDetector.GetFailureDetector(node) switch + { + PhiAccrualFailureDetector fd => (fd.state.History, fd.state.TimeStamp), + _ => (HeartbeatHistory.Apply(1), null) + }; + _log.Warning("PhiValues: (Timestamp={0}, Mean={1}, Variance={2}, StdDeviation={3}, Intervals=[{4}])",time, + history.Mean, history.Variance, history.StdDeviation, + string.Join(",", history.Intervals)); + } + var aboveOne = !double.IsInfinity(p) && p > 1.0d ? 1 : 0; _phiByNode = _phiByNode.SetItem(node, new PhiValue(node, previous.CountAboveOne + aboveOne, @@ -483,8 +497,6 @@ protected override void PostStop() _checkPhiTask.Cancel(); base.PostStop(); } - - public ITimerScheduler Timers { get; set; } } internal readonly struct PhiValue : IComparable diff --git a/src/core/Akka.Cluster/ClusterHeartbeat.cs b/src/core/Akka.Cluster/ClusterHeartbeat.cs index 0ecf786f4a7..c0bbb60f6f7 100644 --- a/src/core/Akka.Cluster/ClusterHeartbeat.cs +++ b/src/core/Akka.Cluster/ClusterHeartbeat.cs @@ -649,11 +649,12 @@ public ImmutableHashSet Receivers(UniqueAddress sender) { if (iter.MoveNext() == false || n == 0) { + iter.Dispose(); // dispose enumerator return (n, acc); } else { - UniqueAddress next = iter.Current; + var next = iter.Current; var isUnreachable = Unreachable.Contains(next); if (isUnreachable && acc.Count >= MonitoredByNumberOfNodes) { @@ -670,13 +671,11 @@ public ImmutableHashSet Receivers(UniqueAddress sender) } }; - var tuple = take(MonitoredByNumberOfNodes, NodeRing().From(sender).Skip(1).GetEnumerator(), ImmutableSortedSet.Empty); - var remaining = tuple.Item1; - var slice1 = tuple.Item2; + var (remaining, slice1) = take(MonitoredByNumberOfNodes, NodeRing().From(sender).Skip(1).GetEnumerator(), ImmutableSortedSet.Empty); IImmutableSet slice = remaining == 0 - ? slice1 - : take(remaining, NodeRing().Until(sender).Where(c => !c.Equals(sender)).GetEnumerator(), slice1).Item2; + ? slice1 // or, wrap0around + : take(remaining, NodeRing().TakeWhile(x => x != sender).GetEnumerator(), slice1).Item2; return slice.ToImmutableHashSet(); } @@ -729,7 +728,9 @@ public HeartbeatNodeRing Copy(UniqueAddress selfAddress = null, ImmutableHashSet #region Comparer /// - /// TBD + /// Data structure for picking heartbeat receivers. The node ring is + /// shuffled by deterministic hashing to avoid picking physically co-located + /// neighbors. /// internal class RingComparer : IComparer { @@ -742,12 +743,10 @@ private RingComparer() { } /// public int Compare(UniqueAddress x, UniqueAddress y) { - var result = Member.AddressOrdering.Compare(x.Address, y.Address); - if (result == 0) - if (x.Uid < y.Uid) return -1; - else if (x.Uid == y.Uid) return 0; - else return 1; - return result; + var ha = x.GetHashCode(); + var hb = y.GetHashCode(); + var c = ha.CompareTo(hb); + return c == 0 ? Member.AddressOrdering.Compare(x.Address, y.Address) : c; } } #endregion diff --git a/src/core/Akka.Cluster/Member.cs b/src/core/Akka.Cluster/Member.cs index d5d91d04b82..1bf43ec1b0d 100644 --- a/src/core/Akka.Cluster/Member.cs +++ b/src/core/Akka.Cluster/Member.cs @@ -507,7 +507,7 @@ public bool Equals(UniqueAddress other) /// public override int GetHashCode() { - return Uid; + return MurmurHash.ByteHash(BitConverter.GetBytes(Uid)); } /// diff --git a/src/core/Akka.Cluster/Reachability.cs b/src/core/Akka.Cluster/Reachability.cs index 49912e9e208..e291fcd6bf6 100644 --- a/src/core/Akka.Cluster/Reachability.cs +++ b/src/core/Akka.Cluster/Reachability.cs @@ -15,218 +15,122 @@ namespace Akka.Cluster { /// - /// Immutable data structure that holds the reachability status of subject nodes as seen - /// from observer nodes. Failure detector for the subject nodes exist on the - /// observer nodes. Changes (reachable, unreachable, terminated) are only performed - /// by observer nodes to its own records. Each change bumps the version number of the - /// record, and thereby it is always possible to determine which record is newest - /// merging two instances. - /// - /// Aggregated status of a subject node is defined as (in this order): - /// - Terminated if any observer node considers it as Terminated - /// - Unreachable if any observer node considers it as Unreachable - /// - Reachable otherwise, i.e. no observer node considers it as Unreachable + /// Immutable data structure that holds the reachability status of subject nodes as seen + /// from observer nodes. Failure detector for the subject nodes exist on the + /// observer nodes. Changes (reachable, unreachable, terminated) are only performed + /// by observer nodes to its own records. Each change bumps the version number of the + /// record, and thereby it is always possible to determine which record is newest + /// merging two instances. + /// Aggregated status of a subject node is defined as (in this order): + /// - Terminated if any observer node considers it as Terminated + /// - Unreachable if any observer node considers it as Unreachable + /// - Reachable otherwise, i.e. no observer node considers it as Unreachable /// internal class Reachability //TODO: ISerializable? { /// - /// TBD + /// TBD /// - public static readonly Reachability Empty = + public enum ReachabilityStatus + { + /// + /// TBD + /// + Reachable, + + /// + /// TBD + /// + Unreachable, + + /// + /// TBD + /// + Terminated + } + + /// + /// TBD + /// + public static readonly Reachability Empty = new Reachability(ImmutableList.Create(), ImmutableDictionary.Create()); + //TODO: Serialization should ignore + private readonly Lazy _cache; + /// - /// TBD + /// TBD /// /// TBD /// TBD public Reachability(ImmutableList records, ImmutableDictionary versions) { _cache = new Lazy(() => new Cache(records)); - _versions = versions; - _records = records; + Versions = versions; + Records = records; } /// - /// TBD + /// TBD /// - public sealed class Record - { - readonly UniqueAddress _observer; - /// - /// TBD - /// - public UniqueAddress Observer { get { return _observer; } } - readonly UniqueAddress _subject; - /// - /// TBD - /// - public UniqueAddress Subject { get { return _subject; } } - readonly ReachabilityStatus _status; - /// - /// TBD - /// - public ReachabilityStatus Status { get { return _status; } } - readonly long _version; - /// - /// TBD - /// - public long Version { get { return _version; } } + public ImmutableList Records { get; } - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - /// TBD - public Record(UniqueAddress observer, UniqueAddress subject, ReachabilityStatus status, long version) - { - _observer = observer; - _subject = subject; - _status = status; - _version = version; - } - - /// - public override bool Equals(object obj) - { - var other = obj as Record; - if (other == null) return false; - return _version.Equals(other._version) && - _status == other.Status && - _observer.Equals(other._observer) && - _subject.Equals(other._subject); - } + /// + /// TBD + /// + public ImmutableDictionary Versions { get; } - /// - public override int GetHashCode() - { - unchecked - { - var hashCode = (Observer != null ? Observer.GetHashCode() : 0); - hashCode = (hashCode * 397) ^ Version.GetHashCode(); - hashCode = (hashCode * 397) ^ Status.GetHashCode(); - hashCode = (hashCode * 397) ^ (Subject != null ? Subject.GetHashCode() : 0); - return hashCode; - } - } - } + /* + * def isReachable(observer: UniqueAddress, subject: UniqueAddress): Boolean = + status(observer, subject) == Reachable + */ /// - /// TBD + /// TBD /// - public enum ReachabilityStatus - { - /// - /// TBD - /// - Reachable, - /// - /// TBD - /// - Unreachable, - /// - /// TBD - /// - Terminated - } + public bool IsAllReachable => Records.IsEmpty; /// - /// TBD + /// Doesn't include terminated /// - public ImmutableList Records { get { return _records; } } - readonly ImmutableList _records; + public ImmutableHashSet AllUnreachable => _cache.Value.AllUnreachable; /// - /// TBD + /// TBD /// - public ImmutableDictionary Versions { get { return _versions; } } - readonly ImmutableDictionary _versions; + public ImmutableHashSet AllUnreachableOrTerminated => _cache.Value.AllUnreachableOrTerminated; /// - /// TBD + /// TBD /// - class Cache + public ImmutableDictionary> ObserversGroupedByUnreachable { - /// - /// TBD - /// - public ImmutableDictionary> ObserverRowMap { get { return _observerRowsMap; } } - readonly ImmutableDictionary> _observerRowsMap; - - /// - /// TBD - /// - public ImmutableHashSet AllTerminated { get { return _allTerminated; } } - readonly ImmutableHashSet _allTerminated; - - /// - /// TBD - /// - public ImmutableHashSet AllUnreachable { get { return _allUnreachable; } } - readonly ImmutableHashSet _allUnreachable; - - /// - /// TBD - /// - public ImmutableHashSet AllUnreachableOrTerminated { get { return _allUnreachableOrTerminated; } } - readonly ImmutableHashSet _allUnreachableOrTerminated; - - /// - /// TBD - /// - /// TBD - public Cache(ImmutableList records) + get { - if (records.IsEmpty) - { - _observerRowsMap = ImmutableDictionary.Create>(); - _allTerminated = ImmutableHashSet.Create(); - _allUnreachable = ImmutableHashSet.Create(); - } - else - { - var mapBuilder = new Dictionary>(); - var terminatedBuilder = ImmutableHashSet.CreateBuilder(); - var unreachableBuilder = ImmutableHashSet.CreateBuilder(); - - foreach (var r in records) - { - ImmutableDictionary m = mapBuilder.TryGetValue(r.Observer, out m) - ? m.SetItem(r.Subject, r) - //TODO: Other collections take items for Create. Create unnecessary array here - : ImmutableDictionary.CreateRange(new[] { new KeyValuePair(r.Subject, r) }); - - - mapBuilder.AddOrSet(r.Observer, m); - - if (r.Status == ReachabilityStatus.Unreachable) unreachableBuilder.Add(r.Subject); - else if (r.Status == ReachabilityStatus.Terminated) terminatedBuilder.Add(r.Subject); - } - - _observerRowsMap = ImmutableDictionary.CreateRange(mapBuilder); - _allTerminated = terminatedBuilder.ToImmutable(); - _allUnreachable = unreachableBuilder.ToImmutable().Except(AllTerminated); - } + var builder = new Dictionary>(); - _allUnreachableOrTerminated = _allTerminated.IsEmpty - ? AllUnreachable - : AllUnreachable.Union(AllTerminated); + var grouped = Records.GroupBy(p => p.Subject); + foreach (var records in grouped) + if (records.Any(r => r.Status == ReachabilityStatus.Unreachable)) + builder.Add(records.Key, records.Where(r => r.Status == ReachabilityStatus.Unreachable) + .Select(r => r.Observer).ToImmutableHashSet()); + return builder.ToImmutableDictionary(); } } - //TODO: Serialization should ignore - readonly Lazy _cache; + /// + /// TBD + /// + public ImmutableHashSet AllObservers => Records.Select(i => i.Observer).ToImmutableHashSet(); - ImmutableDictionary ObserverRows(UniqueAddress observer) + private ImmutableDictionary ObserverRows(UniqueAddress observer) { _cache.Value.ObserverRowMap.TryGetValue(observer, out var observerRows); return observerRows; } /// - /// TBD + /// TBD /// /// TBD /// TBD @@ -237,7 +141,7 @@ public Reachability Unreachable(UniqueAddress observer, UniqueAddress subject) } /// - /// TBD + /// TBD /// /// TBD /// TBD @@ -248,7 +152,7 @@ public Reachability Reachable(UniqueAddress observer, UniqueAddress subject) } /// - /// TBD + /// TBD /// /// TBD /// TBD @@ -258,12 +162,12 @@ public Reachability Terminated(UniqueAddress observer, UniqueAddress subject) return Change(observer, subject, ReachabilityStatus.Terminated); } - long CurrentVersion(UniqueAddress observer) + private long CurrentVersion(UniqueAddress observer) { - return _versions.TryGetValue(observer, out long version) ? version : 0; + return Versions.TryGetValue(observer, out var version) ? version : 0; } - long NextVersion(UniqueAddress observer) + private long NextVersion(UniqueAddress observer) { return CurrentVersion(observer) + 1; } @@ -271,18 +175,18 @@ long NextVersion(UniqueAddress observer) private Reachability Change(UniqueAddress observer, UniqueAddress subject, ReachabilityStatus status) { var v = NextVersion(observer); - var newVersions = _versions.SetItem(observer, v); + var newVersions = Versions.SetItem(observer, v); var newRecord = new Record(observer, subject, status, v); var oldObserverRows = ObserverRows(observer); if (oldObserverRows == null && status == ReachabilityStatus.Reachable) return this; - if (oldObserverRows == null) return new Reachability(_records.Add(newRecord), newVersions); + if (oldObserverRows == null) return new Reachability(Records.Add(newRecord), newVersions); - if(!oldObserverRows.TryGetValue(subject, out var oldRecord)) + if (!oldObserverRows.TryGetValue(subject, out var oldRecord)) { if (status == ReachabilityStatus.Reachable && oldObserverRows.Values.All(r => r.Status == ReachabilityStatus.Reachable)) - return new Reachability(_records.FindAll(r => !r.Observer.Equals(observer)), newVersions); - return new Reachability(_records.Add(newRecord), newVersions); + return new Reachability(Records.FindAll(r => !r.Observer.Equals(observer)), newVersions); + return new Reachability(Records.Add(newRecord), newVersions); } if (oldRecord.Status == ReachabilityStatus.Terminated || oldRecord.Status == status) @@ -290,14 +194,14 @@ private Reachability Change(UniqueAddress observer, UniqueAddress subject, Reach if (status == ReachabilityStatus.Reachable && oldObserverRows.Values.All(r => r.Status == ReachabilityStatus.Reachable || r.Subject.Equals(subject))) - return new Reachability(_records.FindAll(r => !r.Observer.Equals(observer)), newVersions); + return new Reachability(Records.FindAll(r => !r.Observer.Equals(observer)), newVersions); - var newRecords = _records.SetItem(_records.IndexOf(oldRecord), newRecord); + var newRecords = Records.SetItem(Records.IndexOf(oldRecord), newRecord); return new Reachability(newRecords, newVersions); } /// - /// TBD + /// TBD /// /// TBD /// TBD @@ -306,7 +210,7 @@ public Reachability Merge(IEnumerable allowed, Reachability other { var recordBuilder = ImmutableList.CreateBuilder(); //TODO: Size hint somehow? - var newVersions = _versions; + var newVersions = Versions; foreach (var observer in allowed) { var observerVersion1 = CurrentVersion(observer); @@ -318,21 +222,18 @@ public Reachability Merge(IEnumerable allowed, Reachability other if (rows1 != null && rows2 != null) { var rows = observerVersion1 > observerVersion2 ? rows1 : rows2; - foreach(var record in rows.Values.Where(r => allowed.Contains(r.Subject))) + foreach (var record in rows.Values.Where(r => allowed.Contains(r.Subject))) recordBuilder.Add(record); } + if (rows1 != null && rows2 == null) - { - if(observerVersion1 > observerVersion2) + if (observerVersion1 > observerVersion2) foreach (var record in rows1.Values.Where(r => allowed.Contains(r.Subject))) recordBuilder.Add(record); - } if (rows1 == null && rows2 != null) - { if (observerVersion2 > observerVersion1) foreach (var record in rows2.Values.Where(r => allowed.Contains(r.Subject))) - recordBuilder.Add(record); - } + recordBuilder.Add(record); if (observerVersion2 > observerVersion1) newVersions = newVersions.SetItem(observer, observerVersion2); @@ -344,20 +245,20 @@ public Reachability Merge(IEnumerable allowed, Reachability other } /// - /// TBD + /// TBD /// /// TBD /// TBD public Reachability Remove(IEnumerable nodes) { var nodesSet = nodes.ToImmutableHashSet(); - var newRecords = _records.FindAll(r => !nodesSet.Contains(r.Observer) && !nodesSet.Contains(r.Subject)); - var newVersions = _versions.RemoveRange(nodes); + var newRecords = Records.FindAll(r => !nodesSet.Contains(r.Observer) && !nodesSet.Contains(r.Subject)); + var newVersions = Versions.RemoveRange(nodes); return new Reachability(newRecords, newVersions); } /// - /// TBD + /// TBD /// /// TBD /// TBD @@ -367,12 +268,10 @@ public Reachability RemoveObservers(ImmutableHashSet nodes) { return this; } - else - { - var newRecords = _records.FindAll(r => !nodes.Contains(r.Observer)); - var newVersions = _versions.RemoveRange(nodes); - return new Reachability(newRecords, newVersions); - } + + var newRecords = Records.FindAll(r => !nodes.Contains(r.Observer)); + var newVersions = Versions.RemoveRange(nodes); + return new Reachability(newRecords, newVersions); } public Reachability FilterRecords(Predicate f) @@ -381,7 +280,7 @@ public Reachability FilterRecords(Predicate f) } /// - /// TBD + /// TBD /// /// TBD /// TBD @@ -391,14 +290,14 @@ public ReachabilityStatus Status(UniqueAddress observer, UniqueAddress subject) var observerRows = ObserverRows(observer); if (observerRows == null) return ReachabilityStatus.Reachable; - if(!observerRows.TryGetValue(subject, out var record)) + if (!observerRows.TryGetValue(subject, out var record)) return ReachabilityStatus.Reachable; return record.Status; } /// - /// TBD + /// TBD /// /// TBD /// TBD @@ -410,7 +309,7 @@ public ReachabilityStatus Status(UniqueAddress node) } /// - /// TBD + /// TBD /// /// TBD /// TBD @@ -420,7 +319,7 @@ public bool IsReachable(UniqueAddress node) } /// - /// TBD + /// TBD /// /// TBD /// TBD @@ -430,37 +329,8 @@ public bool IsReachable(UniqueAddress observer, UniqueAddress subject) return Status(observer, subject) == ReachabilityStatus.Reachable; } - /* - * def isReachable(observer: UniqueAddress, subject: UniqueAddress): Boolean = - status(observer, subject) == Reachable - */ - /// - /// TBD - /// - public bool IsAllReachable - { - get { return _records.IsEmpty; } - } - - /// - /// Doesn't include terminated - /// - public ImmutableHashSet AllUnreachable - { - get { return _cache.Value.AllUnreachable; } - } - - /// - /// TBD - /// - public ImmutableHashSet AllUnreachableOrTerminated - { - get { return _cache.Value.AllUnreachableOrTerminated; } - } - - /// - /// TBD + /// TBD /// /// TBD /// TBD @@ -474,40 +344,7 @@ public ImmutableHashSet AllUnreachableFrom(UniqueAddress observer } /// - /// TBD - /// - public ImmutableDictionary> ObserversGroupedByUnreachable - { - get - { - var builder = new Dictionary>(); - - var grouped = _records.GroupBy(p => p.Subject); - foreach (var records in grouped) - { - if (records.Any(r => r.Status == ReachabilityStatus.Unreachable)) - { - builder.Add(records.Key, records.Where(r => r.Status == ReachabilityStatus.Unreachable) - .Select(r => r.Observer).ToImmutableHashSet()); - } - } - return builder.ToImmutableDictionary(); - } - } - - /// - /// TBD - /// - public ImmutableHashSet AllObservers - { - get - { - return _records.Select(i => i.Observer).ToImmutableHashSet(); - } - } - - /// - /// TBD + /// TBD /// /// TBD /// TBD @@ -518,28 +355,28 @@ public ImmutableList RecordsFrom(UniqueAddress observer) return rows.Values.ToImmutableList(); } - /// + /// public override int GetHashCode() { - return _versions.GetHashCode(); + return Versions.GetHashCode(); } - /// + /// public override bool Equals(object obj) { var other = obj as Reachability; if (other == null) return false; - return _records.Count == other._records.Count && - _versions.Equals(other._versions) && - _cache.Value.ObserverRowMap.Equals(other._cache.Value.ObserverRowMap); + return Records.Count == other.Records.Count && + Versions.Equals(other.Versions) && + _cache.Value.ObserverRowMap.Equals(other._cache.Value.ObserverRowMap); } - /// + /// public override string ToString() { var builder = new StringBuilder("Reachability("); - foreach (var observer in _versions.Keys) + foreach (var observer in Versions.Keys) { var rows = ObserverRows(observer); if (rows == null) continue; @@ -551,5 +388,145 @@ public override string ToString() return builder.Append(')').ToString(); } + + /// + /// TBD + /// + public sealed class Record + { + /// + /// TBD + /// + /// TBD + /// TBD + /// TBD + /// TBD + public Record(UniqueAddress observer, UniqueAddress subject, ReachabilityStatus status, long version) + { + Observer = observer; + Subject = subject; + Status = status; + Version = version; + } + + /// + /// TBD + /// + public UniqueAddress Observer { get; } + + /// + /// TBD + /// + public UniqueAddress Subject { get; } + + /// + /// TBD + /// + public ReachabilityStatus Status { get; } + + /// + /// TBD + /// + public long Version { get; } + + /// + public override bool Equals(object obj) + { + var other = obj as Record; + if (other == null) return false; + return Version.Equals(other.Version) && + Status == other.Status && + Observer.Equals(other.Observer) && + Subject.Equals(other.Subject); + } + + /// + public override int GetHashCode() + { + unchecked + { + var hashCode = Observer != null ? Observer.GetHashCode() : 0; + hashCode = (hashCode * 397) ^ Version.GetHashCode(); + hashCode = (hashCode * 397) ^ Status.GetHashCode(); + hashCode = (hashCode * 397) ^ (Subject != null ? Subject.GetHashCode() : 0); + return hashCode; + } + } + } + + /// + /// TBD + /// + private class Cache + { + /// + /// TBD + /// + /// TBD + public Cache(ImmutableList records) + { + if (records.IsEmpty) + { + ObserverRowMap = ImmutableDictionary + .Create>(); + AllTerminated = ImmutableHashSet.Create(); + AllUnreachable = ImmutableHashSet.Create(); + } + else + { + var mapBuilder = new Dictionary>(); + var terminatedBuilder = ImmutableHashSet.CreateBuilder(); + var unreachableBuilder = ImmutableHashSet.CreateBuilder(); + + foreach (var r in records) + { + ImmutableDictionary m = mapBuilder.TryGetValue(r.Observer, out m) + ? m.SetItem(r.Subject, r) + //TODO: Other collections take items for Create. Create unnecessary array here + : ImmutableDictionary.CreateRange(new[] + { + new KeyValuePair(r.Subject, r) + }); + + + mapBuilder.AddOrSet(r.Observer, m); + + if (r.Status == ReachabilityStatus.Unreachable) unreachableBuilder.Add(r.Subject); + else if (r.Status == ReachabilityStatus.Terminated) terminatedBuilder.Add(r.Subject); + } + + ObserverRowMap = ImmutableDictionary.CreateRange(mapBuilder); + AllTerminated = terminatedBuilder.ToImmutable(); + AllUnreachable = unreachableBuilder.ToImmutable().Except(AllTerminated); + } + + AllUnreachableOrTerminated = AllTerminated.IsEmpty + ? AllUnreachable + : AllUnreachable.Union(AllTerminated); + } + + /// + /// TBD + /// + public ImmutableDictionary> ObserverRowMap + { + get; + } + + /// + /// TBD + /// + public ImmutableHashSet AllTerminated { get; } + + /// + /// TBD + /// + public ImmutableHashSet AllUnreachable { get; } + + /// + /// TBD + /// + public ImmutableHashSet AllUnreachableOrTerminated { get; } + } } -} +} \ No newline at end of file diff --git a/src/core/Akka.Remote/DefaultFailureDetectorRegistry.cs b/src/core/Akka.Remote/DefaultFailureDetectorRegistry.cs index c2f85dbca6f..bf617a09e90 100644 --- a/src/core/Akka.Remote/DefaultFailureDetectorRegistry.cs +++ b/src/core/Akka.Remote/DefaultFailureDetectorRegistry.cs @@ -122,6 +122,7 @@ public void Remove(T resource) } break; } + Console.WriteLine("Removed heartbeats to {0}", resource); } /// diff --git a/src/core/Akka.Remote/PhiAccrualFailureDetector.cs b/src/core/Akka.Remote/PhiAccrualFailureDetector.cs index 46e9e22282d..3d1e5fcafca 100644 --- a/src/core/Akka.Remote/PhiAccrualFailureDetector.cs +++ b/src/core/Akka.Remote/PhiAccrualFailureDetector.cs @@ -154,7 +154,7 @@ public State(HeartbeatHistory history, long? timeStamp) private AtomicReference _state; - private State state + internal State state { get { return _state; } set { _state = value; } @@ -209,6 +209,8 @@ public override void HeartBeat() } var newState = new State(newHistory, timestamp); + Console.WriteLine("Replacing state with timestamp [{0}] with timestamp [{1}] for node {2}", oldState.TimeStamp, newState.TimeStamp, Address); + //if we won the race then update else try again if(!_state.CompareAndSet(oldState, newState)) HeartBeat(); } @@ -248,6 +250,8 @@ internal double Phi(long timestamp) var history = oldState.History; var mean = history.Mean; var stdDeviation = EnsureValidStdDeviation(history.StdDeviation); + if(timeDiff > (mean * 2)) + Console.WriteLine("About to process very large timeDiff: {0} vs mean: {1} for node {2}", timeDiff, mean, Address); return Phi(timeDiff, mean + AcceptableHeartbeatPauseMillis, stdDeviation); } } @@ -304,7 +308,6 @@ private double EnsureValidStdDeviation(double stdDeviation) internal readonly struct HeartbeatHistory { private readonly int _maxSampleSize; - private readonly ImmutableList _intervals; private readonly long _intervalSum; private readonly long _squaredIntervalSum; @@ -326,7 +329,7 @@ internal readonly struct HeartbeatHistory public HeartbeatHistory(int maxSampleSize, ImmutableList intervals, long intervalSum, long squaredIntervalSum) { _maxSampleSize = maxSampleSize; - _intervals = intervals; + Intervals = intervals; _intervalSum = intervalSum; _squaredIntervalSum = squaredIntervalSum; @@ -338,12 +341,14 @@ public HeartbeatHistory(int maxSampleSize, ImmutableList intervals, long i throw new ArgumentOutOfRangeException(nameof(squaredIntervalSum), $"squaredIntervalSum must be >= 0, got {squaredIntervalSum}"); } - public double Mean => ((double)_intervalSum / _intervals.Count); + public double Mean => ((double)_intervalSum / Intervals.Count); - public double Variance => ((double)_squaredIntervalSum / _intervals.Count) - (Mean * Mean); + public double Variance => ((double)_squaredIntervalSum / Intervals.Count) - (Mean * Mean); public double StdDeviation => Math.Sqrt(Variance); + public ImmutableList Intervals { get; } + /// /// Increments the . /// @@ -352,9 +357,9 @@ public HeartbeatHistory(int maxSampleSize, ImmutableList intervals, long i /// A new heartbeat history instance with the added interval. public static HeartbeatHistory operator +(HeartbeatHistory history, long interval) { - if (history._intervals.Count < history._maxSampleSize) + if (history.Intervals.Count < history._maxSampleSize) { - return new HeartbeatHistory(history._maxSampleSize, history._intervals.Add(interval), + return new HeartbeatHistory(history._maxSampleSize, history.Intervals.Add(interval), history._intervalSum + interval, history._squaredIntervalSum + Pow2(interval)); } else @@ -365,8 +370,8 @@ public HeartbeatHistory(int maxSampleSize, ImmutableList intervals, long i private static HeartbeatHistory DropOldest(HeartbeatHistory history) { - return new HeartbeatHistory(history._maxSampleSize, history._intervals.RemoveAt(0), history._intervalSum - history._intervals.First(), - history._squaredIntervalSum - Pow2(history._intervals.First())); + return new HeartbeatHistory(history._maxSampleSize, history.Intervals.RemoveAt(0), history._intervalSum - history.Intervals.First(), + history._squaredIntervalSum - Pow2(history.Intervals.First())); } private static long Pow2(long x) From 796eed907f71a96928d82745bb699bdbf18c658d Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 7 Apr 2021 14:28:54 -0500 Subject: [PATCH 7/7] cleaned up immutability and CAS issues inside DefaultFailureDetectorRegistry added bugfix from https://github.com/akka/akka/pull/23595 --- src/common.props | 4 ++-- src/core/Akka.Cluster/ClusterDaemon.cs | 5 ++++- src/core/Akka.Cluster/ClusterHeartbeat.cs | 8 +++++--- src/core/Akka.Cluster/Gossip.cs | 3 --- src/core/Akka.Cluster/Member.cs | 4 ++-- src/core/Akka.Cluster/Reachability.cs | 2 +- .../DefaultFailureDetectorRegistry.cs | 19 +++++++++---------- .../Akka.Remote/PhiAccrualFailureDetector.cs | 5 +---- 8 files changed, 24 insertions(+), 26 deletions(-) diff --git a/src/common.props b/src/common.props index 823aa061ae1..844abcc4992 100644 --- a/src/common.props +++ b/src/common.props @@ -2,7 +2,7 @@ Copyright © 2013-2021 Akka.NET Team Akka.NET Team - 1.4.18 + 1.4.19 https://getakka.net/images/akkalogo.png https://github.com/akkadotnet/akka.net https://github.com/akkadotnet/akka.net/blob/master/LICENSE @@ -29,7 +29,7 @@ true - Placeholder for Nightlies** + Placeholder for nightlies** diff --git a/src/core/Akka.Cluster/ClusterDaemon.cs b/src/core/Akka.Cluster/ClusterDaemon.cs index c8a1d3e9333..77013e8e360 100644 --- a/src/core/Akka.Cluster/ClusterDaemon.cs +++ b/src/core/Akka.Cluster/ClusterDaemon.cs @@ -1964,8 +1964,11 @@ public ReceiveGossipType ReceiveGossip(GossipEnvelope envelope) // for all new joining nodes we remove them from the failure detector foreach (var node in _latestGossip.Members) { - if (node.Status == MemberStatus.Joining && !localGossip.Members.Contains(node)) + if (!localGossip.Members.Contains(node)) + { _cluster.FailureDetector.Remove(node.Address); + } + } _log.Debug("Cluster Node [{0}] - Receiving gossip from [{1}]", _cluster.SelfAddress, from); diff --git a/src/core/Akka.Cluster/ClusterHeartbeat.cs b/src/core/Akka.Cluster/ClusterHeartbeat.cs index c0bbb60f6f7..b2b2b83f3e8 100644 --- a/src/core/Akka.Cluster/ClusterHeartbeat.cs +++ b/src/core/Akka.Cluster/ClusterHeartbeat.cs @@ -510,7 +510,9 @@ private ClusterHeartbeatSenderState MembershipChange(HeartbeatNodeRing newRing) foreach (var r in removedReceivers) { if (FailureDetector.IsAvailable(r.Address)) + { FailureDetector.Remove(r.Address); + } else { adjustedOldReceiversNowUnreachable = adjustedOldReceiversNowUnreachable.Add(r); @@ -674,7 +676,7 @@ public ImmutableHashSet Receivers(UniqueAddress sender) var (remaining, slice1) = take(MonitoredByNumberOfNodes, NodeRing().From(sender).Skip(1).GetEnumerator(), ImmutableSortedSet.Empty); IImmutableSet slice = remaining == 0 - ? slice1 // or, wrap0around + ? slice1 // or, wrap-around : take(remaining, NodeRing().TakeWhile(x => x != sender).GetEnumerator(), slice1).Item2; return slice.ToImmutableHashSet(); @@ -743,8 +745,8 @@ private RingComparer() { } /// public int Compare(UniqueAddress x, UniqueAddress y) { - var ha = x.GetHashCode(); - var hb = y.GetHashCode(); + var ha = x.Uid; + var hb = y.Uid; var c = ha.CompareTo(hb); return c == 0 ? Member.AddressOrdering.Compare(x.Address, y.Address) : c; } diff --git a/src/core/Akka.Cluster/Gossip.cs b/src/core/Akka.Cluster/Gossip.cs index bf875df4157..3777459fd95 100644 --- a/src/core/Akka.Cluster/Gossip.cs +++ b/src/core/Akka.Cluster/Gossip.cs @@ -516,9 +516,6 @@ public GossipOverview Copy(ImmutableHashSet seen = null, Reachabi /// class GossipEnvelope : IClusterMessage { - //TODO: Serialization? - //TODO: ser stuff? - readonly UniqueAddress _from; readonly UniqueAddress _to; diff --git a/src/core/Akka.Cluster/Member.cs b/src/core/Akka.Cluster/Member.cs index 1bf43ec1b0d..15971d7eb6f 100644 --- a/src/core/Akka.Cluster/Member.cs +++ b/src/core/Akka.Cluster/Member.cs @@ -502,12 +502,12 @@ public bool Equals(UniqueAddress other) } /// - public override bool Equals(object obj) => obj is UniqueAddress && Equals((UniqueAddress)obj); + public override bool Equals(object obj) => obj is UniqueAddress address && Equals(address); /// public override int GetHashCode() { - return MurmurHash.ByteHash(BitConverter.GetBytes(Uid)); + return Uid; } /// diff --git a/src/core/Akka.Cluster/Reachability.cs b/src/core/Akka.Cluster/Reachability.cs index e291fcd6bf6..c7da1f16412 100644 --- a/src/core/Akka.Cluster/Reachability.cs +++ b/src/core/Akka.Cluster/Reachability.cs @@ -26,7 +26,7 @@ namespace Akka.Cluster /// - Unreachable if any observer node considers it as Unreachable /// - Reachable otherwise, i.e. no observer node considers it as Unreachable /// - internal class Reachability //TODO: ISerializable? + internal class Reachability { /// /// TBD diff --git a/src/core/Akka.Remote/DefaultFailureDetectorRegistry.cs b/src/core/Akka.Remote/DefaultFailureDetectorRegistry.cs index bf617a09e90..21ce673219f 100644 --- a/src/core/Akka.Remote/DefaultFailureDetectorRegistry.cs +++ b/src/core/Akka.Remote/DefaultFailureDetectorRegistry.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Generic; +using System.Collections.Immutable; using Akka.Util; namespace Akka.Remote @@ -30,11 +31,11 @@ public DefaultFailureDetectorRegistry(Func factory) private readonly Func _factory; - private AtomicReference> _resourceToFailureDetector = new AtomicReference>(new Dictionary()); + private AtomicReference> _resourceToFailureDetector = new AtomicReference>(ImmutableDictionary.Empty); private readonly object _failureDetectorCreationLock = new object(); - private Dictionary ResourceToFailureDetector + private ImmutableDictionary ResourceToFailureDetector { get { return _resourceToFailureDetector; } set { _resourceToFailureDetector = value; } @@ -83,7 +84,7 @@ public void Heartbeat(T resource) { // First check for non-existing key wa outside the lock, and a second thread might just have released the lock // when this one acquired it, so the second check is needed (double-check locking pattern) - var oldTable = new Dictionary(ResourceToFailureDetector); + var oldTable = ResourceToFailureDetector; if (oldTable.TryGetValue(resource, out failureDetector)) failureDetector.HeartBeat(); else @@ -98,8 +99,8 @@ public void Heartbeat(T resource) } newDetector.HeartBeat(); - oldTable.Add(resource, newDetector); - ResourceToFailureDetector = oldTable; + var newTable = oldTable.Add(resource, newDetector); + ResourceToFailureDetector = newTable; } } } @@ -116,13 +117,11 @@ public void Remove(T resource) var oldTable = ResourceToFailureDetector; if (oldTable.ContainsKey(resource)) { - var newTable = new Dictionary(oldTable); - newTable.Remove(resource); //if we won the race then update else try again - if (_resourceToFailureDetector.CompareAndSet(oldTable, newTable)) continue; + var newTable = oldTable.Remove(resource); //if we won the race then update else try again + if (!_resourceToFailureDetector.CompareAndSet(oldTable, newTable)) continue; } break; } - Console.WriteLine("Removed heartbeats to {0}", resource); } /// @@ -134,7 +133,7 @@ public void Reset() { var oldTable = ResourceToFailureDetector; // if we won the race then update else try again - if (_resourceToFailureDetector.CompareAndSet(oldTable, new Dictionary())) continue; + if (!_resourceToFailureDetector.CompareAndSet(oldTable, ImmutableDictionary.Empty)) continue; break; } } diff --git a/src/core/Akka.Remote/PhiAccrualFailureDetector.cs b/src/core/Akka.Remote/PhiAccrualFailureDetector.cs index 3d1e5fcafca..772ba123d45 100644 --- a/src/core/Akka.Remote/PhiAccrualFailureDetector.cs +++ b/src/core/Akka.Remote/PhiAccrualFailureDetector.cs @@ -209,8 +209,7 @@ public override void HeartBeat() } var newState = new State(newHistory, timestamp); - Console.WriteLine("Replacing state with timestamp [{0}] with timestamp [{1}] for node {2}", oldState.TimeStamp, newState.TimeStamp, Address); - + //if we won the race then update else try again if(!_state.CompareAndSet(oldState, newState)) HeartBeat(); } @@ -250,8 +249,6 @@ internal double Phi(long timestamp) var history = oldState.History; var mean = history.Mean; var stdDeviation = EnsureValidStdDeviation(history.StdDeviation); - if(timeDiff > (mean * 2)) - Console.WriteLine("About to process very large timeDiff: {0} vs mean: {1} for node {2}", timeDiff, mean, Address); return Phi(timeDiff, mean + AcceptableHeartbeatPauseMillis, stdDeviation); } }