diff --git a/src/benchmark/Akka.Benchmarks/Cluster/ReachabilityBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Cluster/ReachabilityBenchmarks.cs index 32869c18869..fc8362a2bd8 100644 --- a/src/benchmark/Akka.Benchmarks/Cluster/ReachabilityBenchmarks.cs +++ b/src/benchmark/Akka.Benchmarks/Cluster/ReachabilityBenchmarks.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Collections.Immutable; using System.Linq; using Akka.Util; using Akka.Actor; @@ -13,7 +14,7 @@ namespace Akka.Benchmarks.Cluster [Config(typeof(MicroBenchmarkConfig))] public class ReachabilityBenchmarks { - [Params(10, 100, 250)] + [Params(100)] public int NodesSize; [Params(100)] @@ -51,7 +52,7 @@ private Reachability AddUnreachable(Reachability baseReachability, int count) internal Reachability Reachability1; internal Reachability Reachability2; internal Reachability Reachability3; - internal HashSet Allowed; + internal ImmutableHashSet Allowed; [GlobalSetup] public void Setup() @@ -59,7 +60,7 @@ public void Setup() Reachability1 = CreateReachabilityOfSize(Reachability.Empty, NodesSize); Reachability2 = CreateReachabilityOfSize(Reachability1, NodesSize); Reachability3 = AddUnreachable(Reachability1, NodesSize / 2); - Allowed = Reachability1.Versions.Keys.ToHashSet(); + Allowed = Reachability1.Versions.Keys.ToImmutableHashSet(); } private void CheckThunkFor(Reachability r1, Reachability r2, Action thunk, diff --git a/src/core/Akka.Cluster/Gossip.cs b/src/core/Akka.Cluster/Gossip.cs index 3777459fd95..0a680a4d939 100644 --- a/src/core/Akka.Cluster/Gossip.cs +++ b/src/core/Akka.Cluster/Gossip.cs @@ -282,7 +282,7 @@ public Gossip Merge(Gossip that) var mergedMembers = EmptyMembers.Union(Member.PickHighestPriority(this._members, that._members)); // 3. merge reachability table by picking records with highest version - var mergedReachability = _overview.Reachability.Merge(mergedMembers.Select(m => m.UniqueAddress), + var mergedReachability = _overview.Reachability.Merge(mergedMembers.Select(m => m.UniqueAddress).ToImmutableSortedSet(), that._overview.Reachability); // 4. Nobody can have seen this new gossip yet diff --git a/src/core/Akka.Cluster/Reachability.cs b/src/core/Akka.Cluster/Reachability.cs index c7da1f16412..133f36a159a 100644 --- a/src/core/Akka.Cluster/Reachability.cs +++ b/src/core/Akka.Cluster/Reachability.cs @@ -55,7 +55,6 @@ public enum ReachabilityStatus public static readonly Reachability Empty = new Reachability(ImmutableList.Create(), ImmutableDictionary.Create()); - //TODO: Serialization should ignore private readonly Lazy _cache; /// @@ -80,11 +79,6 @@ public Reachability(ImmutableList records, ImmutableDictionary public ImmutableDictionary Versions { get; } - /* - * def isReachable(observer: UniqueAddress, subject: UniqueAddress): Boolean = - status(observer, subject) == Reachable - */ - /// /// TBD /// @@ -178,7 +172,11 @@ private Reachability Change(UniqueAddress observer, UniqueAddress subject, Reach var newVersions = Versions.SetItem(observer, v); var newRecord = new Record(observer, subject, status, v); var oldObserverRows = ObserverRows(observer); + + // don't record Reachable observation if nothing has been noted so far if (oldObserverRows == null && status == ReachabilityStatus.Reachable) return this; + + // otherwise, create new instance including this first observation if (oldObserverRows == null) return new Reachability(Records.Add(newRecord), newVersions); if (!oldObserverRows.TryGetValue(subject, out var oldRecord)) @@ -206,7 +204,7 @@ private Reachability Change(UniqueAddress observer, UniqueAddress subject, Reach /// TBD /// TBD /// TBD - public Reachability Merge(IEnumerable allowed, Reachability other) + public Reachability Merge(IImmutableSet allowed, Reachability other) { var recordBuilder = ImmutableList.CreateBuilder(); //TODO: Size hint somehow? @@ -337,7 +335,7 @@ public bool IsReachable(UniqueAddress observer, UniqueAddress subject) public ImmutableHashSet AllUnreachableFrom(UniqueAddress observer) { var observerRows = ObserverRows(observer); - if (observerRows == null) return ImmutableHashSet.Create(); + if (observerRows == null) return ImmutableHashSet.Empty; return ImmutableHashSet.CreateRange( observerRows.Where(p => p.Value.Status == ReachabilityStatus.Unreachable).Select(p => p.Key)); @@ -351,16 +349,18 @@ public ImmutableHashSet AllUnreachableFrom(UniqueAddress observer public ImmutableList RecordsFrom(UniqueAddress observer) { var rows = ObserverRows(observer); - if (rows == null) return ImmutableList.Create(); + if (rows == null) return ImmutableList.Empty; return rows.Values.ToImmutableList(); } + /// only used for testing /// public override int GetHashCode() { return Versions.GetHashCode(); } + /// only used for testing /// public override bool Equals(object obj) { @@ -467,10 +467,10 @@ public Cache(ImmutableList records) { if (records.IsEmpty) { - ObserverRowMap = ImmutableDictionary - .Create>(); - AllTerminated = ImmutableHashSet.Create(); - AllUnreachable = ImmutableHashSet.Create(); + ObserverRowMap = ImmutableDictionary> + .Empty; + AllTerminated = ImmutableHashSet.Empty; + AllUnreachable = ImmutableHashSet.Empty; } else { @@ -480,16 +480,12 @@ public Cache(ImmutableList records) foreach (var r in records) { - ImmutableDictionary m = mapBuilder.TryGetValue(r.Observer, out m) - ? m.SetItem(r.Subject, r) - //TODO: Other collections take items for Create. Create unnecessary array here - : ImmutableDictionary.CreateRange(new[] - { - new KeyValuePair(r.Subject, r) - }); + ImmutableDictionary m = mapBuilder.TryGetValue(r.Observer, out var mR) + ? mR.SetItem(r.Subject, r) + : ImmutableDictionary.Empty.Add(r.Subject, r); - mapBuilder.AddOrSet(r.Observer, m); + mapBuilder[r.Observer] = m; if (r.Status == ReachabilityStatus.Unreachable) unreachableBuilder.Add(r.Subject); else if (r.Status == ReachabilityStatus.Terminated) terminatedBuilder.Add(r.Subject); @@ -514,12 +510,12 @@ public ImmutableDictionary - /// TBD + /// Contains all nodes that have been observed as Terminated by at least one other node. /// public ImmutableHashSet AllTerminated { get; } /// - /// TBD + /// Contains all nodes that have been observed as Unreachable by at least one other node. /// public ImmutableHashSet AllUnreachable { get; }