diff --git a/src/Proto.Cluster/Partition/PartitionIdentityActor.cs b/src/Proto.Cluster/Partition/PartitionIdentityActor.cs
index e41d8e92a4..c7a8247166 100644
--- a/src/Proto.Cluster/Partition/PartitionIdentityActor.cs
+++ b/src/Proto.Cluster/Partition/PartitionIdentityActor.cs
@@ -199,7 +199,7 @@ private HashSet GetIncompletePartitionAddresses(HandoverSink sink, strin
     private Task OnStarted(IContext context)
     {
         var self = context.Self;
-        _cluster.System.EventStream.Subscribe(e => _cluster.System.Root.Send(self, e));
+        _cluster.System.EventStream.Subscribe(context.System.Root, self);
         return Task.CompletedTask;
     }
diff --git a/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs b/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs
index 3dc9c5a600..5688133421 100644
--- a/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs
+++ b/src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs
@@ -41,8 +41,8 @@ public PartitionActivatorActor(Cluster cluster, PartitionActivatorManager manage
     private Task OnStarted(IContext context)
     {
         var self = context.Self;
-        _cluster.System.EventStream.Subscribe(e => _cluster.System.Root.Send(self, e));
-        _cluster.System.EventStream.Subscribe(e => _cluster.System.Root.Send(self, e));
+        _cluster.System.EventStream.Subscribe(context.System.Root, self);
+        _cluster.System.EventStream.Subscribe(context.System.Root, self);
         return Task.CompletedTask;
     }
diff --git a/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs b/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs
new file mode 100644
index 0000000000..628c94bdeb
--- /dev/null
+++ b/src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs
@@ -0,0 +1,234 @@
+// -----------------------------------------------------------------------
+//
+//      Copyright (C) 2015-2022 Asynkron AB All rights reserved
+//
+// -----------------------------------------------------------------------
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace Proto.Cluster.SingleNode;
+
+/// <summary>
+/// Spawns cluster actors on request and tracks them by <see cref="ClusterIdentity"/>.
+/// Replies to <see cref="ActivationRequest"/> messages with an <see cref="ActivationResponse"/>.
+/// </summary>
+class SingleNodeActivatorActor : IActor
+{
+    private static readonly ILogger Logger = Log.CreateLogger<SingleNodeActivatorActor>();
+
+    private readonly Cluster _cluster;
+
+    // Activations spawned by this actor and not yet removed (on ActivationTerminating or shutdown).
+    private readonly Dictionary<ClusterIdentity, PID> _actors = new();
+
+    // Identities whose CanSpawnIdentity check is still running.
+    private readonly HashSet<ClusterIdentity> _inFlightIdentityChecks = new();
+
+    public SingleNodeActivatorActor(Cluster cluster) => _cluster = cluster;
+
+    // Subscribes this actor to the termination events it handles in ReceiveAsync.
+    private Task OnStarted(IContext context)
+    {
+        var self = context.Self;
+        _cluster.System.EventStream.Subscribe<ActivationTerminated>(context.System.Root, self);
+        _cluster.System.EventStream.Subscribe<ActivationTerminating>(context.System.Root, self);
+
+        return Task.CompletedTask;
+    }
+
+    public Task ReceiveAsync(IContext context) =>
+        context.Message switch
+        {
+            Started => OnStarted(context),
+            Stopping => OnStopping(context),
+            ActivationRequest msg => OnActivationRequest(msg, context),
+            ActivationTerminated msg => OnActivationTerminated(msg),
+            ActivationTerminating msg => OnActivationTerminating(msg),
+            _ => Task.CompletedTask
+        };
+
+    // Stops every tracked activation, then drops this node's entries from the PID cache.
+    private async Task OnStopping(IContext context)
+    {
+        await StopActors(context);
+        _cluster.PidCache.RemoveByPredicate(kv => kv.Value.Address.Equals(context.System.Address, StringComparison.Ordinal));
+    }
+
+    // Poisons all tracked activations, forgets them, and waits for the poison tasks to complete.
+    private async Task StopActors(IContext context)
+    {
+        var stopping = new List<Task>();
+
+        var clusterIdentities = _actors.Keys.ToList();
+
+        foreach (var ci in clusterIdentities)
+        {
+            var pid = _actors[ci];
+            var stoppingTask = context.PoisonAsync(pid);
+            stopping.Add(stoppingTask);
+            _actors.Remove(ci);
+        }
+
+        //await graceful shutdown of all actors
+        await Task.WhenAll(stopping);
+        Logger.LogInformation("[SingleNode] - Stopped {ActorCount} actors", clusterIdentities.Count);
+    }
+
+    // Removes the terminated activation from the PID cache.
+    private Task OnActivationTerminated(ActivationTerminated msg)
+    {
+        _cluster.PidCache.RemoveByVal(msg.ClusterIdentity, msg.Pid);
+
+        if (Logger.IsEnabled(LogLevel.Trace))
+            Logger.LogTrace("[SingleNode] Terminated {Pid}", msg.Pid);
+
+        return Task.CompletedTask;
+    }
+
+    private Task OnActivationTerminating(ActivationTerminating msg)
+    {
+        // ActivationTerminating is sent to the local EventStream when a
+        // local cluster actor stops.
+
+        if (!_actors.ContainsKey(msg.ClusterIdentity))
+            return Task.CompletedTask;
+
+        if (Logger.IsEnabled(LogLevel.Trace))
+            Logger.LogTrace("[SingleNode] Terminating {Pid}", msg.Pid);
+
+        _actors.Remove(msg.ClusterIdentity);
+
+        // Broadcast ActivationTerminated to all nodes so that PidCaches gets
+        // cleared correctly.
+        var activationTerminated = new ActivationTerminated
+        {
+            Pid = msg.Pid,
+            ClusterIdentity = msg.ClusterIdentity,
+        };
+        _cluster.MemberList.BroadcastEvent(activationTerminated);
+
+        return Task.CompletedTask;
+    }
+
+    // Replies with the tracked activation if there is one; otherwise spawns (after the optional identity check).
+    private Task OnActivationRequest(ActivationRequest msg, IContext context)
+    {
+        if (_actors.TryGetValue(msg.ClusterIdentity, out var existing))
+        {
+            context.Respond(new ActivationResponse
+                {
+                    Pid = existing,
+                }
+            );
+        }
+        else
+        {
+            var clusterKind = _cluster.GetClusterKind(msg.Kind);
+
+            if (clusterKind.CanSpawnIdentity is not null)
+            {
+                // Needs to check if the identity is allowed to spawn
+                VerifyAndSpawn(msg, context, clusterKind);
+            }
+            else
+            {
+                Spawn(msg, context, clusterKind);
+            }
+        }
+
+        return Task.CompletedTask;
+    }
+
+    // Runs the kind's CanSpawnIdentity check and hands the outcome to OnSpawnDecided.
+    private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedClusterKind clusterKind)
+    {
+        var clusterIdentity = msg.ClusterIdentity;
+
+        if (_inFlightIdentityChecks.Contains(clusterIdentity))
+        {
+            Logger.LogError("[SingleNode] Duplicate activation requests for {ClusterIdentity}", clusterIdentity);
+            context.Respond(new ActivationResponse
+                {
+                    Failed = true,
+                }
+            );
+
+            // The request has been answered; do not start a second check for the same identity.
+            return;
+        }
+
+        var canSpawn = clusterKind.CanSpawnIdentity!(msg.Identity, CancellationTokens.FromSeconds(_cluster.Config.ActorSpawnTimeout));
+
+        // Fast path only for a successful synchronous result; a faulted or cancelled check goes
+        // through ReenterAfter below, where it is logged and answered with Failed.
+        if (canSpawn.IsCompletedSuccessfully)
+        {
+            OnSpawnDecided(msg, context, clusterKind, canSpawn.Result);
+            return;
+        }
+
+        _inFlightIdentityChecks.Add(clusterIdentity);
+        context.ReenterAfter(canSpawn.AsTask(), task => {
+                _inFlightIdentityChecks.Remove(clusterIdentity);
+
+                if (task.IsCompletedSuccessfully)
+                {
+                    OnSpawnDecided(msg, context, clusterKind, task.Result);
+                }
+                else
+                {
+                    Logger.LogError("[SingleNode] Error when checking {ClusterIdentity}", clusterIdentity);
+                    context.Respond(new ActivationResponse
+                        {
+                            Failed = true,
+                        }
+                    );
+                }
+
+                return Task.CompletedTask;
+            }
+        );
+    }
+
+    // Spawns the actor, records it and replies with its PID; replies Failed if anything throws.
+    private void Spawn(ActivationRequest msg, IContext context, ActivatedClusterKind clusterKind)
+    {
+        try
+        {
+            var pid = context.Spawn(clusterKind.Props, ctx => ctx.Set(msg.ClusterIdentity));
+            _actors.Add(msg.ClusterIdentity, pid);
+            context.Respond(new ActivationResponse
+                {
+                    Pid = pid,
+                }
+            );
+        }
+        catch (Exception e)
+        {
+            e.CheckFailFast();
+            Logger.LogError(e, "[SingleNode] Failed to spawn {Kind}/{Identity}", msg.Kind, msg.Identity);
+            context.Respond(new ActivationResponse {Failed = true});
+        }
+    }
+
+    // Spawns when the identity check passed; otherwise replies Failed with InvalidIdentity set.
+    private void OnSpawnDecided(ActivationRequest msg, IContext context, ActivatedClusterKind clusterKind, bool canSpawnIdentity)
+    {
+        if (canSpawnIdentity)
+        {
+            Spawn(msg, context, clusterKind);
+        }
+        else
+        {
+            context.Respond(new ActivationResponse
+                {
+                    Failed = true,
+                    InvalidIdentity = true
+                }
+            );
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/Proto.Cluster/SingleNode/SingleNodeLookup.cs b/src/Proto.Cluster/SingleNode/SingleNodeLookup.cs
new file mode 100644
index 0000000000..6f00664dc7
--- /dev/null
+++ b/src/Proto.Cluster/SingleNode/SingleNodeLookup.cs
@@ -0,0 +1,99 @@
+// -----------------------------------------------------------------------
+//
+//      Copyright (C) 2015-2022 Asynkron AB All rights reserved
+//
+// -----------------------------------------------------------------------
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Proto.Cluster.Identity;
+
+namespace Proto.Cluster.SingleNode;
+
+///
+/// 
Provides a lookup optimized for single node 'clusters'
+/// Only usable with SingleNodeProvider
+///
+public class SingleNodeLookup : IIdentityLookup
+{
+    private const string ActivatorActorName = "$sn-activator";
+    private PID _activatorActor = null!; // assigned in SetupAsync
+
+    private static readonly ILogger Logger = Log.CreateLogger<SingleNodeLookup>();
+    private readonly TimeSpan _getPidTimeout;
+    private Cluster _cluster = null!; // assigned in SetupAsync
+
+    public SingleNodeLookup() : this(TimeSpan.FromSeconds(1))
+    {
+    }
+
+    public SingleNodeLookup(TimeSpan getPidTimeout) => _getPidTimeout = getPidTimeout;
+
+    public async Task<PID?> GetAsync(ClusterIdentity clusterIdentity, CancellationToken notUsed)
+    {
+        using var cts = new CancellationTokenSource(_getPidTimeout); // cancels the request after _getPidTimeout
+
+        var req = new ActivationRequest
+        {
+            RequestId = Guid.NewGuid().ToString("N"),
+            ClusterIdentity = clusterIdentity
+        };
+
+        try
+        {
+            var resp = await _cluster.System.Root.RequestAsync<ActivationResponse>(_activatorActor, req, cts.Token);
+
+            if (resp?.InvalidIdentity == true) // null-safe, consistent with the resp?.Pid return below
+            {
+                throw new IdentityIsBlocked(clusterIdentity);
+            }
+
+            return resp?.Pid;
+        }
+        catch (DeadLetterException)
+        {
+            Logger.LogInformation("[SingleNode] Remote PID request deadletter {@Request}", req);
+            return null;
+        }
+        catch (TimeoutException)
+        {
+            Logger.LogInformation("[SingleNode] Remote PID request timeout {@Request}", req);
+            return null;
+        }
+        catch (Exception e) when (e is not IdentityIsBlocked) // IdentityIsBlocked propagates to the caller
+        {
+            e.CheckFailFast();
+            Logger.LogError(e, "[SingleNode] Error occured requesting remote PID {@Request}", req);
+            return null;
+        }
+    }
+
+    public Task RemovePidAsync(ClusterIdentity clusterIdentity, PID pid, CancellationToken ct)
+    {
+        var activationTerminated = new ActivationTerminated
+        {
+            Pid = pid,
+            ClusterIdentity = clusterIdentity
+        };
+
+        _cluster.MemberList.BroadcastEvent(activationTerminated);
+
+        return Task.CompletedTask;
+    }
+
+    public Task SetupAsync(Cluster cluster, string[] kinds, bool isClient)
+    {
+        if (cluster.Provider is not SingleNodeProvider || isClient)
+        {
+            throw new ArgumentException("SingleNodeLookup can only be used with SingleNodeProvider in server mode");
+        }
+
+        _cluster = cluster;
+        var props = Props.FromProducer(() => new SingleNodeActivatorActor(_cluster));
+        _activatorActor = cluster.System.Root.SpawnNamedSystem(props, ActivatorActorName);
+        return Task.CompletedTask;
+    }
+
+    public Task ShutdownAsync() => _cluster.System.Root.StopAsync(_activatorActor);
+}
\ No newline at end of file
diff --git a/src/Proto.Cluster/SingleNode/SingleNodeProvider.cs b/src/Proto.Cluster/SingleNode/SingleNodeProvider.cs
new file mode 100644
index 0000000000..74495990bf
--- /dev/null
+++ b/src/Proto.Cluster/SingleNode/SingleNodeProvider.cs
@@ -0,0 +1,42 @@
+// -----------------------------------------------------------------------
+//
+//      Copyright (C) 2015-2022 Asynkron AB All rights reserved
+//
+// -----------------------------------------------------------------------
+using System;
+using System.Threading.Tasks;
+
+namespace Proto.Cluster.SingleNode;
+
+/// <summary>
+/// Provides an in-memory cluster provider for a single node
+/// Makes the cluster abstractions available for
+/// single node scenarios, with zero need for external coordination
+/// </summary>
+public class SingleNodeProvider : IClusterProvider
+{
+    private Cluster? 
_cluster;
+
+    public Task StartMemberAsync(Cluster cluster)
+    {
+        _cluster = cluster; // kept so ShutdownAsync can clear the topology
+        var (host, port) = cluster.System.GetAddress();
+        var member = new Member
+        {
+            Host = host,
+            Port = port,
+            Id = cluster.System.Id,
+            Kinds = {cluster.GetClusterKinds()}
+        };
+        cluster.MemberList.UpdateClusterTopology(new[] {member}); // topology containing only this node
+        return Task.CompletedTask;
+    }
+
+    public Task StartClientAsync(Cluster cluster) => throw new NotSupportedException("Single node provider does not support client mode");
+
+    public Task ShutdownAsync(bool graceful)
+    {
+        _cluster?.MemberList.UpdateClusterTopology(Array.Empty<Member>()); // empty topology; no-op if never started
+        return Task.CompletedTask;
+    }
+}
\ No newline at end of file
diff --git a/tests/Proto.Cluster.Tests/ClusterFixture.cs b/tests/Proto.Cluster.Tests/ClusterFixture.cs
index 80cefd3b3e..3da8befb88 100644
--- a/tests/Proto.Cluster.Tests/ClusterFixture.cs
+++ b/tests/Proto.Cluster.Tests/ClusterFixture.cs
@@ -11,6 +11,7 @@
 using Proto.Cluster.Identity;
 using Proto.Cluster.Partition;
 using Proto.Cluster.PartitionActivator;
+using Proto.Cluster.SingleNode;
 using Proto.Cluster.Testing;
 using Proto.Logging;
 using Proto.OpenTelemetry;
@@ -29,6 +30,7 @@ public interface IClusterFixture
     public Task SpawnNode();
     LogStore LogStore { get; }
+    int ClusterSize { get; }
     Task RemoveNode(Cluster member, bool graceful = true);
 }
@@ -39,7 +41,7 @@ public abstract class ClusterFixture : IAsyncLifetime, IClusterFixture, IAsyncDi
     public const string InvalidIdentity = "invalid";
     protected readonly string ClusterName;
-    private readonly int _clusterSize;
+    public int ClusterSize { get; }
     private readonly Func? _configure;
     private readonly ILogger _logger = Log.CreateLogger(nameof(GetType));
     private readonly TracerProvider? _tracerProvider;
@@ -50,7 +52,7 @@ protected ClusterFixture(int clusterSize, Func? 
co #if NETCOREAPP3_1 AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); #endif - _clusterSize = clusterSize; + ClusterSize = clusterSize; _configure = configure; ClusterName = $"test-cluster-{Guid.NewGuid().ToString().Substring(0, 6)}"; @@ -89,7 +91,7 @@ private static TracerProvider InitOpenTelemetryTracing() => global::OpenTelemetr public async Task InitializeAsync() { - var nodes = await SpawnClusterNodes(_clusterSize, _configure).ConfigureAwait(false); + var nodes = await SpawnClusterNodes(ClusterSize, _configure).ConfigureAwait(false); _members.AddRange(nodes); } @@ -178,9 +180,11 @@ protected virtual ActorSystemConfig GetActorSystemConfig() var actorSystemConfig = ActorSystemConfig.Setup(); // ReSharper disable once HeuristicUnreachableCode - return EnableTracing ? actorSystemConfig - .WithConfigureProps(props => props.WithTracing()) - .WithConfigureRootContext(context => context.WithTracing()): actorSystemConfig; + return EnableTracing + ? actorSystemConfig + .WithConfigureProps(props => props.WithTracing()) + .WithConfigureRootContext(context => context.WithTracing()) + : actorSystemConfig; } protected abstract IClusterProvider GetClusterProvider(); @@ -216,7 +220,6 @@ protected BaseInMemoryClusterFixture(int clusterSize, Func new TestProvider(new TestProviderOptions(), InMemAgent); } -// ReSharper disable once ClassNeverInstantiated.Global public class InMemoryClusterFixture : BaseInMemoryClusterFixture { public InMemoryClusterFixture() : base(3, config => config.WithActorRequestTimeout(TimeSpan.FromSeconds(4))) @@ -224,7 +227,6 @@ public InMemoryClusterFixture() : base(3, config => config.WithActorRequestTimeo } } -// ReSharper disable once ClassNeverInstantiated.Global public class InMemoryClusterFixtureWithPartitionActivator : BaseInMemoryClusterFixture { public InMemoryClusterFixtureWithPartitionActivator() : base(3, config => config.WithActorRequestTimeout(TimeSpan.FromSeconds(4))) @@ -271,4 +273,15 @@ protected 
override async Task SpawnClusterMember(Func config.WithActorRequestTimeout(TimeSpan.FromSeconds(4)))
+    {
+    }
+
+    protected override IClusterProvider GetClusterProvider() => new SingleNodeProvider();
+
+    protected override IIdentityLookup GetIdentityLookup(string clusterName) => new SingleNodeLookup();
+}
diff --git a/tests/Proto.Cluster.Tests/ClusterTests.cs b/tests/Proto.Cluster.Tests/ClusterTests.cs
index 516f002143..f3cfc21e06 100644
--- a/tests/Proto.Cluster.Tests/ClusterTests.cs
+++ b/tests/Proto.Cluster.Tests/ClusterTests.cs
@@ -74,6 +74,12 @@ public async Task SupportsMessageEnvelopeResponses()
     [Fact]
     public async Task StateIsReplicatedAcrossCluster()
     {
+        if (ClusterFixture.ClusterSize < 2)
+        {
+            _testOutputHelper.WriteLine("Skipped test, cluster size is less than 2");
+            return;
+        }
+
         var sourceMember = Members.First();
         var sourceMemberId = sourceMember.System.Id;
         var targetMember = Members.Last();
@@ -115,6 +121,12 @@ IAsyncEnumerable SubscribeToGossipUpdates(Cluster member)
     [Fact]
     public async Task ReSpawnsClusterActorsFromDifferentNodes()
     {
+        if (ClusterFixture.ClusterSize < 2)
+        {
+            _testOutputHelper.WriteLine("Skipped test, cluster size is less than 2");
+            return;
+        }
+
         var timeout = new CancellationTokenSource(10000).Token;
         var id = CreateIdentity("1");
         await PingPong(Members[0], id, timeout);
@@ -140,6 +152,12 @@ public async Task ReSpawnsClusterActorsFromDifferentNodes()
     [Fact]
     public async Task HandlesLosingANode()
     {
+        if (ClusterFixture.ClusterSize < 2)
+        {
+            _testOutputHelper.WriteLine("Skipped test, cluster size is less than 2");
+            return;
+        }
+
         var ids = Enumerable.Range(1, 10).Select(id => id.ToString()).ToList();
         await CanGetResponseFromAllIdsOnAllNodes(ids, Members, 20000);
@@ -158,6 +176,12 @@ public async Task HandlesLosingANode()
     [Fact]
     public async Task HandlesLosingANodeWhileProcessing()
     {
+        if (ClusterFixture.ClusterSize < 3) // the test indexes Members[2], so it needs three nodes
+        {
+            _testOutputHelper.WriteLine("Skipped test, cluster size is less than 3");
+            return;
+        }
+
         var ingressNodes = new[] {Members[0], Members[1]};
         var victim = Members[2];
         var ids = Enumerable.Range(1, 20).Select(id => id.ToString()).ToList();
@@ -382,4 +406,13 @@ public InMemoryPartitionActivatorClusterTests(ITestOutputHelper testOutputHelper
         : base(testOutputHelper, clusterFixture)
     {
     }
+}
+
+public class SingleNodeProviderClusterTests : ClusterTests, IClassFixture<SingleNodeProviderFixture>
+{
+    // ReSharper disable once SuggestBaseTypeForParameterInConstructor
+    public SingleNodeProviderClusterTests(ITestOutputHelper testOutputHelper, SingleNodeProviderFixture clusterFixture)
+        : base(testOutputHelper, clusterFixture)
+    {
+    }
 }
\ No newline at end of file