Skip to content

Commit

Permalink
Turned HeatbeatNodeRing into struct (#4944)
Browse files Browse the repository at this point in the history
* added benchmark for HeartbeatNodeRing performance

* switched to local function

No perf change

* approve Akka.Benchmarks friend assembly for Akka.Cluster

* remove HeartbeatNodeRing.NodeRing() allocation and make field immutable

* made it so Akka.Util.Internal.ArrayExtensions.From no longer allocates (much)

* added some descriptive comments on HeartbeatNodeRing.Receivers

* Replaced `Lazy<T>` with `Option<T>` and a similar lazy initialization check

 Improved throughput by ~10% on larger collections and further reduced memory allocation.

* changed return types to `IImmutableSet`

Did this in order to reduce allocations from constantly converting back and forth from `ImmutableSortedSet<T>` and `ImmutableHashSet<T>` - that way we can just use whatever the underlying collection type is.

* converted `HeartbeatNodeRing` into a `struct`

improved performance some, but I don't want to lump it in with other changes just in case
  • Loading branch information
Aaronontheweb authored Apr 19, 2021
1 parent 2de428b commit aed2cbc
Showing 1 changed file with 23 additions and 20 deletions.
43 changes: 23 additions & 20 deletions src/core/Akka.Cluster/ClusterHeartbeat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ public ClusterHeartbeatSenderState HeartbeatRsp(UniqueAddress from)
/// <param name="oldReceiversNowUnreachable">TBD</param>
/// <param name="failureDetector">TBD</param>
/// <returns>TBD</returns>
public ClusterHeartbeatSenderState Copy(HeartbeatNodeRing ring = null, ImmutableHashSet<UniqueAddress> oldReceiversNowUnreachable = null, IFailureDetectorRegistry<Address> failureDetector = null)
public ClusterHeartbeatSenderState Copy(HeartbeatNodeRing? ring = null, ImmutableHashSet<UniqueAddress> oldReceiversNowUnreachable = null, IFailureDetectorRegistry<Address> failureDetector = null)
{
return new ClusterHeartbeatSenderState(ring ?? Ring, oldReceiversNowUnreachable ?? OldReceiversNowUnreachable, failureDetector ?? FailureDetector);
}
Expand All @@ -566,7 +566,7 @@ public ClusterHeartbeatSenderState Copy(HeartbeatNodeRing ring = null, Immutable
///
/// It is immutable, i.e. the methods all return new instances.
/// </summary>
internal sealed class HeartbeatNodeRing
internal struct HeartbeatNodeRing
{
private readonly bool _useAllAsReceivers;
private Option<IImmutableSet<UniqueAddress>> _myReceivers;
Expand Down Expand Up @@ -656,37 +656,40 @@ public IImmutableSet<UniqueAddress> Receivers(UniqueAddress sender)
// The reason for not limiting it to strictly monitoredByNrOfMembers is that the leader must
// be able to continue its duties (e.g. removal of downed nodes) when many nodes are shutdown
// at the same time and nobody in the remaining cluster is monitoring some of the shutdown nodes.
(int, ImmutableSortedSet<UniqueAddress>) Take(int n, IEnumerator<UniqueAddress> iter, ImmutableSortedSet<UniqueAddress> acc)
(int, ImmutableSortedSet<UniqueAddress>) Take(int n, IEnumerator<UniqueAddress> iter, ImmutableSortedSet<UniqueAddress> acc, ImmutableHashSet<UniqueAddress> unreachable, int monitoredByNumberOfNodes)
{
if (iter.MoveNext() == false || n == 0)
while (true)
{
iter.Dispose(); // dispose enumerator
return (n, acc);
}
else
{
var next = iter.Current;
var isUnreachable = Unreachable.Contains(next);
if (isUnreachable && acc.Count >= MonitoredByNumberOfNodes)
{
return Take(n, iter, acc); // skip the unreachable, since we have already picked `MonitoredByNumberOfNodes`
}
else if (isUnreachable)
if (iter.MoveNext() == false || n == 0)
{
return Take(n, iter, acc.Add(next)); // include the unreachable, but don't count it
iter.Dispose(); // dispose enumerator
return (n, acc);
}
else
{
return Take(n - 1, iter, acc.Add(next)); // include the reachable
var next = iter.Current;
var isUnreachable = unreachable.Contains(next);
if (isUnreachable && acc.Count >= monitoredByNumberOfNodes)
{
}
else if (isUnreachable)
{
acc = acc.Add(next);
}
else
{
n = n - 1;
acc = acc.Add(next);
}
}
}
}

var (remaining, slice1) = Take(MonitoredByNumberOfNodes, NodeRing.From(sender).Skip(1).GetEnumerator(), ImmutableSortedSet<UniqueAddress>.Empty);
var (remaining, slice1) = Take(MonitoredByNumberOfNodes, NodeRing.From(sender).Skip(1).GetEnumerator(), ImmutableSortedSet<UniqueAddress>.Empty, Unreachable, MonitoredByNumberOfNodes);

IImmutableSet<UniqueAddress> slice = remaining == 0
? slice1 // or, wrap-around
: Take(remaining, NodeRing.TakeWhile(x => x != sender).GetEnumerator(), slice1).Item2;
: Take(remaining, NodeRing.TakeWhile(x => x != sender).GetEnumerator(), slice1, Unreachable, MonitoredByNumberOfNodes).Item2;

return slice;
}
Expand Down

0 comments on commit aed2cbc

Please sign in to comment.