From b3e85a51865fe6acc935f93bf1b6a00130f939b8 Mon Sep 17 00:00:00 2001 From: zetanova Date: Tue, 30 Nov 2021 17:36:22 +0100 Subject: [PATCH 1/2] add new ChannelTaskScheduler extension and set as the new 'channel-executor' --- .../CoreAPISpec.ApproveCore.approved.txt | 28 ++ .../Akka.Remote/Configuration/Remote.conf | 23 +- src/core/Akka/Akka.csproj | 1 + src/core/Akka/Configuration/Pigeon.conf | 10 + src/core/Akka/Dispatch/AbstractDispatcher.cs | 15 +- .../Dispatch/ChannelSchedulerExtension.cs | 389 ++++++++++++++++++ 6 files changed, 447 insertions(+), 19 deletions(-) create mode 100644 src/core/Akka/Dispatch/ChannelSchedulerExtension.cs diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 254164cb973..7a3e6616bb2 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -2509,6 +2509,22 @@ namespace Akka.Dispatch public System.TimeSpan PushTimeout { get; } public override Akka.Dispatch.MessageQueues.IMessageQueue Create(Akka.Actor.IActorRef owner, Akka.Actor.ActorSystem system) { } } + public sealed class ChannelTaskScheduler : Akka.Actor.IExtension, System.IDisposable + { + public ChannelTaskScheduler(Akka.Actor.ExtendedActorSystem system) { } + public System.Threading.Tasks.TaskScheduler High { get; } + public System.Threading.Tasks.TaskScheduler Idle { get; } + public System.Threading.Tasks.TaskScheduler Low { get; } + public System.Threading.Tasks.TaskScheduler Normal { get; } + public void Dispose() { } + public static Akka.Dispatch.ChannelTaskScheduler Get(Akka.Actor.ActorSystem system) { } + public System.Threading.Tasks.TaskScheduler GetScheduler(Akka.Dispatch.TaskSchedulerPriority priority) { } + } + public sealed class ChannelTaskSchedulerProvider : Akka.Actor.ExtensionIdProvider + { + public ChannelTaskSchedulerProvider() { } + public override Akka.Dispatch.ChannelTaskScheduler CreateExtension(Akka.Actor.ExtendedActorSystem system) { } + } public sealed class CurrentSynchronizationContextDispatcher : Akka.Dispatch.Dispatcher { public CurrentSynchronizationContextDispatcher(Akka.Dispatch.MessageDispatcherConfigurator configurator, string id, int throughput, System.Nullable throughputDeadlineTime, Akka.Dispatch.ExecutorServiceFactory executorServiceFactory, System.TimeSpan shutdownTimeout) { } @@ -2677,6 +2693,18 @@ namespace Akka.Dispatch public RejectedExecutionException(string message = null, System.Exception inner = null) { } protected RejectedExecutionException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { } } + public enum TaskSchedulerPriority + { + None = 0, + Idle = 4, + Background = 4, + Low = 5, + BelowNormal = 6, + Normal = 8, + AboveNormal = 10, + High = 13, + Realtime = 24, + } public class ThreadPoolConfig { public ThreadPoolConfig(Akka.Configuration.Config config) { } diff --git a/src/core/Akka.Remote/Configuration/Remote.conf b/src/core/Akka.Remote/Configuration/Remote.conf index 62217d16def..9bc288644e3 100644 --- a/src/core/Akka.Remote/Configuration/Remote.conf +++ b/src/core/Akka.Remote/Configuration/Remote.conf @@ -586,20 +586,21 @@ akka { default-remote-dispatcher { executor = fork-join-executor - fork-join-executor { - parallelism-min = 2 - parallelism-factor = 0.5 - parallelism-max = 16 - } + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 0.5 + parallelism-max = 16 + } + channel-executor.priority = "high" } backoff-remote-dispatcher { executor = fork-join-executor - fork-join-executor { - parallelism-min = 2 - parallelism-max = 2 - } + fork-join-executor { + parallelism-min = 2 + parallelism-max = 2 + } + channel-executor.priority = "low" } } - -} +} \ No newline at end of file diff --git a/src/core/Akka/Akka.csproj b/src/core/Akka/Akka.csproj index 2f0d47d7804..bf79d56b6fe 100644 --- a/src/core/Akka/Akka.csproj +++ b/src/core/Akka/Akka.csproj @@ -19,6 +19,7 @@ + diff --git a/src/core/Akka/Configuration/Pigeon.conf b/src/core/Akka/Configuration/Pigeon.conf index 16889b8d1bf..1d2f77be2e8 100644 --- a/src/core/Akka/Configuration/Pigeon.conf +++ b/src/core/Akka/Configuration/Pigeon.conf @@ -269,6 +269,15 @@ akka { } } + channel-scheduler { + parallelism-min = 4 #same as for ForkJoinDispatcher + parallelism-factor = 1 #same as for ForkJoinDispatcher + parallelism-max = 64 #same as for ForkJoinDispatcher + work-max = 10 #max executed work items in sequence until priority loop + work-interval = 500 #time target of executed work items in ms + work-step = 2 #target work item count in interval / burst + } + #used for GUI applications synchronized-dispatcher { type = "SynchronizedDispatcher" @@ -377,6 +386,7 @@ akka { parallelism-factor = 1.0 parallelism-max = 64 } + channel-executor.priority = "high" } default-blocking-io-dispatcher { diff --git a/src/core/Akka/Dispatch/AbstractDispatcher.cs b/src/core/Akka/Dispatch/AbstractDispatcher.cs index b18b1f17e05..4d3b7d18041 100644 --- a/src/core/Akka/Dispatch/AbstractDispatcher.cs +++ b/src/core/Akka/Dispatch/AbstractDispatcher.cs @@ -120,19 +120,18 @@ internal sealed class ChannelExecutorConfigurator : ExecutorServiceConfigurator { public ChannelExecutorConfigurator(Config config, IDispatcherPrerequisites prerequisites) : base(config, prerequisites) { - var fje = config.GetConfig("fork-join-executor"); - MaxParallelism = ThreadPoolConfig.ScaledPoolSize( - fje.GetInt("parallelism-min"), - fje.GetDouble("parallelism-factor", 1.0D), // the scalar-based factor to scale the threadpool size to - fje.GetInt("parallelism-max")); + var cfg = config.GetConfig("channel-executor"); + Priority = (TaskSchedulerPriority)Enum.Parse(typeof(TaskSchedulerPriority), cfg.GetString("priority", "normal"), true); } - public int MaxParallelism {get;} + public TaskSchedulerPriority Priority { get; } public override ExecutorService Produce(string id) { - Prerequisites.EventStream.Publish(new Debug($"ChannelExecutor-[id]", typeof(FixedConcurrencyTaskScheduler), $"Launched Dispatcher [{id}] with MaxParallelism=[{MaxParallelism}]")); - return new TaskSchedulerExecutor(id, new FixedConcurrencyTaskScheduler(MaxParallelism)); + Prerequisites.EventStream.Publish(new Debug($"ChannelExecutor-[{id}]", typeof(TaskSchedulerExecutor), $"Launched Dispatcher [{id}] with Priority[{Priority}]")); + + var scheduler = ChannelTaskScheduler.Get(Prerequisites.Settings.System).GetScheduler(Priority); + return new TaskSchedulerExecutor(id, scheduler); } } diff --git a/src/core/Akka/Dispatch/ChannelSchedulerExtension.cs b/src/core/Akka/Dispatch/ChannelSchedulerExtension.cs new file mode 100644 index 00000000000..bdc8877fc69 --- /dev/null +++ b/src/core/Akka/Dispatch/ChannelSchedulerExtension.cs @@ -0,0 +1,389 @@ +using Akka.Actor; +using Akka.Configuration; +using Akka.Event; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace Akka.Dispatch +{ + public sealed class ChannelTaskSchedulerProvider : ExtensionIdProvider + { + public override ChannelTaskScheduler CreateExtension(ExtendedActorSystem system) + { + return new ChannelTaskScheduler(system); + } + } + + public sealed class ChannelTaskScheduler : IExtension, IDisposable + { + [ThreadStatic] + private static TaskSchedulerPriority _threadPriority = TaskSchedulerPriority.None; + + private readonly Task _controlTask; + private readonly CancellationTokenSource _cts = new CancellationTokenSource(); + private readonly Timer _timer; + private readonly Task[] _coworkers; + private readonly int _maximumConcurrencyLevel; + private readonly int _maxWork = 3; //max work items to execute at one priority + + private readonly int _workInterval = 500; + private readonly int _workStep = 2; + + private readonly PriorityTaskScheduler _highScheduler; + private readonly PriorityTaskScheduler _normalScheduler; + private readonly PriorityTaskScheduler _lowScheduler; + private readonly PriorityTaskScheduler _idleScheduler; + + public TaskScheduler High => _highScheduler; + public TaskScheduler Normal => _normalScheduler; + public TaskScheduler Low => _lowScheduler; + public TaskScheduler Idle => _idleScheduler; + + public static ChannelTaskScheduler Get(ActorSystem system) + { + return system.WithExtension(typeof(ChannelTaskSchedulerProvider)); + } + + public ChannelTaskScheduler(ExtendedActorSystem system) + { + //todo own channel-task-scheduler config section + var config = system.Settings.Config.GetConfig("akka.channel-scheduler"); + _maximumConcurrencyLevel = ThreadPoolConfig.ScaledPoolSize( + config.GetInt("parallelism-min"), + config.GetDouble("parallelism-factor", 1.0D), // the scalar-based factor to scale the threadpool size to + config.GetInt("parallelism-max")); + _maximumConcurrencyLevel = Math.Max(_maximumConcurrencyLevel, 1); + _maxWork = Math.Max(config.GetInt("work-max", _maxWork), 3); + + _workInterval = config.GetInt("work-interval", _workInterval); + _workStep = config.GetInt("work-step", _workStep); + + var channelOptions = new UnboundedChannelOptions() + { + AllowSynchronousContinuations = true, + SingleReader = _maximumConcurrencyLevel == 1, + SingleWriter = false + }; + + _highScheduler = new PriorityTaskScheduler(Channel.CreateUnbounded(channelOptions), TaskSchedulerPriority.AboveNormal); + _normalScheduler = new PriorityTaskScheduler(Channel.CreateUnbounded(channelOptions), TaskSchedulerPriority.Normal); + _lowScheduler = new PriorityTaskScheduler(Channel.CreateUnbounded(channelOptions), TaskSchedulerPriority.Low); + _idleScheduler = new PriorityTaskScheduler(Channel.CreateUnbounded(channelOptions), TaskSchedulerPriority.Idle); + + _coworkers = new Task[_maximumConcurrencyLevel - 1]; + for (var i = 0; i < _coworkers.Length; i++) + _coworkers[i] = Task.CompletedTask; + + _timer = new Timer(ScheduleCoWorkers, "timer", Timeout.Infinite, Timeout.Infinite); + + _controlTask = Task.Factory.StartNew(ControlAsync, _cts.Token, + TaskCreationOptions.DenyChildAttach | TaskCreationOptions.LongRunning, + TaskScheduler.Default).Unwrap(); + } + + public TaskScheduler GetScheduler(TaskSchedulerPriority priority) + { + switch (priority) + { + case TaskSchedulerPriority.Normal: + return _normalScheduler; + case TaskSchedulerPriority.Realtime: + case TaskSchedulerPriority.High: + case TaskSchedulerPriority.AboveNormal: + return _highScheduler; + case TaskSchedulerPriority.BelowNormal: + case TaskSchedulerPriority.Low: + return _lowScheduler; + case TaskSchedulerPriority.Background: + //case TaskSchedulerPriority.Idle: + return _idleScheduler; + default: + throw new ArgumentException(nameof(priority)); + } + } + + private async Task ControlAsync() + { + var highReader = _highScheduler.Channel.Reader; + var normalReader = _normalScheduler.Channel.Reader; + var lowReader = _lowScheduler.Channel.Reader; + var idleReader = _idleScheduler.Channel.Reader; + + var readTasks = new Task[] { + highReader.WaitToReadAsync().AsTask(), + normalReader.WaitToReadAsync().AsTask(), + lowReader.WaitToReadAsync().AsTask(), + idleReader.WaitToReadAsync().AsTask() + }; + + Task readTask; + + do + { + //schedule coworkers + ScheduleCoWorkers("control"); + + //main worker + DoWork(0); + + //wait on coworker exit + await Task.WhenAll(_coworkers).ConfigureAwait(false); + + //stop timer + if (!_cts.IsCancellationRequested) + _timer.Change(Timeout.Infinite, Timeout.Infinite); + + //reset read events + if (readTasks[0].IsCompleted) + readTasks[0] = highReader.WaitToReadAsync().AsTask(); + if (readTasks[1].IsCompleted) + readTasks[1] = normalReader.WaitToReadAsync().AsTask(); + if (readTasks[2].IsCompleted) + readTasks[2] = lowReader.WaitToReadAsync().AsTask(); + if (readTasks[3].IsCompleted) + readTasks[3] = idleReader.WaitToReadAsync().AsTask(); + + readTask = await Task.WhenAny(readTasks).ConfigureAwait(false); + } + while (readTask.Result && !_cts.IsCancellationRequested); + } + + private void ScheduleCoWorkers(object state) + { + var name = (string)state; + + var queuedWorkItems = _highScheduler.Channel.Reader.Count + + _normalScheduler.Channel.Reader.Count + + _lowScheduler.Channel.Reader.Count + + _idleScheduler.Channel.Reader.Count; + + var reqWorkerCount = queuedWorkItems; + + //limit req workers + reqWorkerCount = Math.Min(reqWorkerCount, _maximumConcurrencyLevel); + + //count running workers + var controlWorkerCount = name == "control" ? 1 : 0; + var coworkerCount = 0; + for (int i = 0; i < _coworkers.Length; i++) + { + if (!_coworkers[i].IsCompleted) + coworkerCount++; + } + + //limit new workers + var newWorkerToStart = Math.Min(Math.Max(reqWorkerCount - controlWorkerCount - coworkerCount, 0), _workStep); + if (newWorkerToStart == 0 && reqWorkerCount > controlWorkerCount && (controlWorkerCount+coworkerCount) < _maximumConcurrencyLevel) + newWorkerToStart = 1; + + if (newWorkerToStart > 0) + { + //start new workers + for (var i = 0; newWorkerToStart > 0 && i < _coworkers.Length; i++) + { + if (_coworkers[i].IsCompleted) + { + _coworkers[i] = Task.Factory.StartNew(Worker, i + 1, _cts.Token, + TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); + newWorkerToStart--; + } + } + } + + //reschedule + if (!_cts.IsCancellationRequested) + { + var interval = controlWorkerCount > 0 || (reqWorkerCount - newWorkerToStart) > 0 + ? _workInterval / _workStep + : _workInterval * _workStep; + _timer.Change(interval, Timeout.Infinite); + } + } + + private void Worker(object state) + { + DoWork((int)state); + } + + private int DoWork(int workerId) + { + var highCount = 0; + var normalCount = 0; + var lowCount = 0; + var idleCount = 0; + + int c; + int rounds = 0; + int roundWork; + int roundClean = 0; + + //maybe implement max work count and/or a deadline + + _threadPriority = TaskSchedulerPriority.Idle; + try + { + do + { + rounds++; + roundWork = 0; + + c = _highScheduler.ExecuteAll(); + highCount += c; + roundWork += c; + + c = _normalScheduler.ExecuteMany(_maxWork); + normalCount += c; + roundWork += c; + + c = roundWork > 0 + ? _lowScheduler.ExecuteSingle() + : _lowScheduler.ExecuteMany(_maxWork); + lowCount += c; + roundWork += c; + + //if there was no work then only execute background tasks + if (c == 0) + { + c = _idleScheduler.ExecuteSingle(); + idleCount += c; + roundWork += c; + } + + roundClean = roundWork == 0 ? roundClean + 1 : 0; + } + while (roundClean < 2 && !_cts.IsCancellationRequested); + } + catch + { + //ignore error + } + finally + { + _threadPriority = TaskSchedulerPriority.None; + } + + //worker stopped + + var total = highCount + normalCount + lowCount + idleCount; + + //todo push to metrics: workerId, total, highCount, normalCount, lowCount, idleCount + + return total; + } + + public void Dispose() + { + _idleScheduler.Dispose(); + _lowScheduler.Dispose(); + _normalScheduler.Dispose(); + _highScheduler.Dispose(); + + _cts.Cancel(); + _timer.Dispose(); + } + + sealed class PriorityTaskScheduler : TaskScheduler, IDisposable + { + readonly Channel _channel; + + readonly TaskSchedulerPriority _priority; + + public Channel Channel => _channel; + public TaskSchedulerPriority Priority => _priority; + + public PriorityTaskScheduler(Channel channel, TaskSchedulerPriority priority) + { + _channel = channel; + _priority = priority; + } + + protected override void QueueTask(Task task) + { + if (!_channel.Writer.TryWrite(task)) + throw new InvalidOperationException(); + } + + protected override IEnumerable GetScheduledTasks() + { + return Array.Empty(); + } + + protected override bool TryDequeue(Task task) + { + return false; + } + + protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + // If this thread isn't already processing a task + // and the thread priority is higher, + // we don't support inlining + return (_threadPriority > TaskSchedulerPriority.None && _threadPriority <= _priority) + && TryExecuteTask(task); + } + + public int ExecuteAll() + { + _threadPriority = _priority; + + var reader = _channel.Reader; + var count = 0; + + while (reader.TryRead(out var task)) + { + count++; //maybe only count successfully executed + if (!TryExecuteTask(task)) + return count; + } + return count; + } + + public int ExecuteMany(int maxTasks) + { + _threadPriority = _priority; + + var reader = _channel.Reader; + int c; + + for (c = 0; c < maxTasks && reader.TryRead(out var task); c++) + if (!TryExecuteTask(task)) + return c + 1; + + return c; + } + + public int ExecuteSingle() + { + _threadPriority = _priority; + + if (_channel.Reader.TryRead(out var task)) + { + TryExecuteTask(task); + return 1; + } + return 0; + } + + public void Dispose() + { + _channel.Writer.TryComplete(); + } + } + } + + public enum TaskSchedulerPriority + { + None = 0, + Idle = 4, + Background = 4, + Low = 5, + BelowNormal = 6, + Normal = 8, + AboveNormal = 10, + High = 13, + Realtime = 24 + } +} From 7c1d053af53f64dca34196959ee90cb02ab1ae5b Mon Sep 17 00:00:00 2001 From: zetanova Date: Fri, 3 Dec 2021 14:50:00 +0100 Subject: [PATCH 2/2] rallback dependancy version and add comments --- src/core/Akka/Akka.csproj | 2 +- .../Dispatch/ChannelSchedulerExtension.cs | 149 ++++++++++++++---- 2 files changed, 115 insertions(+), 36 deletions(-) diff --git a/src/core/Akka/Akka.csproj b/src/core/Akka/Akka.csproj index bf79d56b6fe..8889b104757 100644 --- a/src/core/Akka/Akka.csproj +++ b/src/core/Akka/Akka.csproj @@ -19,7 +19,7 @@ - + diff --git a/src/core/Akka/Dispatch/ChannelSchedulerExtension.cs b/src/core/Akka/Dispatch/ChannelSchedulerExtension.cs index bdc8877fc69..8b4867dd6dc 100644 --- a/src/core/Akka/Dispatch/ChannelSchedulerExtension.cs +++ b/src/core/Akka/Dispatch/ChannelSchedulerExtension.cs @@ -1,11 +1,9 @@ -using Akka.Actor; -using Akka.Configuration; -using Akka.Event; -using System; +using System; using System.Collections.Generic; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; +using Akka.Actor; namespace Akka.Dispatch { @@ -17,21 +15,33 @@ public override ChannelTaskScheduler CreateExtension(ExtendedActorSystem system) } } + /// + /// The Taskscheduler holds multiple TaskSchdulers with different priorities + /// All scheduled work is executed over the regular ThreadPool + /// and the execution sequence is depenedened on the priority of the TaskSchduler + /// + /// Priority TaskSchedulers: + /// High => All queued work is processed before any other priority + /// Normal => Normal work load, processed until max-work count + /// Low => Only processed after normal work load + /// Idle => only processed when no other work is queued + /// public sealed class ChannelTaskScheduler : IExtension, IDisposable { [ThreadStatic] private static TaskSchedulerPriority _threadPriority = TaskSchedulerPriority.None; - private readonly Task _controlTask; - private readonly CancellationTokenSource _cts = new CancellationTokenSource(); - private readonly Timer _timer; - private readonly Task[] _coworkers; - private readonly int _maximumConcurrencyLevel; - private readonly int _maxWork = 3; //max work items to execute at one priority + private readonly Task _controlTask; //the main worker + private readonly CancellationTokenSource _cts = new CancellationTokenSource(); //main cancellation token + private readonly Timer _timer; //timer to schedule coworkers + private readonly Task[] _coworkers; //the coworkers + private readonly int _maximumConcurrencyLevel; //max count of workers + private readonly int _maxWork = 10; //max executed work items in sequence until priority loop - private readonly int _workInterval = 500; - private readonly int _workStep = 2; + private readonly int _workInterval = 500; //time target of executed work items in ms + private readonly int _workStep = 2; //target work item count in interval / burst + //priority task schedulers private readonly PriorityTaskScheduler _highScheduler; private readonly PriorityTaskScheduler _normalScheduler; private readonly PriorityTaskScheduler _lowScheduler; @@ -49,18 +59,19 @@ public static ChannelTaskScheduler Get(ActorSystem system) public ChannelTaskScheduler(ExtendedActorSystem system) { - //todo own channel-task-scheduler config section + //config channel-scheduler var config = system.Settings.Config.GetConfig("akka.channel-scheduler"); _maximumConcurrencyLevel = ThreadPoolConfig.ScaledPoolSize( config.GetInt("parallelism-min"), config.GetDouble("parallelism-factor", 1.0D), // the scalar-based factor to scale the threadpool size to config.GetInt("parallelism-max")); _maximumConcurrencyLevel = Math.Max(_maximumConcurrencyLevel, 1); - _maxWork = Math.Max(config.GetInt("work-max", _maxWork), 3); + _maxWork = Math.Max(config.GetInt("work-max", _maxWork), 3); //min 3 normal work in work-loop _workInterval = config.GetInt("work-interval", _workInterval); _workStep = config.GetInt("work-step", _workStep); + //create task schedulers var channelOptions = new UnboundedChannelOptions() { AllowSynchronousContinuations = true, @@ -73,17 +84,26 @@ public ChannelTaskScheduler(ExtendedActorSystem system) _lowScheduler = new PriorityTaskScheduler(Channel.CreateUnbounded(channelOptions), TaskSchedulerPriority.Low); _idleScheduler = new PriorityTaskScheduler(Channel.CreateUnbounded(channelOptions), TaskSchedulerPriority.Idle); + //prefill coworker array _coworkers = new Task[_maximumConcurrencyLevel - 1]; for (var i = 0; i < _coworkers.Length; i++) _coworkers[i] = Task.CompletedTask; + //init paused timer _timer = new Timer(ScheduleCoWorkers, "timer", Timeout.Infinite, Timeout.Infinite); + //start main worker _controlTask = Task.Factory.StartNew(ControlAsync, _cts.Token, TaskCreationOptions.DenyChildAttach | TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap(); } + /// + /// Get highest possible TaskSchdeduler of requested priority + /// + /// requested priority + /// TaskSchdeduler with highest possible priority + /// priority not supported public TaskScheduler GetScheduler(TaskSchedulerPriority priority) { switch (priority) @@ -98,13 +118,17 @@ public TaskScheduler GetScheduler(TaskSchedulerPriority priority) case TaskSchedulerPriority.Low: return _lowScheduler; case TaskSchedulerPriority.Background: - //case TaskSchedulerPriority.Idle: + //case TaskSchedulerPriority.Idle: return _idleScheduler; default: throw new ArgumentException(nameof(priority)); } } + /// + /// The main worker waits for work and schedule coworkers + /// + /// main worker private async Task ControlAsync() { var highReader = _highScheduler.Channel.Reader; @@ -129,7 +153,7 @@ private async Task ControlAsync() //main worker DoWork(0); - //wait on coworker exit + //wait on all coworkers exit await Task.WhenAll(_coworkers).ConfigureAwait(false); //stop timer @@ -146,43 +170,52 @@ private async Task ControlAsync() if (readTasks[3].IsCompleted) readTasks[3] = idleReader.WaitToReadAsync().AsTask(); + //wait on new work) readTask = await Task.WhenAny(readTasks).ConfigureAwait(false); } while (readTask.Result && !_cts.IsCancellationRequested); } + /// + /// Scheduling new coworkers based on queued work count + /// and starting the timer to reschedule coworkers periodically + /// + /// indication if the call is from main worker or the timer private void ScheduleCoWorkers(object state) { - var name = (string)state; + var name = (string)state; //control or timer + int i; + //total count of queued work items var queuedWorkItems = _highScheduler.Channel.Reader.Count + _normalScheduler.Channel.Reader.Count + _lowScheduler.Channel.Reader.Count + _idleScheduler.Channel.Reader.Count; + //required worker count var reqWorkerCount = queuedWorkItems; - //limit req workers + //limit required workers to max reqWorkerCount = Math.Min(reqWorkerCount, _maximumConcurrencyLevel); //count running workers var controlWorkerCount = name == "control" ? 1 : 0; var coworkerCount = 0; - for (int i = 0; i < _coworkers.Length; i++) + for (i = 0; i < _coworkers.Length; i++) { if (!_coworkers[i].IsCompleted) coworkerCount++; } - //limit new workers + //limit new worker count var newWorkerToStart = Math.Min(Math.Max(reqWorkerCount - controlWorkerCount - coworkerCount, 0), _workStep); - if (newWorkerToStart == 0 && reqWorkerCount > controlWorkerCount && (controlWorkerCount+coworkerCount) < _maximumConcurrencyLevel) + if (newWorkerToStart == 0 && reqWorkerCount > controlWorkerCount && (controlWorkerCount + coworkerCount) < _maximumConcurrencyLevel) newWorkerToStart = 1; if (newWorkerToStart > 0) { //start new workers - for (var i = 0; newWorkerToStart > 0 && i < _coworkers.Length; i++) + for (i = 0; newWorkerToStart > 0 && i < _coworkers.Length; i++) { if (_coworkers[i].IsCompleted) { @@ -193,9 +226,17 @@ private void ScheduleCoWorkers(object state) } } - //reschedule + //reschedule on timer if (!_cts.IsCancellationRequested) { + /* + calculate interval slot + 1) if work-interval is 500ms and work-step is 2 then try to schedule new worker after 250ms + this allows a single worker (main worker) to process everything in its work-slot + without the need of any coworkers. this is in the field the main case + 2) if the call is from the timer and all workers could be scheduled then + assume that all workers are busy for the set work-slot and reschedule much later + */ var interval = controlWorkerCount > 0 || (reqWorkerCount - newWorkerToStart) > 0 ? _workInterval / _workStep : _workInterval * _workStep; @@ -203,48 +244,64 @@ private void ScheduleCoWorkers(object state) } } + /// + /// Task action to run worker + /// + /// workerId private void Worker(object state) { DoWork((int)state); } + /// + /// work loop + /// + /// the worker id + /// private int DoWork(int workerId) { - var highCount = 0; - var normalCount = 0; - var lowCount = 0; - var idleCount = 0; + var highCount = 0; //executed work items in priority high + var normalCount = 0; //executed work items in priority normal + var lowCount = 0; //executed work items in priority low + var idleCount = 0; //executed work items in priority idle int c; - int rounds = 0; - int roundWork; - int roundClean = 0; + int rounds = 0; //loop count + int roundWork; //work count in the current loop + int roundClean = 0; //clean loop count //maybe implement max work count and/or a deadline + + //the work loop _threadPriority = TaskSchedulerPriority.Idle; try - { + { do { rounds++; roundWork = 0; + //execute all high priority work c = _highScheduler.ExecuteAll(); highCount += c; roundWork += c; + //execute normal priority work until max work count c = _normalScheduler.ExecuteMany(_maxWork); normalCount += c; roundWork += c; + //only when all other priority queues where empty + //then execute multiple low priority work c = roundWork > 0 ? _lowScheduler.ExecuteSingle() : _lowScheduler.ExecuteMany(_maxWork); lowCount += c; roundWork += c; - //if there was no work then only execute background tasks + //only execute background tasks + //when the current loop executed no work items if (c == 0) { c = _idleScheduler.ExecuteSingle(); @@ -252,6 +309,7 @@ private int DoWork(int workerId) roundWork += c; } + //count clean loops roundClean = roundWork == 0 ? roundClean + 1 : 0; } while (roundClean < 2 && !_cts.IsCancellationRequested); @@ -270,6 +328,7 @@ private int DoWork(int workerId) var total = highCount + normalCount + lowCount + idleCount; //todo push to metrics: workerId, total, highCount, normalCount, lowCount, idleCount + //maybe add StopWatch return total; } @@ -285,6 +344,11 @@ public void Dispose() _timer.Dispose(); } + /// + /// TaskScheduler to queue work items to the priority-channel from user space + /// and to help execute queued works internaly. + /// It supports task-inlining only for task equal or above the own priority + /// sealed class PriorityTaskScheduler : TaskScheduler, IDisposable { readonly Channel _channel; @@ -319,12 +383,15 @@ protected override bool TryDequeue(Task task) protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { // If this thread isn't already processing a task - // and the thread priority is higher, - // we don't support inlining - return (_threadPriority > TaskSchedulerPriority.None && _threadPriority <= _priority) + // and the thread priority is higher, then we don't support inlining + return (_threadPriority > TaskSchedulerPriority.None && _threadPriority <= _priority) && TryExecuteTask(task); } + /// + /// execute work until unsuccessfull + /// + /// dequeued work count public int ExecuteAll() { _threadPriority = _priority; @@ -341,6 +408,11 @@ public int ExecuteAll() return count; } + /// + /// execute work until unsuccessfull or max count + /// + /// max work to execute + /// dequeued work count public int ExecuteMany(int maxTasks) { _threadPriority = _priority; @@ -355,6 +427,10 @@ public int ExecuteMany(int maxTasks) return c; } + /// + /// execute single work item + /// + /// dequeued work count public int ExecuteSingle() { _threadPriority = _priority; @@ -374,6 +450,9 @@ public void Dispose() } } + /// + /// Windows API related Process and Thread Priorities + /// public enum TaskSchedulerPriority { None = 0,