Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single node cluster provider / lookup #1750

Merged
merged 4 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Proto.Cluster/Partition/PartitionIdentityActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private HashSet<string> GetIncompletePartitionAddresses(HandoverSink sink, strin
// Registers this actor for ActivationTerminated events published on the cluster
// system's EventStream, then completes synchronously.
private Task OnStarted(IContext context)
{
var self = context.Self;
// NOTE(review): this diff hunk shows two subscriptions for the same event type:
// the lambda form, which sends each event to `self` through the root context, and
// the (sender context, target PID) form. Presumably the second replaces the first
// in the merged file - confirm that only one of them remains.
_cluster.System.EventStream.Subscribe<ActivationTerminated>(e => _cluster.System.Root.Send(self, e));
_cluster.System.EventStream.Subscribe<ActivationTerminated>(context.System.Root, context.Self);

return Task.CompletedTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public PartitionActivatorActor(Cluster cluster, PartitionActivatorManager manage
// Registers this actor for ActivationTerminated and ActivationTerminating events
// published on the cluster system's EventStream, then completes synchronously.
private Task OnStarted(IContext context)
{
var self = context.Self;
// NOTE(review): this diff hunk shows each event type subscribed twice: once with
// a lambda that sends the event to `self` through the root context, and once with
// the (sender context, target PID) form. Presumably the second pair replaces the
// first in the merged file - confirm that only one pair remains.
_cluster.System.EventStream.Subscribe<ActivationTerminated>(e => _cluster.System.Root.Send(self, e));
_cluster.System.EventStream.Subscribe<ActivationTerminating>(e => _cluster.System.Root.Send(self, e));
_cluster.System.EventStream.Subscribe<ActivationTerminated>(context.System.Root, context.Self);
_cluster.System.EventStream.Subscribe<ActivationTerminating>(context.System.Root, context.Self);

return Task.CompletedTask;
}
Expand Down
214 changes: 214 additions & 0 deletions src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// -----------------------------------------------------------------------
// <copyright file="SingleNodeActivatorActor.cs" company="Asynkron AB">
// Copyright (C) 2015-2022 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Proto.Cluster.SingleNode;

/// <summary>
/// Actor that answers <see cref="ActivationRequest" /> messages for the single node lookup.
/// It spawns the requested cluster actors as its own children and tracks them per
/// <see cref="ClusterIdentity" /> in a local dictionary.
/// </summary>
internal class SingleNodeActivatorActor : IActor
{
    private static readonly ILogger Logger = Log.CreateLogger<SingleNodeActivatorActor>();

    private readonly Cluster _cluster;

    // Activations currently tracked by this actor, keyed by cluster identity.
    private readonly Dictionary<ClusterIdentity, PID> _actors = new();

    // Identities whose CanSpawnIdentity check has been started but has not yet re-entered the actor.
    private readonly HashSet<ClusterIdentity> _inFlightIdentityChecks = new();

    public SingleNodeActivatorActor(Cluster cluster) => _cluster = cluster;

    /// <summary>
    /// Dispatches the incoming message to its handler; unrecognised messages are ignored.
    /// </summary>
    public Task ReceiveAsync(IContext context) =>
        context.Message switch
        {
            Started                   => OnStarted(context),
            Stopping                  => OnStopping(context),
            ActivationRequest msg     => OnActivationRequest(msg, context),
            ActivationTerminated msg  => OnActivationTerminated(msg),
            ActivationTerminating msg => OnActivationTerminating(msg),
            _                         => Task.CompletedTask
        };

    /// <summary>
    /// Subscribes this actor to the activation lifecycle events on the EventStream.
    /// </summary>
    private Task OnStarted(IContext context)
    {
        var self = context.Self;
        _cluster.System.EventStream.Subscribe<ActivationTerminated>(context.System.Root, self);
        _cluster.System.EventStream.Subscribe<ActivationTerminating>(context.System.Root, self);

        return Task.CompletedTask;
    }

    /// <summary>
    /// Stops all tracked actors, then removes every PidCache entry whose address is the local system address.
    /// </summary>
    private async Task OnStopping(IContext context)
    {
        await StopActors(context);
        _cluster.PidCache.RemoveByPredicate(kv => kv.Value.Address.Equals(context.System.Address, StringComparison.Ordinal));
    }

    /// <summary>
    /// Poisons every tracked actor, forgets them all, and waits for the poison tasks to complete.
    /// </summary>
    private async Task StopActors(IContext context)
    {
        // Snapshot first: the dictionary is emptied before the poison tasks are awaited.
        var activations = _actors.ToList();
        var stopping = new List<Task>(activations.Count);

        foreach (var activation in activations)
        {
            stopping.Add(context.PoisonAsync(activation.Value));
        }

        _actors.Clear();

        //await graceful shutdown of all actors
        await Task.WhenAll(stopping);
        Logger.LogInformation("[SingleNode] - Stopped {ActorCount} actors", activations.Count);
    }

    /// <summary>
    /// Removes the terminated activation from the PidCache.
    /// </summary>
    private Task OnActivationTerminated(ActivationTerminated msg)
    {
        _cluster.PidCache.RemoveByVal(msg.ClusterIdentity, msg.Pid);

        if (Logger.IsEnabled(LogLevel.Trace))
        {
            Logger.LogTrace("[SingleNode] Terminated {Pid}", msg.Pid);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Forgets a tracked activation that is terminating and broadcasts the matching
    /// <see cref="ActivationTerminated" /> event. Identities this actor does not track are ignored.
    /// </summary>
    private Task OnActivationTerminating(ActivationTerminating msg)
    {
        // ActivationTerminating is sent to the local EventStream when a
        // local cluster actor stops.

        // Remove doubles as the membership test, so the dictionary is only probed once.
        if (!_actors.Remove(msg.ClusterIdentity))
        {
            return Task.CompletedTask;
        }

        if (Logger.IsEnabled(LogLevel.Trace))
        {
            Logger.LogTrace("[SingleNode] Terminating {Pid}", msg.Pid);
        }

        // Broadcast ActivationTerminated to all nodes so that PidCaches gets
        // cleared correctly.
        var activationTerminated = new ActivationTerminated
        {
            Pid = msg.Pid,
            ClusterIdentity = msg.ClusterIdentity,
        };
        _cluster.MemberList.BroadcastEvent(activationTerminated);

        return Task.CompletedTask;
    }

    /// <summary>
    /// Responds with the existing PID when the identity is already tracked; otherwise spawns it,
    /// first consulting the kind's CanSpawnIdentity callback when one is configured.
    /// </summary>
    private Task OnActivationRequest(ActivationRequest msg, IContext context)
    {
        if (_actors.TryGetValue(msg.ClusterIdentity, out var existing))
        {
            context.Respond(new ActivationResponse
                {
                    Pid = existing,
                }
            );

            return Task.CompletedTask;
        }

        var clusterKind = _cluster.GetClusterKind(msg.Kind);

        if (clusterKind.CanSpawnIdentity is not null)
        {
            // Needs to check if the identity is allowed to spawn
            VerifyAndSpawn(msg, context, clusterKind);
        }
        else
        {
            Spawn(msg, context, clusterKind);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Runs the kind's CanSpawnIdentity check and spawns the actor when it allows it.
    /// A request that arrives while a check for the same identity is still pending is answered with a failure.
    /// </summary>
    private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedClusterKind clusterKind)
    {
        var clusterIdentity = msg.ClusterIdentity;

        if (_inFlightIdentityChecks.Contains(clusterIdentity))
        {
            Logger.LogError("[SingleNode] Duplicate activation requests for {ClusterIdentity}", clusterIdentity);
            context.Respond(new ActivationResponse
                {
                    Failed = true,
                }
            );

            // Stop here: falling through would start a second check and respond to this request twice.
            return;
        }

        var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnTimeout));

        // Only take the synchronous shortcut for a successful result. A check that has already
        // faulted or been cancelled goes through the continuation below, which answers with
        // Failed instead of letting .Result throw out of the message handler.
        if (canSpawn.IsCompletedSuccessfully)
        {
            OnSpawnDecided(msg, context, clusterKind, canSpawn.Result);
            return;
        }

        _inFlightIdentityChecks.Add(clusterIdentity);
        context.ReenterAfter(canSpawn.AsTask(), task => {
                _inFlightIdentityChecks.Remove(clusterIdentity);

                if (task.IsCompletedSuccessfully)
                {
                    OnSpawnDecided(msg, context, clusterKind, task.Result);
                }
                else
                {
                    // task.Exception is null for a cancelled check; the logger accepts that.
                    Logger.LogError(task.Exception, "[SingleNode] Error when checking {ClusterIdentity}", clusterIdentity);
                    context.Respond(new ActivationResponse
                        {
                            Failed = true,
                        }
                    );
                }

                return Task.CompletedTask;
            }
        );
    }

    /// <summary>
    /// Spawns the actor as a child, starts tracking it and responds with its PID.
    /// Any non-fatal exception is logged and answered with a failed response.
    /// </summary>
    private void Spawn(ActivationRequest msg, IContext context, ActivatedClusterKind clusterKind)
    {
        try
        {
            var pid = context.Spawn(clusterKind.Props, ctx => ctx.Set(msg.ClusterIdentity));
            _actors.Add(msg.ClusterIdentity, pid);
            context.Respond(new ActivationResponse
                {
                    Pid = pid,
                }
            );
        }
        catch (Exception e)
        {
            e.CheckFailFast();
            Logger.LogError(e, "[SingleNode] Failed to spawn {Kind}/{Identity}", msg.Kind, msg.Identity);
            context.Respond(new ActivationResponse {Failed = true});
        }
    }

    /// <summary>
    /// Spawns the actor when the identity check allowed it; otherwise responds with a failure
    /// flagged as an invalid identity.
    /// </summary>
    private void OnSpawnDecided(ActivationRequest msg, IContext context, ActivatedClusterKind clusterKind, bool canSpawnIdentity)
    {
        if (canSpawnIdentity)
        {
            Spawn(msg, context, clusterKind);
            return;
        }

        context.Respond(new ActivationResponse
            {
                Failed = true,
                InvalidIdentity = true
            }
        );
    }
}
99 changes: 99 additions & 0 deletions src/Proto.Cluster/SingleNode/SingleNodeLookup.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// -----------------------------------------------------------------------
// <copyright file="SingleNodeLookup.cs" company="Asynkron AB">
// Copyright (C) 2015-2022 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Proto.Cluster.Identity;

namespace Proto.Cluster.SingleNode;

/// <summary>
/// Provides a lookup optimized for single node 'clusters'
/// Only usable with SingleNodeProvider
/// </summary>
public class SingleNodeLookup : IIdentityLookup
{
    private const string ActivatorActorName = "$sn-activator";

    private static readonly ILogger Logger = Log.CreateLogger<SingleNodeLookup>();

    // Upper bound for a single activation request issued by GetAsync.
    private readonly TimeSpan _getPidTimeout;

    // Both fields are assigned in SetupAsync, before any lookup is performed.
    private PID _activatorActor = null!;
    private Cluster _cluster = null!;

    /// <summary>
    /// Creates a lookup whose activation requests time out after one second.
    /// </summary>
    public SingleNodeLookup() : this(TimeSpan.FromSeconds(1))
    {
    }

    /// <summary>
    /// Creates a lookup with a custom timeout for activation requests.
    /// </summary>
    /// <param name="getPidTimeout">Maximum time to wait for the activator actor to respond.</param>
    public SingleNodeLookup(TimeSpan getPidTimeout) => _getPidTimeout = getPidTimeout;

    /// <summary>
    /// Asks the activator actor for the PID of <paramref name="clusterIdentity" />.
    /// </summary>
    /// <param name="clusterIdentity">Identity to resolve.</param>
    /// <param name="notUsed">Ignored; the request is bounded by the configured timeout instead.</param>
    /// <returns>
    /// The PID carried by the response, or <c>null</c> when the request dead-letters, times out,
    /// fails with a non-fatal error, or yields no response.
    /// </returns>
    /// <exception cref="IdentityIsBlocked">The response flags the identity as invalid.</exception>
    public async Task<PID?> GetAsync(ClusterIdentity clusterIdentity, CancellationToken notUsed)
    {
        using var cts = new CancellationTokenSource(_getPidTimeout);

        var req = new ActivationRequest
        {
            RequestId = Guid.NewGuid().ToString("N"),
            ClusterIdentity = clusterIdentity
        };

        try
        {
            var resp = await _cluster.System.Root.RequestAsync<ActivationResponse>(_activatorActor, req, cts.Token);

            // Guard once, up front, so a missing response yields null directly instead of
            // surfacing as a NullReferenceException in the generic error handler below.
            if (resp is null)
            {
                return null;
            }

            if (resp.InvalidIdentity)
            {
                throw new IdentityIsBlocked(clusterIdentity);
            }

            return resp.Pid;
        }
        catch (DeadLetterException)
        {
            Logger.LogInformation("[SingleNode] Remote PID request deadletter {@Request}", req);
            return null;
        }
        catch (TimeoutException)
        {
            Logger.LogInformation("[SingleNode] Remote PID request timeout {@Request}", req);
            return null;
        }
        catch (Exception e) when (e is not IdentityIsBlocked)
        {
            // IdentityIsBlocked is excluded by the filter so that it reaches the caller.
            e.CheckFailFast();
            Logger.LogError(e, "[SingleNode] Error occured requesting remote PID {@Request}", req);
            return null;
        }
    }

    /// <summary>
    /// Broadcasts an <see cref="ActivationTerminated" /> event for the given identity and PID.
    /// </summary>
    /// <param name="clusterIdentity">Identity of the activation to remove.</param>
    /// <param name="pid">PID of the activation to remove.</param>
    /// <param name="ct">Not used by this implementation.</param>
    public Task RemovePidAsync(ClusterIdentity clusterIdentity, PID pid, CancellationToken ct)
    {
        var activationTerminated = new ActivationTerminated
        {
            Pid = pid,
            ClusterIdentity = clusterIdentity
        };

        _cluster.MemberList.BroadcastEvent(activationTerminated);

        return Task.CompletedTask;
    }

    /// <summary>
    /// Stores the cluster and spawns the activator actor as a named system actor.
    /// </summary>
    /// <param name="cluster">Cluster this lookup belongs to; its provider must be a <see cref="SingleNodeProvider" />.</param>
    /// <param name="kinds">Not used by this implementation.</param>
    /// <param name="isClient">Must be <c>false</c>; client mode is rejected.</param>
    /// <exception cref="ArgumentException">
    /// The cluster provider is not a <see cref="SingleNodeProvider" />, or <paramref name="isClient" /> is <c>true</c>.
    /// </exception>
    public Task SetupAsync(Cluster cluster, string[] kinds, bool isClient)
    {
        if (cluster.Provider is not SingleNodeProvider || isClient)
        {
            throw new ArgumentException("SingleNodeLookup can only be used with SingleNodeProvider in server mode");
        }

        _cluster = cluster;
        var props = Props.FromProducer(() => new SingleNodeActivatorActor(_cluster));
        _activatorActor = cluster.System.Root.SpawnNamedSystem(props, ActivatorActorName);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Stops the activator actor.
    /// </summary>
    public Task ShutdownAsync() => _cluster.System.Root.StopAsync(_activatorActor);
}
42 changes: 42 additions & 0 deletions src/Proto.Cluster/SingleNode/SingleNodeProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// -----------------------------------------------------------------------
// <copyright file = "SingleNodeProvider.cs" company = "Asynkron AB">
// Copyright (C) 2015-2022 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Threading.Tasks;

namespace Proto.Cluster.SingleNode;

/// <summary>
/// Provides an in-memory cluster provider for a single node
/// Makes the cluster abstractions available for
/// single node scenarios, with zero need for external coordination
/// </summary>
public class SingleNodeProvider : IClusterProvider
{
    // Remembered in StartMemberAsync so that ShutdownAsync can clear the topology again.
    private Cluster? _cluster;

    /// <summary>
    /// Publishes a topology containing exactly one member that describes the local actor system.
    /// </summary>
    /// <param name="cluster">Cluster being started.</param>
    public Task StartMemberAsync(Cluster cluster)
    {
        _cluster = cluster;

        var address = cluster.System.GetAddress();
        var localMember = new Member
        {
            Host = address.Item1,
            Port = address.Item2,
            Id = cluster.System.Id
        };
        localMember.Kinds.Add(cluster.GetClusterKinds());

        var topology = new[] {localMember};
        cluster.MemberList.UpdateClusterTopology(topology);

        return Task.CompletedTask;
    }

    /// <summary>
    /// Client mode is not available for this provider.
    /// </summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public Task StartClientAsync(Cluster cluster)
    {
        throw new NotSupportedException("Single node provider does not support client mode");
    }

    /// <summary>
    /// Publishes an empty topology when a member was started; otherwise does nothing.
    /// </summary>
    /// <param name="graceful">Not used by this implementation.</param>
    public Task ShutdownAsync(bool graceful)
    {
        if (_cluster is not null)
        {
            _cluster.MemberList.UpdateClusterTopology(Array.Empty<Member>());
        }

        return Task.CompletedTask;
    }
}
Loading