Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reachability performance optimziation #4955

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using Akka.Util;
using Akka.Actor;
Expand All @@ -13,7 +14,7 @@ namespace Akka.Benchmarks.Cluster
[Config(typeof(MicroBenchmarkConfig))]
public class ReachabilityBenchmarks
{
[Params(10, 100, 250)]
[Params(100)]
public int NodesSize;

[Params(100)]
Expand Down Expand Up @@ -51,15 +52,15 @@ private Reachability AddUnreachable(Reachability baseReachability, int count)
internal Reachability Reachability1;
internal Reachability Reachability2;
internal Reachability Reachability3;
internal HashSet<UniqueAddress> Allowed;
internal ImmutableHashSet<UniqueAddress> Allowed;

[GlobalSetup]
public void Setup()
{
Reachability1 = CreateReachabilityOfSize(Reachability.Empty, NodesSize);
Reachability2 = CreateReachabilityOfSize(Reachability1, NodesSize);
Reachability3 = AddUnreachable(Reachability1, NodesSize / 2);
Allowed = Reachability1.Versions.Keys.ToHashSet();
Allowed = Reachability1.Versions.Keys.ToImmutableHashSet();
}

private void CheckThunkFor(Reachability r1, Reachability r2, Action<Reachability, Reachability> thunk,
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/Gossip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public Gossip Merge(Gossip that)
var mergedMembers = EmptyMembers.Union(Member.PickHighestPriority(this._members, that._members));

// 3. merge reachability table by picking records with highest version
var mergedReachability = _overview.Reachability.Merge(mergedMembers.Select(m => m.UniqueAddress),
var mergedReachability = _overview.Reachability.Merge(mergedMembers.Select(m => m.UniqueAddress).ToImmutableSortedSet(),
that._overview.Reachability);

// 4. Nobody can have seen this new gossip yet
Expand Down
42 changes: 19 additions & 23 deletions src/core/Akka.Cluster/Reachability.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public enum ReachabilityStatus
public static readonly Reachability Empty =
new Reachability(ImmutableList.Create<Record>(), ImmutableDictionary.Create<UniqueAddress, long>());

//TODO: Serialization should ignore
private readonly Lazy<Cache> _cache;

/// <summary>
Expand All @@ -80,11 +79,6 @@ public Reachability(ImmutableList<Record> records, ImmutableDictionary<UniqueAdd
/// </summary>
public ImmutableDictionary<UniqueAddress, long> Versions { get; }

/*
* def isReachable(observer: UniqueAddress, subject: UniqueAddress): Boolean =
status(observer, subject) == Reachable
*/

/// <summary>
/// TBD
/// </summary>
Expand Down Expand Up @@ -178,7 +172,11 @@ private Reachability Change(UniqueAddress observer, UniqueAddress subject, Reach
var newVersions = Versions.SetItem(observer, v);
var newRecord = new Record(observer, subject, status, v);
var oldObserverRows = ObserverRows(observer);

// don't record Reachable observation if nothing has been noted so far
if (oldObserverRows == null && status == ReachabilityStatus.Reachable) return this;

// otherwise, create new instance including this first observation
if (oldObserverRows == null) return new Reachability(Records.Add(newRecord), newVersions);

if (!oldObserverRows.TryGetValue(subject, out var oldRecord))
Expand Down Expand Up @@ -206,7 +204,7 @@ private Reachability Change(UniqueAddress observer, UniqueAddress subject, Reach
/// <param name="allowed">TBD</param>
/// <param name="other">TBD</param>
/// <returns>TBD</returns>
public Reachability Merge(IEnumerable<UniqueAddress> allowed, Reachability other)
public Reachability Merge(IImmutableSet<UniqueAddress> allowed, Reachability other)
{
var recordBuilder = ImmutableList.CreateBuilder<Record>();
//TODO: Size hint somehow?
Expand Down Expand Up @@ -337,7 +335,7 @@ public bool IsReachable(UniqueAddress observer, UniqueAddress subject)
public ImmutableHashSet<UniqueAddress> AllUnreachableFrom(UniqueAddress observer)
{
var observerRows = ObserverRows(observer);
if (observerRows == null) return ImmutableHashSet.Create<UniqueAddress>();
if (observerRows == null) return ImmutableHashSet<UniqueAddress>.Empty;
return
ImmutableHashSet.CreateRange(
observerRows.Where(p => p.Value.Status == ReachabilityStatus.Unreachable).Select(p => p.Key));
Expand All @@ -351,16 +349,18 @@ public ImmutableHashSet<UniqueAddress> AllUnreachableFrom(UniqueAddress observer
public ImmutableList<Record> RecordsFrom(UniqueAddress observer)
{
var rows = ObserverRows(observer);
if (rows == null) return ImmutableList.Create<Record>();
if (rows == null) return ImmutableList<Record>.Empty;
return rows.Values.ToImmutableList();
}

/// only used for testing
/// <inheritdoc />
public override int GetHashCode()
{
return Versions.GetHashCode();
}

/// only used for testing
/// <inheritdoc />
public override bool Equals(object obj)
{
Expand Down Expand Up @@ -467,10 +467,10 @@ public Cache(ImmutableList<Record> records)
{
if (records.IsEmpty)
{
ObserverRowMap = ImmutableDictionary
.Create<UniqueAddress, ImmutableDictionary<UniqueAddress, Record>>();
AllTerminated = ImmutableHashSet.Create<UniqueAddress>();
AllUnreachable = ImmutableHashSet.Create<UniqueAddress>();
ObserverRowMap = ImmutableDictionary<UniqueAddress, ImmutableDictionary<UniqueAddress, Record>>
.Empty;
AllTerminated = ImmutableHashSet<UniqueAddress>.Empty;
AllUnreachable = ImmutableHashSet<UniqueAddress>.Empty;
}
else
{
Expand All @@ -480,16 +480,12 @@ public Cache(ImmutableList<Record> records)

foreach (var r in records)
{
ImmutableDictionary<UniqueAddress, Record> m = mapBuilder.TryGetValue(r.Observer, out m)
? m.SetItem(r.Subject, r)
//TODO: Other collections take items for Create. Create unnecessary array here
: ImmutableDictionary.CreateRange(new[]
{
new KeyValuePair<UniqueAddress, Record>(r.Subject, r)
});
ImmutableDictionary<UniqueAddress, Record> m = mapBuilder.TryGetValue(r.Observer, out var mR)
? mR.SetItem(r.Subject, r)
: ImmutableDictionary<UniqueAddress, Record>.Empty.Add(r.Subject, r);


mapBuilder.AddOrSet(r.Observer, m);
mapBuilder[r.Observer] = m;

if (r.Status == ReachabilityStatus.Unreachable) unreachableBuilder.Add(r.Subject);
else if (r.Status == ReachabilityStatus.Terminated) terminatedBuilder.Add(r.Subject);
Expand All @@ -514,12 +510,12 @@ public ImmutableDictionary<UniqueAddress, ImmutableDictionary<UniqueAddress, Rec
}

/// <summary>
/// TBD
/// Contains all nodes that have been observed as Terminated by at least one other node.
/// </summary>
public ImmutableHashSet<UniqueAddress> AllTerminated { get; }

/// <summary>
/// TBD
/// Contains all nodes that have been observed as Unreachable by at least one other node.
/// </summary>
public ImmutableHashSet<UniqueAddress> AllUnreachable { get; }

Expand Down