Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new ChannelTaskScheduler Extension #5403

Merged
merged 6 commits into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2509,6 +2509,22 @@ namespace Akka.Dispatch
public System.TimeSpan PushTimeout { get; }
public override Akka.Dispatch.MessageQueues.IMessageQueue Create(Akka.Actor.IActorRef owner, Akka.Actor.ActorSystem system) { }
}
public sealed class ChannelTaskScheduler : Akka.Actor.IExtension, System.IDisposable
{
public ChannelTaskScheduler(Akka.Actor.ExtendedActorSystem system) { }
public System.Threading.Tasks.TaskScheduler High { get; }
public System.Threading.Tasks.TaskScheduler Idle { get; }
public System.Threading.Tasks.TaskScheduler Low { get; }
public System.Threading.Tasks.TaskScheduler Normal { get; }
public void Dispose() { }
public static Akka.Dispatch.ChannelTaskScheduler Get(Akka.Actor.ActorSystem system) { }
public System.Threading.Tasks.TaskScheduler GetScheduler(Akka.Dispatch.TaskSchedulerPriority priority) { }
}
public sealed class ChannelTaskSchedulerProvider : Akka.Actor.ExtensionIdProvider<Akka.Dispatch.ChannelTaskScheduler>
{
public ChannelTaskSchedulerProvider() { }
public override Akka.Dispatch.ChannelTaskScheduler CreateExtension(Akka.Actor.ExtendedActorSystem system) { }
}
public sealed class CurrentSynchronizationContextDispatcher : Akka.Dispatch.Dispatcher
{
public CurrentSynchronizationContextDispatcher(Akka.Dispatch.MessageDispatcherConfigurator configurator, string id, int throughput, System.Nullable<long> throughputDeadlineTime, Akka.Dispatch.ExecutorServiceFactory executorServiceFactory, System.TimeSpan shutdownTimeout) { }
Expand Down Expand Up @@ -2677,6 +2693,18 @@ namespace Akka.Dispatch
public RejectedExecutionException(string message = null, System.Exception inner = null) { }
protected RejectedExecutionException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
}
public enum TaskSchedulerPriority
Zetanova marked this conversation as resolved.
Show resolved Hide resolved
{
None = 0,
Idle = 4,
Background = 4,
Low = 5,
BelowNormal = 6,
Normal = 8,
AboveNormal = 10,
High = 13,
Realtime = 24,
}
public class ThreadPoolConfig
{
public ThreadPoolConfig(Akka.Configuration.Config config) { }
Expand Down
23 changes: 12 additions & 11 deletions src/core/Akka.Remote/Configuration/Remote.conf
Original file line number Diff line number Diff line change
Expand Up @@ -586,20 +586,21 @@ akka {

default-remote-dispatcher {
executor = fork-join-executor
fork-join-executor {
parallelism-min = 2
parallelism-factor = 0.5
parallelism-max = 16
}
fork-join-executor {
parallelism-min = 2
parallelism-factor = 0.5
parallelism-max = 16
}
channel-executor.priority = "high"
}

backoff-remote-dispatcher {
executor = fork-join-executor
fork-join-executor {
parallelism-min = 2
parallelism-max = 2
}
fork-join-executor {
parallelism-min = 2
parallelism-max = 2
}
channel-executor.priority = "low"
}
}

}
}
1 change: 1 addition & 0 deletions src/core/Akka/Akka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<PackageReference Include="System.Collections.Immutable" Version="5.0.0" />
<PackageReference Include="System.Reflection.Emit" Version="4.7.0" />
<PackageReference Include="System.Collections.Immutable" Version="5.0.0" />
<PackageReference Include="System.Threading.Channels" Version="6.0.0" />
Zetanova marked this conversation as resolved.
Show resolved Hide resolved
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == '$(NetStandardLibVersion)'">
Expand Down
10 changes: 10 additions & 0 deletions src/core/Akka/Configuration/Pigeon.conf
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,15 @@ akka {
}
}

channel-scheduler {
Zetanova marked this conversation as resolved.
Show resolved Hide resolved
parallelism-min = 4 #same as for ForkJoinDispatcher
parallelism-factor = 1 #same as for ForkJoinDispatcher
parallelism-max = 64 #same as for ForkJoinDispatcher
work-max = 10 #max executed work items in sequence until priority loop
work-interval = 500 #time target of executed work items in ms
work-step = 2 #target work item count in interval / burst
}

#used for GUI applications
synchronized-dispatcher {
type = "SynchronizedDispatcher"
Expand Down Expand Up @@ -377,6 +386,7 @@ akka {
parallelism-factor = 1.0
parallelism-max = 64
}
channel-executor.priority = "high"
Zetanova marked this conversation as resolved.
Show resolved Hide resolved
}

default-blocking-io-dispatcher {
Expand Down
15 changes: 7 additions & 8 deletions src/core/Akka/Dispatch/AbstractDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,18 @@ internal sealed class ChannelExecutorConfigurator : ExecutorServiceConfigurator
{
public ChannelExecutorConfigurator(Config config, IDispatcherPrerequisites prerequisites) : base(config, prerequisites)
{
var fje = config.GetConfig("fork-join-executor");
MaxParallelism = ThreadPoolConfig.ScaledPoolSize(
fje.GetInt("parallelism-min"),
fje.GetDouble("parallelism-factor", 1.0D), // the scalar-based factor to scale the threadpool size to
fje.GetInt("parallelism-max"));
var cfg = config.GetConfig("channel-executor");
Priority = (TaskSchedulerPriority)Enum.Parse(typeof(TaskSchedulerPriority), cfg.GetString("priority", "normal"), true);
}

public int MaxParallelism {get;}
public TaskSchedulerPriority Priority { get; }

public override ExecutorService Produce(string id)
{
Prerequisites.EventStream.Publish(new Debug($"ChannelExecutor-[id]", typeof(FixedConcurrencyTaskScheduler), $"Launched Dispatcher [{id}] with MaxParallelism=[{MaxParallelism}]"));
return new TaskSchedulerExecutor(id, new FixedConcurrencyTaskScheduler(MaxParallelism));
Prerequisites.EventStream.Publish(new Debug($"ChannelExecutor-[{id}]", typeof(TaskSchedulerExecutor), $"Launched Dispatcher [{id}] with Priority[{Priority}]"));

var scheduler = ChannelTaskScheduler.Get(Prerequisites.Settings.System).GetScheduler(Priority);
return new TaskSchedulerExecutor(id, scheduler);
}
}

Expand Down
Loading