diff --git a/docs/articles/actors/dispatchers.md b/docs/articles/actors/dispatchers.md index 73ee10f96b9..47bf76e5b8e 100644 --- a/docs/articles/actors/dispatchers.md +++ b/docs/articles/actors/dispatchers.md @@ -74,6 +74,7 @@ Some dispatcher configurations are available out-of-the-box for convenience. You * **task-dispatcher** - A configuration that uses the [TaskDispatcher](#taskdispatcher). * **default-fork-join-dispatcher** - A configuration that uses the [ForkJoinDispatcher](#forkjoindispatcher). * **synchronized-dispatcher** - A configuration that uses the [SynchronizedDispatcher](#synchronizeddispatcher). +* **channel-executor** - new as of v1.4.19, the [`ChannelExecutor`](#channelexecutor) is used to run on top of the .NET `ThreadPool` and allow Akka.NET to dynamically scale thread usage up and down with demand in exchange for better CPU and throughput performance. ## Built-in Dispatchers @@ -165,6 +166,63 @@ private void Form1_Load(object sender, System.EventArgs e) } ``` +### `ChannelExecutor` +In Akka.NET v1.4.19 we will be introducing an opt-in feature, the `ChannelExecutor` - a new dispatcher type that re-uses the same configuration as a `ForkJoinDispatcher` but runs entirely on top of the .NET `ThreadPool` and is able to take advantage of dynamic thread pool scaling to size / resize workloads on the fly. + +During its initial development and benchmarks, we observed the following: + +1. The `ChannelExecutor` tremendously reduced idle CPU and max busy CPU even during peak message throughput, primarily as a result of dynamically shrinking the total `ThreadPool` to only the necessary size. This resolves one of the largest complaints large users of Akka.NET have today. However, **in order for this setting to be effective `ThreadPool.SetMin(0,0)` must also be set**. We are considering doing this inside the `ActorSystem.Create` method, those settings don't work for you you can easily override them by simply calling `ThreadPool.SetMin(yourValue, yourValue)` again after `ActorSystem.Create` has exited. +2. The `ChannelExecutor` actually beat the `ForkJoinDispatcher` and others on performance even in environments like Docker and bare metal on Windows. + +> [!NOTE] +> We are in the process of gathering data from users on how well `ChannelExecutor` performs in the real world. If you are interested in trying out the `ChannelExecutor`, please read the directions in this document and then comment on [the "Akka.NET v1.4.19: ChannelExecutor performance data" discussion thread](https://github.com/akkadotnet/akka.net/discussions/4983). + +The `ChannelExectuor` re-uses the same threading settings as the `ForkJoinExecutor` to determine its effective upper and lower parallelism limits, and you can configure the `ChannelExecutor` to run inside your `ActorSystem` via the following HOCON configuration: + +``` +akka.actor.default-dispatcher = { + executor = channel-executor + fork-join-executor { #channelexecutor will re-use these settings + parallelism-min = 2 + parallelism-factor = 1 + parallelism-max = 64 + } +} + +akka.actor.internal-dispatcher = { + executor = channel-executor + throughput = 5 + fork-join-executor { + parallelism-min = 4 + parallelism-factor = 1.0 + parallelism-max = 64 + } +} + +akka.remote.default-remote-dispatcher { + type = Dispatcher + executor = channel-executor + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 0.5 + parallelism-max = 16 + } +} + +akka.remote.backoff-remote-dispatcher { + executor = channel-executor + fork-join-executor { + parallelism-min = 2 + parallelism-max = 2 + } +} +``` + +This will enable the `ChannelExecutor` to run everywhere and all Akka.NET loads, with the exception of anything you manually allocate onto a `ForkJoinDispatcher` or `PinnedDispatcher`, will be managed by the `ThreadPool`. + +> [!IMPORTANT] +> As of Akka.NET v1.4.19, we call `ThreadPool.SetMinThreads(0,0)` inside the `ActorSystem.Create` method as we've found that the default `ThreadPool` minimum values have a negative impact on performance. However, if this causes undesireable side effects for you inside your application you can always override those settings by calling `ThreadPool.SetMinThreads(yourValue, yourValue)` again after you've created your `ActorSystem`. + #### Common Dispatcher Configuration The following configuration keys are available for any dispatcher configuration: diff --git a/src/benchmark/Akka.Benchmarks/Actor/PingPongBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Actor/PingPongBenchmarks.cs index 96c8c1ffc0e..3b6ca37a3b8 100644 --- a/src/benchmark/Akka.Benchmarks/Actor/PingPongBenchmarks.cs +++ b/src/benchmark/Akka.Benchmarks/Actor/PingPongBenchmarks.cs @@ -9,6 +9,7 @@ using System.Threading.Tasks; using Akka.Actor; using Akka.Benchmarks.Configurations; +using Akka.Configuration; using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Engines; diff --git a/src/benchmark/RemotePingPong/Program.cs b/src/benchmark/RemotePingPong/Program.cs index f2ff92aac82..969e48315e5 100644 --- a/src/benchmark/RemotePingPong/Program.cs +++ b/src/benchmark/RemotePingPong/Program.cs @@ -44,7 +44,7 @@ public static uint CpuSpeed() public static Config CreateActorSystemConfig(string actorSystemName, string ipOrHostname, int port) { var baseConfig = ConfigurationFactory.ParseString(@" - akka { + akka { actor.provider = remote loglevel = ERROR suppress-json-serializer-warning = on @@ -57,6 +57,7 @@ public static Config CreateActorSystemConfig(string actorSystemName, string ipOr port = 0 hostname = ""localhost"" } + } }"); diff --git a/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs index 04c3f332fe7..444dfbf8987 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs @@ -74,6 +74,7 @@ public StressSpecConfig() convergence-within-factor = 1.0 } akka.actor.provider = cluster + akka.cluster { failure-detector.acceptable-heartbeat-pause = 3s downing-provider-class = ""Akka.Cluster.SplitBrainResolver, Akka.Cluster"" @@ -86,10 +87,29 @@ public StressSpecConfig() akka.loggers = [""Akka.TestKit.TestEventListener, Akka.TestKit""] akka.loglevel = INFO akka.remote.log-remote-lifecycle-events = off - akka.actor.default-dispatcher.fork-join-executor { - parallelism - min = 8 - parallelism - max = 8 + akka.actor.default-dispatcher = { + executor = channel-executor + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 1 + parallelism-max = 64 + } } + akka.actor.internal-dispatcher = { + executor = channel-executor + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 1 + parallelism-max = 64 + } + } +akka.remote.default-remote-dispatcher { + executor = channel-executor + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 0.5 + parallelism-max = 16 + } "); TestTransport = true; diff --git a/src/core/Akka.Remote/Configuration/Remote.conf b/src/core/Akka.Remote/Configuration/Remote.conf index 364108afb2c..246a6f1ddca 100644 --- a/src/core/Akka.Remote/Configuration/Remote.conf +++ b/src/core/Akka.Remote/Configuration/Remote.conf @@ -578,23 +578,20 @@ akka { ### Default dispatcher for the remoting subsystem - ### Default dispatcher for the remoting subsystem - default-remote-dispatcher { - type = ForkJoinDispatcher - executor = fork-join-executor - dedicated-thread-pool { - # Fixed number of threads to have in this threadpool - thread-count = 4 + executor = fork-join-executor + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 0.5 + parallelism-max = 16 } } backoff-remote-dispatcher { - type = ForkJoinDispatcher - executor = fork-join-executor - dedicated-thread-pool { - # Fixed number of threads to have in this threadpool - thread-count = 4 + executor = fork-join-executor + fork-join-executor { + parallelism-min = 2 + parallelism-max = 2 } } } diff --git a/src/core/Akka.Tests.Performance/Dispatch/ForkJoinDispatcherThroughputSpec.cs b/src/core/Akka.Tests.Performance/Dispatch/ForkJoinDispatcherThroughputSpec.cs index 4401921e266..8771a3129d5 100644 --- a/src/core/Akka.Tests.Performance/Dispatch/ForkJoinDispatcherThroughputSpec.cs +++ b/src/core/Akka.Tests.Performance/Dispatch/ForkJoinDispatcherThroughputSpec.cs @@ -45,4 +45,17 @@ protected override MessageDispatcherConfigurator Configurator() return new DispatcherConfigurator(DispatcherConfiguration, Prereqs); } } + + public class ChannelDispatcherExecutorThroughputSpec : WarmDispatcherThroughputSpecBase + { + public static Config DispatcherConfiguration => ConfigurationFactory.ParseString(@" + id = PerfTest + executor = channel-executor + "); + + protected override MessageDispatcherConfigurator Configurator() + { + return new DispatcherConfigurator(DispatcherConfiguration, Prereqs); + } + } } diff --git a/src/core/Akka/Actor/ActorSystem.cs b/src/core/Akka/Actor/ActorSystem.cs index 481aafbd06b..4b7819d9055 100644 --- a/src/core/Akka/Actor/ActorSystem.cs +++ b/src/core/Akka/Actor/ActorSystem.cs @@ -278,6 +278,10 @@ public static ActorSystem Create(string name) private static ActorSystem CreateAndStartSystem(string name, Config withFallback, ActorSystemSetup setup) { + // allows the ThreadPool to scale up / down dynamically + // by removing minimum thread count, which in our benchmarks + // appears to negatively impact performance + ThreadPool.SetMinThreads(0, 0); var system = new ActorSystemImpl(name, withFallback, setup, Option.None); system.Start(); return system; diff --git a/src/core/Akka/Akka.csproj b/src/core/Akka/Akka.csproj index d5af4e1eea7..28b2abd163d 100644 --- a/src/core/Akka/Akka.csproj +++ b/src/core/Akka/Akka.csproj @@ -16,7 +16,7 @@ - + diff --git a/src/core/Akka/Dispatch/AbstractDispatcher.cs b/src/core/Akka/Dispatch/AbstractDispatcher.cs index defa523f64a..b18b1f17e05 100644 --- a/src/core/Akka/Dispatch/AbstractDispatcher.cs +++ b/src/core/Akka/Dispatch/AbstractDispatcher.cs @@ -116,6 +116,26 @@ protected ExecutorServiceConfigurator(Config config, IDispatcherPrerequisites pr public IDispatcherPrerequisites Prerequisites { get; private set; } } + internal sealed class ChannelExecutorConfigurator : ExecutorServiceConfigurator + { + public ChannelExecutorConfigurator(Config config, IDispatcherPrerequisites prerequisites) : base(config, prerequisites) + { + var fje = config.GetConfig("fork-join-executor"); + MaxParallelism = ThreadPoolConfig.ScaledPoolSize( + fje.GetInt("parallelism-min"), + fje.GetDouble("parallelism-factor", 1.0D), // the scalar-based factor to scale the threadpool size to + fje.GetInt("parallelism-max")); + } + + public int MaxParallelism {get;} + + public override ExecutorService Produce(string id) + { + Prerequisites.EventStream.Publish(new Debug($"ChannelExecutor-[id]", typeof(FixedConcurrencyTaskScheduler), $"Launched Dispatcher [{id}] with MaxParallelism=[{MaxParallelism}]")); + return new TaskSchedulerExecutor(id, new FixedConcurrencyTaskScheduler(MaxParallelism)); + } + } + /// /// INTERNAL API /// @@ -306,6 +326,8 @@ protected ExecutorServiceConfigurator ConfigureExecutor() return new CurrentSynchronizationContextExecutorServiceFactory(Config, Prerequisites); case "task-executor": return new DefaultTaskSchedulerExecutorConfigurator(Config, Prerequisites); + case "channel-executor": + return new ChannelExecutorConfigurator(Config, Prerequisites); default: Type executorConfiguratorType = Type.GetType(executor); if (executorConfiguratorType == null) diff --git a/src/core/Akka/Dispatch/Dispatchers.cs b/src/core/Akka/Dispatch/Dispatchers.cs index 5043cf40547..2f0231319d1 100644 --- a/src/core/Akka/Dispatch/Dispatchers.cs +++ b/src/core/Akka/Dispatch/Dispatchers.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Akka.Actor; @@ -91,6 +92,86 @@ public PartialTrustThreadPoolExecutorService(string id) : base(id) } } + /// + /// INTERNAL API + /// + /// Used to power + /// + internal sealed class FixedConcurrencyTaskScheduler : TaskScheduler + { + + [ThreadStatic] + private static bool _threadRunning = false; + private ConcurrentQueue _tasks = new ConcurrentQueue(); + + private int _readers = 0; + + public FixedConcurrencyTaskScheduler(int degreeOfParallelism) + { + MaximumConcurrencyLevel = degreeOfParallelism; + } + + + public override int MaximumConcurrencyLevel { get; } + + /// + /// ONLY USED IN DEBUGGER - NO PERF IMPACT. + /// + protected override IEnumerable GetScheduledTasks() + { + return _tasks; + } + + protected override bool TryDequeue(Task task) + { + return false; + } + + protected override void QueueTask(Task task) + { + _tasks.Enqueue(task); + if (_readers < MaximumConcurrencyLevel) + { + var initial = _readers; + var newVale = _readers + 1; + if (initial == Interlocked.CompareExchange(ref _readers, newVale, initial)) + { + // try to start a new worker + ThreadPool.UnsafeQueueUserWorkItem(_ => ReadChannel(), null); + } + } + } + + protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + // If this thread isn't already processing a task, we don't support inlining + if (!_threadRunning) return false; + return TryExecuteTask(task); + } + + public void ReadChannel() + { + _threadRunning = true; + try + { + while (_tasks.TryDequeue(out var runnable)) + { + base.TryExecuteTask(runnable); + } + } + catch + { + // suppress exceptions + } + finally + { + Interlocked.Decrement(ref _readers); + + _threadRunning = false; + } + } + } + /// /// INTERNAL API @@ -273,7 +354,7 @@ public MessageDispatcher DefaultGlobalDispatcher internal MessageDispatcher InternalDispatcher { get; } /// - /// The for the default dispatcher. + /// The for the default dispatcher. /// public Config DefaultDispatcherConfig { @@ -336,7 +417,7 @@ public bool HasDispatcher(string id) private MessageDispatcherConfigurator LookupConfigurator(string id) { var depth = 0; - while(depth < MaxDispatcherAliasDepth) + while (depth < MaxDispatcherAliasDepth) { if (_dispatcherConfigurators.TryGetValue(id, out var configurator)) return configurator; @@ -374,7 +455,7 @@ private MessageDispatcherConfigurator LookupConfigurator(string id) /// /// INTERNAL API /// - /// Creates a dispatcher from a . Internal test purpose only. + /// Creates a dispatcher from a . Internal test purpose only. /// /// From(Config.GetConfig(id)); ///