From 42385f60645c03aebf9a95f039f837ca61879784 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Tue, 9 May 2023 11:22:19 +0200 Subject: [PATCH 1/2] introduce seednode discovery --- .../Partition/PartitionPlacementActor.cs | 2 - src/Proto.Cluster/Seed/ISeedNodeDiscovery.cs | 10 +++++ src/Proto.Cluster/Seed/SeedNodeActor.cs | 40 +++++++++++++------ .../Seed/SeedNodeClusterProvider.cs | 11 +++++ .../Seed/SeedNodeClusterProviderOptions.cs | 10 ++++- 5 files changed, 58 insertions(+), 15 deletions(-) create mode 100644 src/Proto.Cluster/Seed/ISeedNodeDiscovery.cs diff --git a/src/Proto.Cluster/Partition/PartitionPlacementActor.cs b/src/Proto.Cluster/Partition/PartitionPlacementActor.cs index 5d657edf13..cc544247b5 100644 --- a/src/Proto.Cluster/Partition/PartitionPlacementActor.cs +++ b/src/Proto.Cluster/Partition/PartitionPlacementActor.cs @@ -297,8 +297,6 @@ private Props AbortOnDeadLetter(CancellationTokenSource cts) => private Task OnActivationRequest(IContext context, ActivationRequest msg) { - - if (_actors.TryGetValue(msg.ClusterIdentity, out var existing)) { if (Logger.IsEnabled(LogLevel.Debug)) diff --git a/src/Proto.Cluster/Seed/ISeedNodeDiscovery.cs b/src/Proto.Cluster/Seed/ISeedNodeDiscovery.cs new file mode 100644 index 0000000000..d553e0065c --- /dev/null +++ b/src/Proto.Cluster/Seed/ISeedNodeDiscovery.cs @@ -0,0 +1,10 @@ +using System.Threading.Tasks; + +namespace Proto.Cluster.Seed; + +public interface ISeedNodeDiscovery +{ + Task Register(string memberId, string host, int port); + Task Remove(string memberId); + Task<(string memberId,string host, int port)[]> GetAll(); +} \ No newline at end of file diff --git a/src/Proto.Cluster/Seed/SeedNodeActor.cs b/src/Proto.Cluster/Seed/SeedNodeActor.cs index a16029b476..279e955101 100644 --- a/src/Proto.Cluster/Seed/SeedNodeActor.cs +++ b/src/Proto.Cluster/Seed/SeedNodeActor.cs @@ -20,7 +20,7 @@ public class SeedNodeActor : IActor private static readonly ILogger Logger = Log.CreateLogger(); private readonly SeedNodeClusterProviderOptions _options; private ImmutableList _clients = ImmutableList.Empty; - private ClusterTopology? _lastestTopology; + private ClusterTopology? _latestTopology; private ImmutableDictionary _members = ImmutableDictionary.Empty; private SeedNodeActor(SeedNodeClusterProviderOptions options) @@ -45,7 +45,25 @@ private async Task OnConnect(IContext context) { var (selfHost, selfPort) = context.System.GetAddress(); - if (_options.SeedNodes.Except(new[] { (selfHost, selfPort) }).Any()) + (string, int)[] seedNodes; + + if (_options.Discovery != null) + { + var nodes = await _options.Discovery.GetAll().ConfigureAwait(false); + Logger.LogInformation("Starting via SeedNode Discovery, found seed nodes {@Members}", nodes); + seedNodes = nodes.Select(n => (n.host, n.port)).ToArray(); + + await _options.Discovery.Register(context.System.Id, selfHost, selfPort); + Logger.LogInformation("Registering self in SeedNode Discovery {Id} {Host}:{Port}", context.System.Id, + selfHost, selfPort); + } + else + { + Logger.LogInformation("Starting via SeedNode, found seed nodes {@Members}", _options.SeedNodes); + seedNodes = _options.SeedNodes.Except(new[] { (selfHost, selfPort) }).ToArray(); + } + + if (seedNodes.Any()) { foreach (var (host, port) in _options.SeedNodes) { @@ -84,12 +102,10 @@ private async Task OnConnect(IContext context) } } - private Task OnClusterTopology(IContext context, ClusterTopology clusterTopology) + private async Task OnClusterTopology(IContext context, ClusterTopology clusterTopology) { - _lastestTopology = clusterTopology; - NotifyClients(context, clusterTopology); - - return Task.CompletedTask; + _latestTopology = clusterTopology; + await NotifyClients(context, clusterTopology); } private Task OnClientTerminated(Terminated pid) @@ -122,9 +138,9 @@ private Task OnJoinAsClientRequest(IContext context, JoinAsClientRequest request _clients = _clients.Add(clientSeed); context.Respond(new JoinResponse { Member = context.Cluster().MemberList.Self }); - if (_lastestTopology != null) + if (_latestTopology != null) { - context.Send(clientSeed, _lastestTopology); + context.Send(clientSeed, _latestTopology); } else { @@ -138,9 +154,9 @@ private Task OnJoinAsClientRequest(IContext context, JoinAsClientRequest request _clients = _clients.Add(clientSeed); context.Respond(new JoinResponse { Member = context.Cluster().MemberList.Self }); - if (_lastestTopology != null) + if (_latestTopology != null) { - context.Send(clientSeed, _lastestTopology); + context.Send(clientSeed, _latestTopology); } } @@ -160,7 +176,7 @@ private Task OnJoinRequest(IContext context, JoinRequest request) private Task OnTopologyUpdate(IContext context, GossipUpdate update) { var topology = update.Value.Unpack(); - _lastestTopology = topology; + _latestTopology = topology; foreach (var m in topology.Members) { diff --git a/src/Proto.Cluster/Seed/SeedNodeClusterProvider.cs b/src/Proto.Cluster/Seed/SeedNodeClusterProvider.cs index 845562249f..b1d049b8c2 100644 --- a/src/Proto.Cluster/Seed/SeedNodeClusterProvider.cs +++ b/src/Proto.Cluster/Seed/SeedNodeClusterProvider.cs @@ -5,8 +5,10 @@ // ----------------------------------------------------------------------- using System; +using System.Linq; using System.Threading; using System.Threading.Tasks; +using JetBrains.Annotations; using Microsoft.Extensions.Logging; using Proto.Cluster.Gossip; @@ -14,16 +16,25 @@ namespace Proto.Cluster.Seed; public class SeedNodeClusterProvider : IClusterProvider { + [PublicAPI] public static IClusterProvider JoinSeedNode(string address, int port) { return new SeedNodeClusterProvider(new SeedNodeClusterProviderOptions((address, port))); } + [PublicAPI] public static IClusterProvider StartSeedNode() { return new SeedNodeClusterProvider(); } + public static async Task StartSeedNodeAsync(ISeedNodeDiscovery discovery) + { + var options = new SeedNodeClusterProviderOptions(discovery); + var provider = new SeedNodeClusterProvider(options); + return provider; + } + private static readonly ILogger Logger = Log.CreateLogger(); private readonly CancellationTokenSource _cts = new(); diff --git a/src/Proto.Cluster/Seed/SeedNodeClusterProviderOptions.cs b/src/Proto.Cluster/Seed/SeedNodeClusterProviderOptions.cs index 839c2dbf09..31677cd75d 100644 --- a/src/Proto.Cluster/Seed/SeedNodeClusterProviderOptions.cs +++ b/src/Proto.Cluster/Seed/SeedNodeClusterProviderOptions.cs @@ -13,8 +13,16 @@ public record SeedNodeClusterProviderOptions public SeedNodeClusterProviderOptions(params (string, int)[] seeds) { SeedNodes = seeds.ToImmutableList(); + Discovery = null; } + + public SeedNodeClusterProviderOptions(ISeedNodeDiscovery discovery) + { + Discovery = discovery; + } + + public ISeedNodeDiscovery? Discovery { get; } - public ImmutableList<(string host, int port)> SeedNodes { get; init; } = + public ImmutableList<(string host, int port)> SeedNodes { get; } = ImmutableList<(string host, int port)>.Empty; } \ No newline at end of file From 1cdaab3cd907b5029031850b4e84ecbc2cdab494 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Tue, 9 May 2023 11:37:46 +0200 Subject: [PATCH 2/2] register in discovery --- src/Proto.Cluster/Seed/SeedNodeActor.cs | 3 --- .../Seed/SeedNodeClusterProvider.cs | 17 +++++++++++++++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/Proto.Cluster/Seed/SeedNodeActor.cs b/src/Proto.Cluster/Seed/SeedNodeActor.cs index 279e955101..4900889279 100644 --- a/src/Proto.Cluster/Seed/SeedNodeActor.cs +++ b/src/Proto.Cluster/Seed/SeedNodeActor.cs @@ -53,9 +53,6 @@ private async Task OnConnect(IContext context) Logger.LogInformation("Starting via SeedNode Discovery, found seed nodes {@Members}", nodes); seedNodes = nodes.Select(n => (n.host, n.port)).ToArray(); - await _options.Discovery.Register(context.System.Id, selfHost, selfPort); - Logger.LogInformation("Registering self in SeedNode Discovery {Id} {Host}:{Port}", context.System.Id, - selfHost, selfPort); } else { diff --git a/src/Proto.Cluster/Seed/SeedNodeClusterProvider.cs b/src/Proto.Cluster/Seed/SeedNodeClusterProvider.cs index b1d049b8c2..74e720f3b6 100644 --- a/src/Proto.Cluster/Seed/SeedNodeClusterProvider.cs +++ b/src/Proto.Cluster/Seed/SeedNodeClusterProvider.cs @@ -14,6 +14,7 @@ namespace Proto.Cluster.Seed; +[PublicAPI] public class SeedNodeClusterProvider : IClusterProvider { [PublicAPI] @@ -67,6 +68,15 @@ public async Task StartMemberAsync(Cluster cluster) default: throw new Exception("Failed to join any seed node"); } + + if (_options.Discovery != null) + { + var (selfHost, selfPort) = _cluster.System.GetAddress(); + + await _options.Discovery.Register(_cluster.System.Id, selfHost, selfPort); + Logger.LogInformation("Registering self in SeedNode Discovery {Id} {Host}:{Port}", + cluster.System.Id, selfHost, selfPort); + } } public async Task StartClientAsync(Cluster cluster) @@ -92,6 +102,13 @@ public async Task ShutdownAsync(bool graceful) { await _cluster.System.Root.StopAsync(_pid).ConfigureAwait(false); } + if (_options.Discovery is not null) + { + var (selfHost, selfPort) = _cluster.System.GetAddress(); + await _options.Discovery.Remove(_cluster!.System.Id); + Logger.LogInformation("Removing self from SeedNode Discovery {Id} {Host}:{Port}", + _cluster.System.Id, selfHost, selfPort); + } _cts.Cancel(); }