From 40c3dd2a013596e15058b881611f1e9520396fc9 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Thu, 26 May 2022 21:23:39 +0200 Subject: [PATCH] PoC spawner callbacks (#1616) * Add spawn callback * Remove OnInit --- benchmarks/PropsBenchmark/Program.cs | 10 ++++----- src/Proto.Actor/Context/ActorContext.cs | 4 ++-- .../Context/ActorContextDecorator.cs | 4 ++-- .../Context/ActorLoggingContext.cs | 4 ++-- src/Proto.Actor/Context/ISpawnerContext.cs | 19 ++++++++++++++++- src/Proto.Actor/Context/RootContext.cs | 5 +++-- .../Context/RootContextDecorator.cs | 3 ++- src/Proto.Actor/Delegates.cs | 3 ++- .../Mailbox/UnboundedMailboxQueue.cs | 1 + src/Proto.Actor/Props/Props.cs | 21 +++---------------- .../Router/Routers/RouterConfig.cs | 4 +++- src/Proto.Cluster/ClusterExtension.cs | 3 --- .../Identity/IdentityStoragePlacementActor.cs | 4 ++-- .../Partition/PartitionPlacementActor.cs | 18 +++++++--------- .../PartitionActivatorActor.cs | 3 +-- src/Proto.TestKit/TestKitBase.cs | 2 +- tests/Proto.Actor.Tests/PropsTests.cs | 2 +- tests/Proto.Actor.Tests/SpawnTests.cs | 2 +- 18 files changed, 56 insertions(+), 56 deletions(-) diff --git a/benchmarks/PropsBenchmark/Program.cs b/benchmarks/PropsBenchmark/Program.cs index 8f564f1050..23854dab11 100644 --- a/benchmarks/PropsBenchmark/Program.cs +++ b/benchmarks/PropsBenchmark/Program.cs @@ -10,12 +10,10 @@ Console.WriteLine("Starting"); for (var i = 0; i < 1_000_000; i++) { - var p = props.WithOnInit(ctx => { - ctx.Set(new SomeState("SomeId" + i)); - } - ); - - var pid = system.Root.Spawn(p); + var i1 = i; + var pid = system.Root.SpawnNamed(props,"x" + i, ctx => { + ctx.Set(new SomeState("SomeId" + i1)); + }); } Console.WriteLine("Done"); diff --git a/src/Proto.Actor/Context/ActorContext.cs b/src/Proto.Actor/Context/ActorContext.cs index 58c988672b..254d941546 100644 --- a/src/Proto.Actor/Context/ActorContext.cs +++ b/src/Proto.Actor/Context/ActorContext.cs @@ -88,14 +88,14 @@ public void Respond(object message) Logger.LogWarning("{Self} Tried to respond but sender is null, with message {Message}", Self, message); } - public PID SpawnNamed(Props props, string name) + public PID SpawnNamed(Props props, string name, Action? callback = null) { if (props.GuardianStrategy is not null) throw new ArgumentException("Props used to spawn child cannot have GuardianStrategy."); try { - var pid = props.Spawn(System, $"{Self.Id}/{name}", Self); + var pid = props.Spawn(System, $"{Self.Id}/{name}", Self, callback); EnsureExtras().AddChild(pid); return pid; diff --git a/src/Proto.Actor/Context/ActorContextDecorator.cs b/src/Proto.Actor/Context/ActorContextDecorator.cs index 8214481897..8931fed0c4 100644 --- a/src/Proto.Actor/Context/ActorContextDecorator.cs +++ b/src/Proto.Actor/Context/ActorContextDecorator.cs @@ -48,8 +48,8 @@ public virtual void Respond(object message) => public virtual void Stash() => _context.Stash(); - public virtual PID SpawnNamed(Props props, string name) => - _context.SpawnNamed(props, name); + public virtual PID SpawnNamed(Props props, string name, Action? callback=null) => + _context.SpawnNamed(props, name, callback); public virtual void Watch(PID pid) => _context.Watch(pid); diff --git a/src/Proto.Actor/Context/ActorLoggingContext.cs b/src/Proto.Actor/Context/ActorLoggingContext.cs index 32edbe275a..7b9f8e9731 100644 --- a/src/Proto.Actor/Context/ActorLoggingContext.cs +++ b/src/Proto.Actor/Context/ActorLoggingContext.cs @@ -155,11 +155,11 @@ private LogLevel GetLogLevel(object message) return logLevel; } - public override PID SpawnNamed(Props props, string name) + public override PID SpawnNamed(Props props, string name, Action? callback = null) { try { - var pid = base.SpawnNamed(props, name); + var pid = base.SpawnNamed(props, name, callback); if (_logLevel != LogLevel.None && _logger.IsEnabled(_logLevel)) { diff --git a/src/Proto.Actor/Context/ISpawnerContext.cs b/src/Proto.Actor/Context/ISpawnerContext.cs index ccb48c8cd3..fa35c55743 100644 --- a/src/Proto.Actor/Context/ISpawnerContext.cs +++ b/src/Proto.Actor/Context/ISpawnerContext.cs @@ -5,6 +5,8 @@ // ----------------------------------------------------------------------- // ReSharper disable once CheckNamespace +using System; + namespace Proto; public interface ISpawnerContext : ISystemContext @@ -14,8 +16,9 @@ public interface ISpawnerContext : ISystemContext /// /// The Props used to spawn the actor /// The actor name + /// /// The PID of the child actor - PID SpawnNamed(Props props, string name); + PID SpawnNamed(Props props, string name, Action? callback=null); } public static class SpawnerContextExtensions @@ -34,6 +37,7 @@ public static PID Spawn(this ISpawnerContext self, Props props) /// /// Spawns a new child actor based on props and named using a prefix followed by a unique ID. /// + /// /// The Props used to spawn the actor /// The prefix for the actor name public static PID SpawnPrefix(this ISpawnerContext self,Props props, string prefix) @@ -41,4 +45,17 @@ public static PID SpawnPrefix(this ISpawnerContext self,Props props, string pref var name = prefix + self.System.ProcessRegistry.NextId(); return self.SpawnNamed(props, name); } + + /// + /// Spawns a new child actor based on props and named using a prefix followed by a unique ID. + /// + /// + /// The Props used to spawn the actor + /// The prefix for the actor name + /// + public static PID SpawnPrefix(this ISpawnerContext self,Props props, string prefix, Action callback) + { + var name = prefix + self.System.ProcessRegistry.NextId(); + return self.SpawnNamed(props, name, callback); + } } \ No newline at end of file diff --git a/src/Proto.Actor/Context/RootContext.cs b/src/Proto.Actor/Context/RootContext.cs index 267e9f4c6f..4247d97076 100644 --- a/src/Proto.Actor/Context/RootContext.cs +++ b/src/Proto.Actor/Context/RootContext.cs @@ -40,6 +40,7 @@ public RootContext(ActorSystem system, MessageHeader? messageHeader, params Func private Sender? SenderMiddleware { get; init; } public ActorSystem System { get; } + private TypeDictionary Store { get; } = new(0, 1); public T? Get() => (T?) Store.Get(); @@ -55,14 +56,14 @@ public RootContext(ActorSystem system, MessageHeader? messageHeader, params Func PID? IInfoContext.Sender => null; public IActor? Actor => null; - public PID SpawnNamed(Props props, string name) + public PID SpawnNamed(Props props, string name, Action? callback=null) { try { var parent = props.GuardianStrategy is not null ? System.Guardians.GetGuardianPid(props.GuardianStrategy) : null; - return props.Spawn(System, name, parent); + return props.Spawn(System, name, parent, callback); } catch (Exception x) { diff --git a/src/Proto.Actor/Context/RootContextDecorator.cs b/src/Proto.Actor/Context/RootContextDecorator.cs index ef75bd934b..2a46b839b4 100644 --- a/src/Proto.Actor/Context/RootContextDecorator.cs +++ b/src/Proto.Actor/Context/RootContextDecorator.cs @@ -3,6 +3,7 @@ // Copyright (C) 2015-2022 Asynkron AB All rights reserved // // ----------------------------------------------------------------------- +using System; using System.Threading; using System.Threading.Tasks; using JetBrains.Annotations; @@ -18,7 +19,7 @@ public abstract class RootContextDecorator : IRootContext protected RootContextDecorator(IRootContext context) => _context = context; - public virtual PID SpawnNamed(Props props, string name) => _context.SpawnNamed(props, name); + public virtual PID SpawnNamed(Props props, string name, Action? callback = null) => _context.SpawnNamed(props, name, callback); public virtual void Send(PID target, object message) => _context.Send(target, message); diff --git a/src/Proto.Actor/Delegates.cs b/src/Proto.Actor/Delegates.cs index 334b0bb09a..b678043a26 100644 --- a/src/Proto.Actor/Delegates.cs +++ b/src/Proto.Actor/Delegates.cs @@ -3,6 +3,7 @@ // Copyright (C) 2015-2022 Asynkron AB All rights reserved // // ----------------------------------------------------------------------- +using System; using System.Threading.Tasks; using Proto.Mailbox; @@ -15,7 +16,7 @@ namespace Proto; public delegate Task Sender(ISenderContext context, PID target, MessageEnvelope envelope); -public delegate PID Spawner(ActorSystem system, string id, Props props, PID? parent); +public delegate PID Spawner(ActorSystem system, string id, Props props, PID? parent, Action? callback = null); public delegate IActor Producer(); diff --git a/src/Proto.Actor/Mailbox/UnboundedMailboxQueue.cs b/src/Proto.Actor/Mailbox/UnboundedMailboxQueue.cs index 09a16cfefc..72cc192fa4 100644 --- a/src/Proto.Actor/Mailbox/UnboundedMailboxQueue.cs +++ b/src/Proto.Actor/Mailbox/UnboundedMailboxQueue.cs @@ -7,6 +7,7 @@ namespace Proto.Mailbox; + public class UnboundedMailboxQueue : IMailboxQueue { private readonly ConcurrentQueue _messages = new(); diff --git a/src/Proto.Actor/Props/Props.cs b/src/Proto.Actor/Props/Props.cs index 8ec054ec7b..9ab33ac91d 100644 --- a/src/Proto.Actor/Props/Props.cs +++ b/src/Proto.Actor/Props/Props.cs @@ -36,15 +36,13 @@ public sealed record Props public ImmutableList> ContextDecorator { get; init; } = ImmutableList>.Empty; - public ImmutableList> OnInit { get; init; } = ImmutableList>.Empty; - public Func? ContextDecoratorChain { get; init; } public Spawner Spawner { get; init; } = DefaultSpawner; private static IContext DefaultContextDecorator(IContext context) => context; - public static PID DefaultSpawner(ActorSystem system, string name, Props props, PID? parent) + public static PID DefaultSpawner(ActorSystem system, string name, Props props, PID? parent, Action? callback) { //Ordering is important here //first we create a mailbox and attach it to a process @@ -60,7 +58,7 @@ public static PID DefaultSpawner(ActorSystem system, string name, Props props, P //if successful, we create the actor and attach it to the mailbox var ctx = ActorContext.Setup(system, props, parent, self, mailbox); - Initialize(props, ctx); + callback?.Invoke(ctx); mailbox.RegisterHandlers(ctx, dispatcher); mailbox.PostSystemMessage(Started.Instance); @@ -70,14 +68,6 @@ public static PID DefaultSpawner(ActorSystem system, string name, Props props, P return self; } - private static void Initialize(Props props, ActorContext ctx) - { - foreach (var init in props.OnInit) - { - init(ctx); - } - } - public Props WithProducer(Producer producer) => this with {Producer = (_,_) => producer()}; @@ -109,11 +99,6 @@ public Props WithContextDecorator(params Func[] contextDecor }; } - public Props WithOnInit(params Action[] callback) => this with - { - OnInit = OnInit.AddRange(callback), - }; - public Props WithGuardianSupervisorStrategy(ISupervisorStrategy guardianStrategy) => this with {GuardianStrategy = guardianStrategy}; @@ -145,7 +130,7 @@ public Props WithSenderMiddleware(params Func[] middleware) public Props WithSpawner(Spawner spawner) => this with {Spawner = spawner}; - internal PID Spawn(ActorSystem system, string name, PID? parent) => Spawner(system, name, this, parent); + internal PID Spawn(ActorSystem system, string name, PID? parent, Action? callback=null) => Spawner(system, name, this, parent, callback); public static Props FromProducer(Producer producer) => Empty.WithProducer(_ => producer()); diff --git a/src/Proto.Actor/Router/Routers/RouterConfig.cs b/src/Proto.Actor/Router/Routers/RouterConfig.cs index ee4a484746..ced7cfa73b 100644 --- a/src/Proto.Actor/Router/Routers/RouterConfig.cs +++ b/src/Proto.Actor/Router/Routers/RouterConfig.cs @@ -3,6 +3,7 @@ // Copyright (C) 2015-2022 Asynkron AB All rights reserved // // ----------------------------------------------------------------------- +using System; using System.Threading; using Proto.Context; @@ -16,7 +17,7 @@ public abstract record RouterConfig public Props Props() => new Props().WithSpawner(SpawnRouterProcess); - private PID SpawnRouterProcess(ActorSystem system, string name, Props props, PID? parent) + private PID SpawnRouterProcess(ActorSystem system, string name, Props props, PID? parent, Action? callback) { var routerState = CreateRouterState(); var wg = new AutoResetEvent(false); @@ -30,6 +31,7 @@ private PID SpawnRouterProcess(ActorSystem system, string name, Props props, PID if (!absent) throw new ProcessNameExistException(name, self); var ctx = ActorContext.Setup(system, p, parent, self, mailbox); + callback?.Invoke(ctx); mailbox.RegisterHandlers(ctx, dispatcher); mailbox.PostSystemMessage(Started.Instance); mailbox.Start(); diff --git a/src/Proto.Cluster/ClusterExtension.cs b/src/Proto.Cluster/ClusterExtension.cs index df9fe19519..7358f8db39 100644 --- a/src/Proto.Cluster/ClusterExtension.cs +++ b/src/Proto.Cluster/ClusterExtension.cs @@ -58,9 +58,6 @@ CancellationToken ct context.ReenterAfter(task, callback); } - public static Props WithClusterIdentity(this Props props, ClusterIdentity clusterIdentity) - => props.WithOnInit(context => context.Set(clusterIdentity)); - internal static Props WithClusterKind( this Props props, ActivatedClusterKind clusterKind diff --git a/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs b/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs index 2fbc683583..173eef16d0 100644 --- a/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs +++ b/src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs @@ -110,10 +110,10 @@ private Task ActivationRequest(IContext context, ActivationRequest msg) //spawn and remember this actor //as this id is unique for this activation (id+counter) //we cannot get ProcessNameAlreadyExists exception here - var clusterProps = clusterKind.Props.WithClusterIdentity(msg.ClusterIdentity); + var sw = Stopwatch.StartNew(); - var pid = context.SpawnPrefix(clusterProps, msg.ClusterIdentity.ToString()); + var pid = context.SpawnPrefix(clusterKind.Props, msg.ClusterIdentity.Identity, ctx => ctx.Set(msg.ClusterIdentity)); sw.Stop(); if (_cluster.System.Metrics.Enabled) diff --git a/src/Proto.Cluster/Partition/PartitionPlacementActor.cs b/src/Proto.Cluster/Partition/PartitionPlacementActor.cs index 5076bac62c..0f8a184dfc 100644 --- a/src/Proto.Cluster/Partition/PartitionPlacementActor.cs +++ b/src/Proto.Cluster/Partition/PartitionPlacementActor.cs @@ -37,10 +37,10 @@ public Task ReceiveAsync(IContext context) => context.Message switch { Started => OnStarted(context), - ActivationTerminating msg => ActivationTerminating(msg), - IdentityHandoverRequest msg => IdentityHandoverRequest(context, msg), + ActivationTerminating msg => OnActivationTerminating(msg), + IdentityHandoverRequest msg => OnIdentityHandoverRequest(context, msg), ClusterTopology msg => OnClusterTopology(context, msg), - ActivationRequest msg => ActivationRequest(context, msg), + ActivationRequest msg => OnActivationRequest(context, msg), _ => Task.CompletedTask }; @@ -205,7 +205,7 @@ private Task OnStarted(IContext context) return Task.CompletedTask; } - private Task ActivationTerminating(ActivationTerminating msg) + private Task OnActivationTerminating(ActivationTerminating msg) { if (!_myActors.TryGetValue(msg.ClusterIdentity, out var pid)) { @@ -239,7 +239,7 @@ private Task ActivationTerminating(ActivationTerminating msg) //this is pure, we do not change any state or actually move anything //the requester also provide its own view of the world in terms of members //TLDR; we are not using any topology state from this actor itself - private Task IdentityHandoverRequest(IContext context, IdentityHandoverRequest msg) + private Task OnIdentityHandoverRequest(IContext context, IdentityHandoverRequest msg) { if (context.Sender is null) { @@ -302,7 +302,7 @@ private Props AbortOnDeadLetter(CancellationTokenSource cts) => Props.FromFunc(r } ); - private Task ActivationRequest(IContext context, ActivationRequest msg) + private Task OnActivationRequest(IContext context, ActivationRequest msg) { try { @@ -328,10 +328,8 @@ private Task ActivationRequest(IContext context, ActivationRequest msg) //spawn and remember this actor //as this id is unique for this activation (id+counter) //we cannot get ProcessNameAlreadyExists exception here - - var clusterProps = clusterKind.Props.WithClusterIdentity(msg.ClusterIdentity); - - var pid = context.SpawnPrefix(clusterProps, msg.ClusterIdentity.Identity); + + var pid = context.SpawnPrefix(clusterKind.Props, msg.ClusterIdentity.Identity, ctx => ctx.Set(msg.ClusterIdentity)); _myActors[msg.ClusterIdentity] = pid; diff --git a/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs b/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs index 4b20cd20bd..a3574044a6 100644 --- a/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs +++ b/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs @@ -155,8 +155,7 @@ private Task OnActivationRequest(ActivationRequest msg, IContext context) else { var clusterKind = _cluster.GetClusterKind(msg.Kind); - var clusterProps = clusterKind.Props.WithClusterIdentity(msg.ClusterIdentity); - var pid = context.SpawnPrefix(clusterProps, msg.ClusterIdentity.Identity); + var pid = context.SpawnPrefix(clusterKind.Props, msg.ClusterIdentity.Identity, ctx => ctx.Set(msg.ClusterIdentity)); _actors.Add(msg.ClusterIdentity, pid); context.Respond(new ActivationResponse { diff --git a/src/Proto.TestKit/TestKitBase.cs b/src/Proto.TestKit/TestKitBase.cs index c49e2734d0..5fbc24435d 100644 --- a/src/Proto.TestKit/TestKitBase.cs +++ b/src/Proto.TestKit/TestKitBase.cs @@ -36,7 +36,7 @@ public TestProbe Probe { public PID Spawn(Props props) => Context.Spawn(props); /// - public PID SpawnNamed(Props props, string name) => Context.SpawnNamed(props, name); + public PID SpawnNamed(Props props, string name, Action? callback=null) => Context.SpawnNamed(props, name, callback); /// public PID SpawnPrefix(Props props, string prefix) => Context.SpawnPrefix(props, prefix); diff --git a/tests/Proto.Actor.Tests/PropsTests.cs b/tests/Proto.Actor.Tests/PropsTests.cs index f014b737fe..61d1042c05 100644 --- a/tests/Proto.Actor.Tests/PropsTests.cs +++ b/tests/Proto.Actor.Tests/PropsTests.cs @@ -109,7 +109,7 @@ public void Given_Props_When_WithProducer_Then_mutate_Producer() [Fact] public void Given_Props_When_WithSpawner_Then_mutate_Spawner() { - PID Spawner(ActorSystem s, string id, Props p, PID? parent) => new(); + PID Spawner(ActorSystem s, string id, Props p, PID? parent, Action callback) => new(); var props = new Props(); var props2 = props.WithSpawner(Spawner); diff --git a/tests/Proto.Actor.Tests/SpawnTests.cs b/tests/Proto.Actor.Tests/SpawnTests.cs index 609d95c336..87c05017a1 100644 --- a/tests/Proto.Actor.Tests/SpawnTests.cs +++ b/tests/Proto.Actor.Tests/SpawnTests.cs @@ -21,7 +21,7 @@ public async Task Given_PropsWithSpawner_SpawnShouldReturnPidCreatedBySpawner() var spawnedPid = PID.FromAddress("test", "test"); var props = Props.FromFunc(EmptyReceive) - .WithSpawner((s, id, p, parent) => spawnedPid); + .WithSpawner((s, id, p, parent, _) => spawnedPid); var pid = context.Spawn(props);