Skip to content

Commit

Permalink
HeartbeatNodeRing performance (#4943)
Browse files Browse the repository at this point in the history
* added benchmark for HeartbeatNodeRing performance

* switched to local function

No perf change

* approve Akka.Benchmarks friend assembly for Akka.Cluster

* remove HeartbeatNodeRing.NodeRing() allocation and make field immutable

* made it so Akka.Util.Internal.ArrayExtensions.From no longer allocates (much)

* added some descriptive comments on HeartbeatNodeRing.Receivers

* Replaced `Lazy<T>` with `Option<T>` and a similar lazy initialization check

 Improved throughput by ~10% on larger collections and further reduced memory allocation.

* changed return types to `IImmutableSet`

Did this in order to reduce allocations from constantly converting back and forth from `ImmutableSortedSet<T>` and `ImmutableHashSet<T>` - that way we can just use whatever the underlying collection type is.

* added ReachabilityBenchmarks
  • Loading branch information
Aaronontheweb authored Apr 18, 2021
1 parent ac07a0f commit 234188e
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 31 deletions.
2 changes: 2 additions & 0 deletions src/benchmark/Akka.Benchmarks/Akka.Benchmarks.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
<PackageReference Include="BenchmarkDotNet" Version="0.12.1" />
<PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonVersion)" />
<PackageReference Include="System.Collections.Immutable" Version="5.0.0" />
<!-- FluentAssertions is used in some benchmarks to validate internal behaviors -->
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Text;
using Akka.Actor;
using Akka.Benchmarks.Configurations;
using Akka.Cluster;
using BenchmarkDotNet.Attributes;
using FluentAssertions;

namespace Akka.Benchmarks.Cluster
{
[Config(typeof(MicroBenchmarkConfig))]
public class HeartbeatNodeRingBenchmarks
{
[Params(10, 100, 250)]
public int NodesSize;


internal static HeartbeatNodeRing CreateHearbeatNodeRingOfSize(int size)
{
var nodes = Enumerable.Range(1, size)
.Select(x => new UniqueAddress(new Address("akka", "sys", "node-" + x, 2552), x))
.ToList();
var selfAddress = nodes[size / 2];
return new HeartbeatNodeRing(selfAddress, nodes.ToImmutableHashSet(), ImmutableHashSet<UniqueAddress>.Empty, 5);
}

private HeartbeatNodeRing _ring;

[GlobalSetup]
public void Setup()
{
_ring = CreateHearbeatNodeRingOfSize(NodesSize);
}

private static void MyReceivers(HeartbeatNodeRing ring)
{
var r = new HeartbeatNodeRing(ring.SelfAddress, ring.Nodes, ImmutableHashSet<UniqueAddress>.Empty, ring.MonitoredByNumberOfNodes);
r.MyReceivers.Value.Count.Should().BeGreaterThan(0);
}

[Benchmark]
[Arguments(1000)]
public void HeartbeatNodeRing_should_produce_MyReceivers(int iterations)
{
for(var i = 0; i < iterations; i++)
MyReceivers(_ring);
}
}
}
167 changes: 167 additions & 0 deletions src/benchmark/Akka.Benchmarks/Cluster/ReachabilityBenchmarks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Akka.Util;
using Akka.Actor;
using Akka.Benchmarks.Configurations;
using Akka.Cluster;
using BenchmarkDotNet.Attributes;
using FluentAssertions;

