Skip to content

Commit

Permalink
Akka.Cluster.Sharding performance benchmarks (#5209)
Browse files Browse the repository at this point in the history
* introduced new project for Akka.Cluster.Benchmarks

* added first set of sharding performance specs

* fixed local benchmark

* completed entity routing benchmarks

* added spawn entities benchmark

* added SpawnEntities benchmark

* fixed spawn benchmarks
  • Loading branch information
Aaronontheweb authored Aug 18, 2021
1 parent 8093f41 commit bc004ae
Show file tree
Hide file tree
Showing 8 changed files with 478 additions and 3 deletions.
15 changes: 15 additions & 0 deletions src/Akka.sln
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SerializationBenchmarks", "
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DDataStressTest", "examples\Cluster\DData\DDataStressTest\DDataStressTest.csproj", "{44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Cluster.Benchmarks", "benchmark\Akka.Cluster.Benchmarks\Akka.Cluster.Benchmarks.csproj", "{3CEBB0AE-6A88-4C32-A1D3-A8FB1E7E236B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1137,6 +1139,18 @@ Global
{44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|x64.Build.0 = Release|Any CPU
{44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|x86.ActiveCfg = Release|Any CPU
{44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50}.Release|x86.Build.0 = Release|Any CPU
{3CEBB0AE-6A88-4C32-A1D3-A8FB1E7E236B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3CEBB0AE-6A88-4C32-A1D3-A8FB1E7E236B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3CEBB0AE-6A88-4C32-A1D3-A8FB1E7E236B}.Debug|x64.ActiveCfg = Debug|Any CPU
{3CEBB0AE-6A88-4C32-A1D3-A8FB1E7E236B}.Debug|x64.Build.0 = Debug|Any CPU
{3CEBB0AE-6A88-4C32-A1D3-A8FB1E7E236B}.Debug|x86.ActiveCfg = Debug|Any CPU
{3CEBB0AE-6A88-4C32-A1D3-A8FB1E7E236B}.Debug|x86.Build.0 = Debug|Any CPU
{3CEBB0AE-6A88-4C32-A1D3-A8FB1E7E236B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3CEBB0AE-6A88-4C32-A1D3-A8FB1E7E236B}.Release|Any CPU.Build.0 = Release|Any CPU
{3CEBB0AE-6A88-4C32-A1D3-A8FB1E7E236B}.Release|x64.ActiveCfg = Release|Any CPU
{3CEBB0AE-6A88-4C32-A1D3-A8FB1E7E236B}.Release|x64.Build.0 = Release|Any CPU
{3CEBB0AE-6A88-4C32-A1D3-A8FB1E7E236B}.Release|x86.ActiveCfg = Release|Any CPU
{3CEBB0AE-6A88-4C32-A1D3-A8FB1E7E236B}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1245,6 +1259,7 @@ Global
{D62F4AD6-318F-4ECC-B875-83FA9933A81B} = {162F5991-EA57-4221-9B70-F9B6FEC18036}
{2E4B9584-42CC-4D17-B719-9F462B16C94D} = {73108242-625A-4D7B-AA09-63375DBAE464}
{44B3DDD6-6103-4E8F-8AC2-0F4BA3CF6B50} = {C50E1A9E-820C-4E75-AE39-6F96A99AC4A7}
{3CEBB0AE-6A88-4C32-A1D3-A8FB1E7E236B} = {73108242-625A-4D7B-AA09-63375DBAE464}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {03AD8E21-7507-4E68-A4E9-F4A7E7273164}
Expand Down
4 changes: 1 addition & 3 deletions src/benchmark/Akka.Benchmarks/Akka.Benchmarks.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.1" />
<PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonVersion)" />
<PackageReference Include="System.Collections.Immutable" Version="5.0.0" />
<PackageReference Include="BenchmarkDotNet" Version="$(BenchmarkDotNetVersion)" />
<!-- FluentAssertions is used in some benchmarks to validate internal behaviors -->
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\common.props" />
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>$(NetCoreTestVersion)</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="$(BenchmarkDotNetVersion)" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\contrib\cluster\Akka.Cluster.Sharding\Akka.Cluster.Sharding.csproj" />
<ProjectReference Include="..\..\contrib\persistence\Akka.Persistence.Sqlite\Akka.Persistence.Sqlite.csproj" />
</ItemGroup>

<ItemGroup>
<Compile Include="..\Akka.Benchmarks\Configurations\Configs.cs">
<Link>Configs.cs</Link>
</Compile>
</ItemGroup>

</Project>
14 changes: 14 additions & 0 deletions src/benchmark/Akka.Cluster.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Reflection;
using BenchmarkDotNet.Running;

namespace Akka.Cluster.Benchmarks
{
class Program
{
static void Main(string[] args)
{
BenchmarkSwitcher.FromAssembly(Assembly.GetExecutingAssembly()).Run(args);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
//-----------------------------------------------------------------------
// <copyright file="ShardMessageRoutingBenchmarks.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Benchmarks.Configurations;
using Akka.Cluster.Sharding;
using BenchmarkDotNet.Attributes;
using static Akka.Cluster.Benchmarks.Sharding.ShardingHelper;

namespace Akka.Cluster.Benchmarks.Sharding
{
[Config(typeof(MonitoringConfig))]
public class ShardMessageRoutingBenchmarks
{
[Params(StateStoreMode.Persistence, StateStoreMode.DData)]
public StateStoreMode StateMode;

[Params(10000)]
public int MsgCount;

public int BatchSize = 20;

private ActorSystem _sys1;
private ActorSystem _sys2;

private IActorRef _shardRegion1;
private IActorRef _shardRegion2;

private string _entityOnSys1;
private string _entityOnSys2;

private ShardedMessage _messageToSys1;
private ShardedMessage _messageToSys2;

private IActorRef _batchActor;
private Task _batchComplete;

[GlobalSetup]
public async Task Setup()
{
var config = StateMode switch
{
StateStoreMode.Persistence => CreatePersistenceConfig(),
StateStoreMode.DData => CreateDDataConfig(),
_ => null
};

_sys1 = ActorSystem.Create("BenchSys", config);
_sys2 = ActorSystem.Create("BenchSys", config);

var c1 = Cluster.Get(_sys1);
var c2 = Cluster.Get(_sys2);

await c1.JoinAsync(c1.SelfAddress);
await c2.JoinAsync(c1.SelfAddress);

_shardRegion1 = StartShardRegion(_sys1);
_shardRegion2 = StartShardRegion(_sys2);

var s1Asks = new List<Task<ShardedEntityActor.ResolveResp>>(20);
var s2Asks = new List<Task<ShardedEntityActor.ResolveResp>>(20);

foreach (var i in Enumerable.Range(0, 20))
{
s1Asks.Add(_shardRegion1.Ask<ShardedEntityActor.ResolveResp>(new ShardingEnvelope(i.ToString(), ShardedEntityActor.Resolve.Instance), TimeSpan.FromSeconds(3)));
s2Asks.Add(_shardRegion2.Ask<ShardedEntityActor.ResolveResp>(new ShardingEnvelope(i.ToString(), ShardedEntityActor.Resolve.Instance), TimeSpan.FromSeconds(3)));
}

// wait for all Ask operations to complete
await Task.WhenAll(s1Asks.Concat(s2Asks));

_entityOnSys2 = s1Asks.First(x => x.Result.Addr.Equals(c2.SelfAddress)).Result.EntityId;
_entityOnSys1 = s2Asks.First(x => x.Result.Addr.Equals(c1.SelfAddress)).Result.EntityId;

_messageToSys1 = new ShardedMessage(_entityOnSys1, 10);
_messageToSys2 = new ShardedMessage(_entityOnSys2, 10);
}

[IterationSetup]
public void PerIteration()
{
var tcs = new TaskCompletionSource<bool>();
_batchComplete = tcs.Task;
_batchActor = _sys1.ActorOf(Props.Create(() => new BulkSendActor(tcs, MsgCount)));
}

[Benchmark]
public async Task SingleRequestResponseToLocalEntity()
{
for (var i = 0; i < MsgCount; i++)
await _shardRegion1.Ask<ShardedMessage>(_messageToSys1);
}

[Benchmark]
public async Task StreamingToLocalEntity()
{
_batchActor.Tell(new BulkSendActor.BeginSend(_messageToSys1, _shardRegion1, BatchSize));
await _batchComplete;
}

[Benchmark]
public async Task SingleRequestResponseToRemoteEntity()
{
for (var i = 0; i < MsgCount; i++)
await _shardRegion1.Ask<ShardedMessage>(_messageToSys2);
}

[Benchmark]
public async Task StreamingToRemoteEntity()
{
_batchActor.Tell(new BulkSendActor.BeginSend(_messageToSys2, _shardRegion1, BatchSize));
await _batchComplete;
}

[IterationCleanup]
public void IterationCleanup()
{
_sys1.Stop(_batchActor);
}

[GlobalCleanup]
public async Task Cleanup()
{
var t1 = _sys1.Terminate();
var t2 = _sys2.Terminate();
await Task.WhenAll(t1, t2);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// //-----------------------------------------------------------------------
// // <copyright file="ShardSpawnBenchmarks.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System.Threading.Tasks;
using Akka.Actor;
using Akka.Benchmarks.Configurations;
using Akka.Cluster.Sharding;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Engines;
using static Akka.Cluster.Benchmarks.Sharding.ShardingHelper;

namespace Akka.Cluster.Benchmarks.Sharding
{
[Config(typeof(MonitoringConfig))]
[SimpleJob(RunStrategy.ColdStart, targetCount:1, warmupCount:0, launchCount:5)]
public class ShardSpawnBenchmarks
{
[Params(StateStoreMode.Persistence, StateStoreMode.DData)]
public StateStoreMode StateMode;

[Params(1000)]
public int EntityCount;

[Params(true, false)]
public bool RememberEntities;

public int BatchSize = 20;

private ActorSystem _sys1;
private ActorSystem _sys2;

private IActorRef _shardRegion1;
private IActorRef _shardRegion2;

public static int _shardRegionId = 0;


[GlobalSetup]
public async Task Setup()
{
var config = StateMode switch
{
StateStoreMode.Persistence => CreatePersistenceConfig(RememberEntities),
StateStoreMode.DData => CreateDDataConfig(RememberEntities),
_ => null
};

_sys1 = ActorSystem.Create("BenchSys", config);
_sys2 = ActorSystem.Create("BenchSys", config);

var c1 = Cluster.Get(_sys1);
var c2 = Cluster.Get(_sys2);

await c1.JoinAsync(c1.SelfAddress);
await c2.JoinAsync(c1.SelfAddress);
}

[IterationSetup]
public void IterationSetup()
{
/*
* Create a new set of shard regions each time, so all of the shards are freshly allocated
* on each benchmark run.
*/
_shardRegion1 = StartShardRegion(_sys1, "entities" + _shardRegionId);
_shardRegion2 = StartShardRegion(_sys2, "entities" + _shardRegionId);
_shardRegionId++;
}

[Benchmark]
public async Task SpawnEntities()
{
for (var i = 0; i < EntityCount; i++)
{
var msg = new ShardedMessage(i.ToString(), i);
await _shardRegion1.Ask<ShardedMessage>(msg);
}
}

[GlobalCleanup]
public async Task Cleanup()
{
var t2 = CoordinatedShutdown.Get(_sys2).Run(CoordinatedShutdown.ActorSystemTerminateReason.Instance);
var t1 = CoordinatedShutdown.Get(_sys1).Run(CoordinatedShutdown.ActorSystemTerminateReason.Instance);

await Task.WhenAll(t1, t2);
}
}
}
Loading

0 comments on commit bc004ae

Please sign in to comment.