Skip to content

Commit

Permalink
PoC spawner callbacks (#1616)
Browse files Browse the repository at this point in the history
* Add spawn callback
* Remove OnInit
  • Loading branch information
rogeralsing committed May 26, 2022
1 parent 591b07e commit 40c3dd2
Show file tree
Hide file tree
Showing 18 changed files with 56 additions and 56 deletions.
10 changes: 4 additions & 6 deletions benchmarks/PropsBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@
Console.WriteLine("Starting");
for (var i = 0; i < 1_000_000; i++)
{
var p = props.WithOnInit(ctx => {
ctx.Set(new SomeState("SomeId" + i));
}
);

var pid = system.Root.Spawn(p);
var i1 = i;
var pid = system.Root.SpawnNamed(props,"x" + i, ctx => {
ctx.Set(new SomeState("SomeId" + i1));
});
}

Console.WriteLine("Done");
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Actor/Context/ActorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,14 @@ public void Respond(object message)
Logger.LogWarning("{Self} Tried to respond but sender is null, with message {Message}", Self, message);
}

public PID SpawnNamed(Props props, string name)
public PID SpawnNamed(Props props, string name, Action<IContext>? callback = null)
{
if (props.GuardianStrategy is not null)
throw new ArgumentException("Props used to spawn child cannot have GuardianStrategy.");

try
{
var pid = props.Spawn(System, $"{Self.Id}/{name}", Self);
var pid = props.Spawn(System, $"{Self.Id}/{name}", Self, callback);
EnsureExtras().AddChild(pid);

return pid;
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Actor/Context/ActorContextDecorator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public virtual void Respond(object message) =>
public virtual void Stash() =>
_context.Stash();

public virtual PID SpawnNamed(Props props, string name) =>
_context.SpawnNamed(props, name);
public virtual PID SpawnNamed(Props props, string name, Action<IContext>? callback=null) =>
_context.SpawnNamed(props, name, callback);

public virtual void Watch(PID pid) =>
_context.Watch(pid);
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Actor/Context/ActorLoggingContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,11 @@ private LogLevel GetLogLevel(object message)
return logLevel;
}

public override PID SpawnNamed(Props props, string name)
public override PID SpawnNamed(Props props, string name, Action<IContext>? callback = null)
{
try
{
var pid = base.SpawnNamed(props, name);
var pid = base.SpawnNamed(props, name, callback);

if (_logLevel != LogLevel.None && _logger.IsEnabled(_logLevel))
{
Expand Down
19 changes: 18 additions & 1 deletion src/Proto.Actor/Context/ISpawnerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
// -----------------------------------------------------------------------

// ReSharper disable once CheckNamespace
using System;

namespace Proto;

public interface ISpawnerContext : ISystemContext
Expand All @@ -14,8 +16,9 @@ public interface ISpawnerContext : ISystemContext
/// </summary>
/// <param name="props">The Props used to spawn the actor</param>
/// <param name="name">The actor name</param>
/// <param name="callback"></param>
/// <returns>The PID of the child actor</returns>
PID SpawnNamed(Props props, string name);
PID SpawnNamed(Props props, string name, Action<IContext>? callback=null);
}

public static class SpawnerContextExtensions
Expand All @@ -34,11 +37,25 @@ public static PID Spawn(this ISpawnerContext self, Props props)
/// <summary>
/// Spawns a new child actor based on props and named using a prefix followed by a unique ID.
/// </summary>
/// <param name="self"></param>
/// <param name="props">The Props used to spawn the actor</param>
/// <param name="prefix">The prefix for the actor name</param>
public static PID SpawnPrefix(this ISpawnerContext self,Props props, string prefix)
{
var name = prefix + self.System.ProcessRegistry.NextId();
return self.SpawnNamed(props, name);
}

/// <summary>
/// Spawns a new child actor based on props and named using a prefix followed by a unique ID.
/// </summary>
/// <param name="self"></param>
/// <param name="props">The Props used to spawn the actor</param>
/// <param name="prefix">The prefix for the actor name</param>
/// <param name="callback"></param>
public static PID SpawnPrefix(this ISpawnerContext self,Props props, string prefix, Action<IContext> callback)
{
var name = prefix + self.System.ProcessRegistry.NextId();
return self.SpawnNamed(props, name, callback);
}
}
5 changes: 3 additions & 2 deletions src/Proto.Actor/Context/RootContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public RootContext(ActorSystem system, MessageHeader? messageHeader, params Func

private Sender? SenderMiddleware { get; init; }
public ActorSystem System { get; }

private TypeDictionary<object, RootContext> Store { get; } = new(0, 1);

public T? Get<T>() => (T?) Store.Get<T>();
Expand All @@ -55,14 +56,14 @@ public RootContext(ActorSystem system, MessageHeader? messageHeader, params Func
PID? IInfoContext.Sender => null;
public IActor? Actor => null;

public PID SpawnNamed(Props props, string name)
public PID SpawnNamed(Props props, string name, Action<IContext>? callback=null)
{
try
{
var parent = props.GuardianStrategy is not null
? System.Guardians.GetGuardianPid(props.GuardianStrategy)
: null;
return props.Spawn(System, name, parent);
return props.Spawn(System, name, parent, callback);
}
catch (Exception x)
{
Expand Down
3 changes: 2 additions & 1 deletion src/Proto.Actor/Context/RootContextDecorator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Copyright (C) 2015-2022 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
Expand All @@ -18,7 +19,7 @@ public abstract class RootContextDecorator : IRootContext

protected RootContextDecorator(IRootContext context) => _context = context;

public virtual PID SpawnNamed(Props props, string name) => _context.SpawnNamed(props, name);
public virtual PID SpawnNamed(Props props, string name, Action<IContext>? callback = null) => _context.SpawnNamed(props, name, callback);

public virtual void Send(PID target, object message) => _context.Send(target, message);

Expand Down
3 changes: 2 additions & 1 deletion src/Proto.Actor/Delegates.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Copyright (C) 2015-2022 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Threading.Tasks;
using Proto.Mailbox;

Expand All @@ -15,7 +16,7 @@ namespace Proto;

public delegate Task Sender(ISenderContext context, PID target, MessageEnvelope envelope);

public delegate PID Spawner(ActorSystem system, string id, Props props, PID? parent);
public delegate PID Spawner(ActorSystem system, string id, Props props, PID? parent, Action<IContext>? callback = null);

public delegate IActor Producer();

Expand Down
1 change: 1 addition & 0 deletions src/Proto.Actor/Mailbox/UnboundedMailboxQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace Proto.Mailbox;


public class UnboundedMailboxQueue : IMailboxQueue
{
private readonly ConcurrentQueue<object> _messages = new();
Expand Down
21 changes: 3 additions & 18 deletions src/Proto.Actor/Props/Props.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,13 @@ public sealed record Props
public ImmutableList<Func<IContext, IContext>> ContextDecorator { get; init; } =
ImmutableList<Func<IContext, IContext>>.Empty;

public ImmutableList<Action<IContext>> OnInit { get; init; } = ImmutableList<Action<IContext>>.Empty;

public Func<IContext, IContext>? ContextDecoratorChain { get; init; }

public Spawner Spawner { get; init; } = DefaultSpawner;

private static IContext DefaultContextDecorator(IContext context) => context;

public static PID DefaultSpawner(ActorSystem system, string name, Props props, PID? parent)
public static PID DefaultSpawner(ActorSystem system, string name, Props props, PID? parent, Action<IContext>? callback)
{
//Ordering is important here
//first we create a mailbox and attach it to a process
Expand All @@ -60,7 +58,7 @@ public static PID DefaultSpawner(ActorSystem system, string name, Props props, P

//if successful, we create the actor and attach it to the mailbox
var ctx = ActorContext.Setup(system, props, parent, self, mailbox);
Initialize(props, ctx);
callback?.Invoke(ctx);
mailbox.RegisterHandlers(ctx, dispatcher);
mailbox.PostSystemMessage(Started.Instance);

Expand All @@ -70,14 +68,6 @@ public static PID DefaultSpawner(ActorSystem system, string name, Props props, P
return self;
}

private static void Initialize(Props props, ActorContext ctx)
{
foreach (var init in props.OnInit)
{
init(ctx);
}
}

public Props WithProducer(Producer producer) =>
this with {Producer = (_,_) => producer()};

Expand Down Expand Up @@ -109,11 +99,6 @@ public Props WithContextDecorator(params Func<IContext, IContext>[] contextDecor
};
}

public Props WithOnInit(params Action<IContext>[] callback) => this with
{
OnInit = OnInit.AddRange(callback),
};

public Props WithGuardianSupervisorStrategy(ISupervisorStrategy guardianStrategy) =>
this with {GuardianStrategy = guardianStrategy};

Expand Down Expand Up @@ -145,7 +130,7 @@ public Props WithSenderMiddleware(params Func<Sender, Sender>[] middleware)
public Props WithSpawner(Spawner spawner) =>
this with {Spawner = spawner};

internal PID Spawn(ActorSystem system, string name, PID? parent) => Spawner(system, name, this, parent);
internal PID Spawn(ActorSystem system, string name, PID? parent, Action<IContext>? callback=null) => Spawner(system, name, this, parent, callback);

public static Props FromProducer(Producer producer) => Empty.WithProducer(_ => producer());

Expand Down
4 changes: 3 additions & 1 deletion src/Proto.Actor/Router/Routers/RouterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Copyright (C) 2015-2022 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Threading;
using Proto.Context;

Expand All @@ -16,7 +17,7 @@ public abstract record RouterConfig

public Props Props() => new Props().WithSpawner(SpawnRouterProcess);

private PID SpawnRouterProcess(ActorSystem system, string name, Props props, PID? parent)
private PID SpawnRouterProcess(ActorSystem system, string name, Props props, PID? parent, Action<IContext>? callback)
{
var routerState = CreateRouterState();
var wg = new AutoResetEvent(false);
Expand All @@ -30,6 +31,7 @@ private PID SpawnRouterProcess(ActorSystem system, string name, Props props, PID
if (!absent) throw new ProcessNameExistException(name, self);

var ctx = ActorContext.Setup(system, p, parent, self, mailbox);
callback?.Invoke(ctx);
mailbox.RegisterHandlers(ctx, dispatcher);
mailbox.PostSystemMessage(Started.Instance);
mailbox.Start();
Expand Down
3 changes: 0 additions & 3 deletions src/Proto.Cluster/ClusterExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ CancellationToken ct
context.ReenterAfter(task, callback);
}

public static Props WithClusterIdentity(this Props props, ClusterIdentity clusterIdentity)
=> props.WithOnInit(context => context.Set(clusterIdentity));

internal static Props WithClusterKind(
this Props props,
ActivatedClusterKind clusterKind
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ private Task ActivationRequest(IContext context, ActivationRequest msg)
//spawn and remember this actor
//as this id is unique for this activation (id+counter)
//we cannot get ProcessNameAlreadyExists exception here
var clusterProps = clusterKind.Props.WithClusterIdentity(msg.ClusterIdentity);


var sw = Stopwatch.StartNew();
var pid = context.SpawnPrefix(clusterProps, msg.ClusterIdentity.ToString());
var pid = context.SpawnPrefix(clusterKind.Props, msg.ClusterIdentity.Identity, ctx => ctx.Set(msg.ClusterIdentity));
sw.Stop();

if (_cluster.System.Metrics.Enabled)
Expand Down
18 changes: 8 additions & 10 deletions src/Proto.Cluster/Partition/PartitionPlacementActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ public Task ReceiveAsync(IContext context) =>
context.Message switch
{
Started => OnStarted(context),
ActivationTerminating msg => ActivationTerminating(msg),
IdentityHandoverRequest msg => IdentityHandoverRequest(context, msg),
ActivationTerminating msg => OnActivationTerminating(msg),
IdentityHandoverRequest msg => OnIdentityHandoverRequest(context, msg),
ClusterTopology msg => OnClusterTopology(context, msg),
ActivationRequest msg => ActivationRequest(context, msg),
ActivationRequest msg => OnActivationRequest(context, msg),
_ => Task.CompletedTask
};

Expand Down Expand Up @@ -205,7 +205,7 @@ private Task OnStarted(IContext context)
return Task.CompletedTask;
}

private Task ActivationTerminating(ActivationTerminating msg)
private Task OnActivationTerminating(ActivationTerminating msg)
{
if (!_myActors.TryGetValue(msg.ClusterIdentity, out var pid))
{
Expand Down Expand Up @@ -239,7 +239,7 @@ private Task ActivationTerminating(ActivationTerminating msg)
//this is pure, we do not change any state or actually move anything
//the requester also provide its own view of the world in terms of members
//TLDR; we are not using any topology state from this actor itself
private Task IdentityHandoverRequest(IContext context, IdentityHandoverRequest msg)
private Task OnIdentityHandoverRequest(IContext context, IdentityHandoverRequest msg)
{
if (context.Sender is null)
{
Expand Down Expand Up @@ -302,7 +302,7 @@ private Props AbortOnDeadLetter(CancellationTokenSource cts) => Props.FromFunc(r
}
);

private Task ActivationRequest(IContext context, ActivationRequest msg)
private Task OnActivationRequest(IContext context, ActivationRequest msg)
{
try
{
Expand All @@ -328,10 +328,8 @@ private Task ActivationRequest(IContext context, ActivationRequest msg)
//spawn and remember this actor
//as this id is unique for this activation (id+counter)
//we cannot get ProcessNameAlreadyExists exception here

var clusterProps = clusterKind.Props.WithClusterIdentity(msg.ClusterIdentity);

var pid = context.SpawnPrefix(clusterProps, msg.ClusterIdentity.Identity);

var pid = context.SpawnPrefix(clusterKind.Props, msg.ClusterIdentity.Identity, ctx => ctx.Set(msg.ClusterIdentity));

_myActors[msg.ClusterIdentity] = pid;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ private Task OnActivationRequest(ActivationRequest msg, IContext context)
else
{
var clusterKind = _cluster.GetClusterKind(msg.Kind);
var clusterProps = clusterKind.Props.WithClusterIdentity(msg.ClusterIdentity);
var pid = context.SpawnPrefix(clusterProps, msg.ClusterIdentity.Identity);
var pid = context.SpawnPrefix(clusterKind.Props, msg.ClusterIdentity.Identity, ctx => ctx.Set(msg.ClusterIdentity));
_actors.Add(msg.ClusterIdentity, pid);
context.Respond(new ActivationResponse
{
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.TestKit/TestKitBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public TestProbe Probe {
public PID Spawn(Props props) => Context.Spawn(props);

/// <inheritdoc />
public PID SpawnNamed(Props props, string name) => Context.SpawnNamed(props, name);
public PID SpawnNamed(Props props, string name, Action<IContext>? callback=null) => Context.SpawnNamed(props, name, callback);

/// <inheritdoc />
public PID SpawnPrefix(Props props, string prefix) => Context.SpawnPrefix(props, prefix);
Expand Down
2 changes: 1 addition & 1 deletion tests/Proto.Actor.Tests/PropsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void Given_Props_When_WithProducer_Then_mutate_Producer()
[Fact]
public void Given_Props_When_WithSpawner_Then_mutate_Spawner()
{
PID Spawner(ActorSystem s, string id, Props p, PID? parent) => new();
PID Spawner(ActorSystem s, string id, Props p, PID? parent, Action<IContext> callback) => new();

var props = new Props();
var props2 = props.WithSpawner(Spawner);
Expand Down
2 changes: 1 addition & 1 deletion tests/Proto.Actor.Tests/SpawnTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async Task Given_PropsWithSpawner_SpawnShouldReturnPidCreatedBySpawner()

var spawnedPid = PID.FromAddress("test", "test");
var props = Props.FromFunc(EmptyReceive)
.WithSpawner((s, id, p, parent) => spawnedPid);
.WithSpawner((s, id, p, parent, _) => spawnedPid);

var pid = context.Spawn(props);

Expand Down

0 comments on commit 40c3dd2

Please sign in to comment.