namespace Akka.Benchmarks.Cluster
{
[Config(typeof(MicroBenchmarkConfig))]
public class ReachabilityBenchmarks
{
[Params(10, 100, 250)]
public int NodesSize;

[Params(100)]
public int Iterations;

public Address Address = new Address("akka", "sys", "a", 2552);
public Address Node = new Address("akka", "sys", "a", 2552);

private Reachability CreateReachabilityOfSize(Reachability baseReachability, int size)
{
return Enumerable.Range(1, size).Aggregate(baseReachability, (r, i) =>
{
var obs = new UniqueAddress(Address.WithHost("node-" + i), i);
var j = i == size ? 1 : i + 1;
var subject = new UniqueAddress(Address.WithHost("node-" + j), j);
return r.Unreachable(obs, subject).Reachable(obs, subject);
});
}

private Reachability AddUnreachable(Reachability baseReachability, int count)
{
var observers = baseReachability.Versions.Keys.Take(count);
// the Keys HashSet<T> IEnumerator does not support Reset, hence why we have to convert it to a list
using var subjects = baseReachability.Versions.Keys.ToList().GetContinuousEnumerator();
return observers.Aggregate(baseReachability, (r, o) =>
{
return Enumerable.Range(1, 5).Aggregate(r, (r2, i) =>
{
subjects.MoveNext();
return r2.Unreachable(o, subjects.Current);
});
});
}

internal Reachability Reachability1;
internal Reachability Reachability2;
internal Reachability Reachability3;
internal HashSet<UniqueAddress> Allowed;

[GlobalSetup]
public void Setup()
{
Reachability1 = CreateReachabilityOfSize(Reachability.Empty, NodesSize);
Reachability2 = CreateReachabilityOfSize(Reachability1, NodesSize);
Reachability3 = AddUnreachable(Reachability1, NodesSize / 2);
Allowed = Reachability1.Versions.Keys.ToHashSet();
}

private void CheckThunkFor(Reachability r1, Reachability r2, Action<Reachability, Reachability> thunk,
int times)
{
for (var i = 0; i < times; i++)
thunk(new Reachability(r1.Records, r1.Versions), new Reachability(r2.Records, r2.Versions));
}

private void CheckThunkFor(Reachability r1, Action<Reachability> thunk, int times)
{
for (var i = 0; i < times; i++)
thunk(new Reachability(r1.Records, r1.Versions));
}

private void Merge(Reachability r1, Reachability r2, int expectedRecords)
{
r1.Merge(Allowed, r2).Records.Count.Should().Be(expectedRecords);
}

private void CheckStatus(Reachability r1)
{
var record = r1.Records.First();
r1.Status(record.Observer, record.Subject).Should().Be(record.Status);
}

private void CheckAggregatedStatus(Reachability r1)
{
var record = r1.Records.First();
r1.Status(record.Subject).Should().Be(record.Status);
}

private void AllUnreachableOrTerminated(Reachability r1)
{
r1.AllUnreachableOrTerminated.IsEmpty.Should().BeFalse();
}

private void AllUnreachable(Reachability r1)
{
r1.AllUnreachable.IsEmpty.Should().BeFalse();
}

private void RecordsFrom(Reachability r1)
{
foreach (var o in r1.AllObservers)
{
r1.RecordsFrom(o).Should().NotBeNull();
}
}

[Benchmark]
public void Reachability_must_merge_with_same_versions()
{
CheckThunkFor(Reachability1, Reachability1, (r1, r2) => Merge(r1, r2, 0), Iterations);
}

[Benchmark]
public void Reachability_must_merge_with_all_older_versions()
{
CheckThunkFor(Reachability2, Reachability1, (r1, r2) => Merge(r1, r2, 0), Iterations);
}

[Benchmark]
public void Reachability_must_merge_with_all_newer_versions()
{
CheckThunkFor(Reachability1, Reachability2, (r1, r2) => Merge(r1, r2, 0), Iterations);
}

[Benchmark]
public void Reachability_must_merge_with_half_nodes_unreachable()
{
CheckThunkFor(Reachability1, Reachability3, (r1, r2) => Merge(r1, r2, 5* NodesSize/2), Iterations);
}

[Benchmark]
public void Reachability_must_merge_with_half_nodes_unreachable_opposite()
{
CheckThunkFor(Reachability3, Reachability1, (r1, r2) => Merge(r1, r2, 5 * NodesSize / 2), Iterations);
}

[Benchmark]
public void Reachability_must_check_status_with_half_nodes_unreachable()
{
CheckThunkFor(Reachability3, CheckAggregatedStatus, Iterations);
}

[Benchmark]
public void Reachability_must_check_AllUnreachableOrTerminated_with_half_nodes_unreachable()
{
CheckThunkFor(Reachability3, AllUnreachableOrTerminated, Iterations);
}

[Benchmark]
public void Reachability_must_check_AllUnreachable_with_half_nodes_unreachable()
{
CheckThunkFor(Reachability3, AllUnreachable, Iterations);
}

[Benchmark]
public void Reachability_must_check_RecordsFrom_with_half_nodes_unreachable()
{
CheckThunkFor(Reachability3, RecordsFrom, Iterations);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[assembly: System.Reflection.AssemblyMetadataAttribute("RepositoryUrl", "https://github.com/akkadotnet/akka.net")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Benchmarks")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Metrics")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ private FailureDetectorStub Fd(ClusterHeartbeatSenderState state, UniqueAddress
public void ClusterHeartbeatSenderState_must_return_empty_active_set_when_no_nodes()
{
_emptyState
.ActiveReceivers.IsEmpty.Should().BeTrue();
.ActiveReceivers.Count.Should().Be(0);
}

[Fact]
public void ClusterHeartbeatSenderState_must_init_with_empty()
{
_emptyState.Init(ImmutableHashSet<UniqueAddress>.Empty, ImmutableHashSet<UniqueAddress>.Empty)
.ActiveReceivers.IsEmpty.Should().BeTrue();
.ActiveReceivers.Count.Should().Be(0);
}

[Fact]
Expand Down
55 changes: 32 additions & 23 deletions src/core/Akka.Cluster/ClusterHeartbeat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ public ClusterHeartbeatSenderState(HeartbeatNodeRing ring, ImmutableHashSet<Uniq
/// <summary>
/// TBD
/// </summary>
public readonly ImmutableHashSet<UniqueAddress> ActiveReceivers;
public readonly IImmutableSet<UniqueAddress> ActiveReceivers;

/// <summary>
/// TBD
Expand Down Expand Up @@ -569,6 +569,7 @@ public ClusterHeartbeatSenderState Copy(HeartbeatNodeRing ring = null, Immutable
internal sealed class HeartbeatNodeRing
{
private readonly bool _useAllAsReceivers;
private Option<IImmutableSet<UniqueAddress>> _myReceivers;

/// <summary>
/// TBD
Expand All @@ -588,14 +589,15 @@ public HeartbeatNodeRing(
{
SelfAddress = selfAddress;
Nodes = nodes;
NodeRing = nodes.ToImmutableSortedSet(RingComparer.Instance);
Unreachable = unreachable;
MonitoredByNumberOfNodes = monitoredByNumberOfNodes;

if (!nodes.Contains(selfAddress))
throw new ArgumentException($"Nodes [${string.Join(", ", nodes)}] must contain selfAddress [{selfAddress}]");

_useAllAsReceivers = MonitoredByNumberOfNodes >= (NodeRing().Count - 1);
MyReceivers = new Lazy<ImmutableHashSet<UniqueAddress>>(() => Receivers(SelfAddress));
_useAllAsReceivers = MonitoredByNumberOfNodes >= (NodeRing.Count - 1);
_myReceivers = Option<IImmutableSet<UniqueAddress>>.None;
}

/// <summary>
Expand All @@ -618,26 +620,34 @@ public HeartbeatNodeRing(
/// </summary>
public int MonitoredByNumberOfNodes { get; }

private ImmutableSortedSet<UniqueAddress> NodeRing()
{
return Nodes.ToImmutableSortedSet(RingComparer.Instance);
}
public ImmutableSortedSet<UniqueAddress> NodeRing { get; }

/// <summary>
/// Receivers for <see cref="SelfAddress"/>. Cached for subsequent access.
/// </summary>
public readonly Lazy<ImmutableHashSet<UniqueAddress>> MyReceivers;
public Option<IImmutableSet<UniqueAddress>> MyReceivers
{
get
{
if (_myReceivers.IsEmpty)
{
_myReceivers = new Option<IImmutableSet<UniqueAddress>>(Receivers(SelfAddress));
}

return _myReceivers;
}
}

/// <summary>
/// TBD
/// The set of Akka.Cluster nodes designated for receiving heartbeats from this node.
/// </summary>
/// <param name="sender">TBD</param>
/// <returns>TBD</returns>
public ImmutableHashSet<UniqueAddress> Receivers(UniqueAddress sender)
/// <param name="sender">The node sending heartbeats.</param>
/// <returns>An organized ring of unique nodes.</returns>
public IImmutableSet<UniqueAddress> Receivers(UniqueAddress sender)
{
if (_useAllAsReceivers)
{
return NodeRing().Remove(sender).ToImmutableHashSet();
return NodeRing.Remove(sender);
}
else
{
Expand All @@ -646,8 +656,7 @@ public ImmutableHashSet<UniqueAddress> Receivers(UniqueAddress sender)
// The reason for not limiting it to strictly monitoredByNrOfMembers is that the leader must
// be able to continue its duties (e.g. removal of downed nodes) when many nodes are shutdown
// at the same time and nobody in the remaining cluster is monitoring some of the shutdown nodes.
Func<int, IEnumerator<UniqueAddress>, ImmutableSortedSet<UniqueAddress>, (int, ImmutableSortedSet<UniqueAddress>)> take = null;
take = (n, iter, acc) =>
(int, ImmutableSortedSet<UniqueAddress>) Take(int n, IEnumerator<UniqueAddress> iter, ImmutableSortedSet<UniqueAddress> acc)
{
if (iter.MoveNext() == false || n == 0)
{
Expand All @@ -660,26 +669,26 @@ public ImmutableHashSet<UniqueAddress> Receivers(UniqueAddress sender)
var isUnreachable = Unreachable.Contains(next);
if (isUnreachable && acc.Count >= MonitoredByNumberOfNodes)
{
return take(n, iter, acc); // skip the unreachable, since we have already picked `MonitoredByNumberOfNodes`
return Take(n, iter, acc); // skip the unreachable, since we have already picked `MonitoredByNumberOfNodes`
}
else if (isUnreachable)
{
return take(n, iter, acc.Add(next)); // include the unreachable, but don't count it
return Take(n, iter, acc.Add(next)); // include the unreachable, but don't count it
}
else
{
return take(n - 1, iter, acc.Add(next)); // include the reachable
return Take(n - 1, iter, acc.Add(next)); // include the reachable
}
}
};
}

var (remaining, slice1) = take(MonitoredByNumberOfNodes, NodeRing().From(sender).Skip(1).GetEnumerator(), ImmutableSortedSet<UniqueAddress>.Empty);
var (remaining, slice1) = Take(MonitoredByNumberOfNodes, NodeRing.From(sender).Skip(1).GetEnumerator(), ImmutableSortedSet<UniqueAddress>.Empty);

IImmutableSet<UniqueAddress> slice = remaining == 0
? slice1 // or, wrap-around
: take(remaining, NodeRing().TakeWhile(x => x != sender).GetEnumerator(), slice1).Item2;
: Take(remaining, NodeRing.TakeWhile(x => x != sender).GetEnumerator(), slice1).Item2;

return slice.ToImmutableHashSet();
return slice;
}
}

Expand All @@ -697,7 +706,7 @@ public HeartbeatNodeRing Copy(UniqueAddress selfAddress = null, ImmutableHashSet
selfAddress ?? SelfAddress,
nodes ?? Nodes,
unreachable ?? Unreachable,
monitoredByNumberOfNodes.HasValue ? monitoredByNumberOfNodes.Value : MonitoredByNumberOfNodes);
monitoredByNumberOfNodes ?? MonitoredByNumberOfNodes);
}

#region Operators
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka.Cluster/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
[assembly: InternalsVisibleTo("Akka.Cluster.Sharding.Tests.MultiNode")]
[assembly: InternalsVisibleTo("Akka.Cluster.Metrics")]
[assembly: InternalsVisibleTo("Akka.DistributedData")]
[assembly: InternalsVisibleTo("Akka.Benchmarks")]

// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
Expand Down
Loading

0 comments on commit 234188e

Please sign in to comment.