Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce seednode discovery #1987

Merged
merged 2 commits into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/Proto.Cluster/Partition/PartitionPlacementActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,6 @@ private Props AbortOnDeadLetter(CancellationTokenSource cts) =>

private Task OnActivationRequest(IContext context, ActivationRequest msg)
{


if (_actors.TryGetValue(msg.ClusterIdentity, out var existing))
{
if (Logger.IsEnabled(LogLevel.Debug))
Expand Down
10 changes: 10 additions & 0 deletions src/Proto.Cluster/Seed/ISeedNodeDiscovery.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Threading.Tasks;

namespace Proto.Cluster.Seed;

public interface ISeedNodeDiscovery
{
Task Register(string memberId, string host, int port);
Task Remove(string memberId);
Task<(string memberId,string host, int port)[]> GetAll();
}
37 changes: 25 additions & 12 deletions src/Proto.Cluster/Seed/SeedNodeActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class SeedNodeActor : IActor
private static readonly ILogger Logger = Log.CreateLogger<SeedNodeActor>();
private readonly SeedNodeClusterProviderOptions _options;
private ImmutableList<PID> _clients = ImmutableList<PID>.Empty;
private ClusterTopology? _lastestTopology;
private ClusterTopology? _latestTopology;
private ImmutableDictionary<string, Member> _members = ImmutableDictionary<string, Member>.Empty;

private SeedNodeActor(SeedNodeClusterProviderOptions options)
Expand All @@ -45,7 +45,22 @@ private async Task OnConnect(IContext context)
{
var (selfHost, selfPort) = context.System.GetAddress();

if (_options.SeedNodes.Except(new[] { (selfHost, selfPort) }).Any())
(string, int)[] seedNodes;

if (_options.Discovery != null)
{
var nodes = await _options.Discovery.GetAll().ConfigureAwait(false);
Logger.LogInformation("Starting via SeedNode Discovery, found seed nodes {@Members}", nodes);
seedNodes = nodes.Select(n => (n.host, n.port)).ToArray();

}
else
{
Logger.LogInformation("Starting via SeedNode, found seed nodes {@Members}", _options.SeedNodes);
seedNodes = _options.SeedNodes.Except(new[] { (selfHost, selfPort) }).ToArray();
}

if (seedNodes.Any())
{
foreach (var (host, port) in _options.SeedNodes)
{
Expand Down Expand Up @@ -84,12 +99,10 @@ private async Task OnConnect(IContext context)
}
}

private Task OnClusterTopology(IContext context, ClusterTopology clusterTopology)
private async Task OnClusterTopology(IContext context, ClusterTopology clusterTopology)
{
_lastestTopology = clusterTopology;
NotifyClients(context, clusterTopology);

return Task.CompletedTask;
_latestTopology = clusterTopology;
await NotifyClients(context, clusterTopology);
}

private Task OnClientTerminated(Terminated pid)
Expand Down Expand Up @@ -122,9 +135,9 @@ private Task OnJoinAsClientRequest(IContext context, JoinAsClientRequest request
_clients = _clients.Add(clientSeed);
context.Respond(new JoinResponse { Member = context.Cluster().MemberList.Self });

if (_lastestTopology != null)
if (_latestTopology != null)
{
context.Send(clientSeed, _lastestTopology);
context.Send(clientSeed, _latestTopology);
}
else
{
Expand All @@ -138,9 +151,9 @@ private Task OnJoinAsClientRequest(IContext context, JoinAsClientRequest request
_clients = _clients.Add(clientSeed);
context.Respond(new JoinResponse { Member = context.Cluster().MemberList.Self });

if (_lastestTopology != null)
if (_latestTopology != null)
{
context.Send(clientSeed, _lastestTopology);
context.Send(clientSeed, _latestTopology);
}
}

Expand All @@ -160,7 +173,7 @@ private Task OnJoinRequest(IContext context, JoinRequest request)
private Task OnTopologyUpdate(IContext context, GossipUpdate update)
{
var topology = update.Value.Unpack<ClusterTopology>();
_lastestTopology = topology;
_latestTopology = topology;

foreach (var m in topology.Members)
{
Expand Down
28 changes: 28 additions & 0 deletions src/Proto.Cluster/Seed/SeedNodeClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,37 @@
// -----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Microsoft.Extensions.Logging;
using Proto.Cluster.Gossip;

namespace Proto.Cluster.Seed;

[PublicAPI]
public class SeedNodeClusterProvider : IClusterProvider
{
[PublicAPI]
public static IClusterProvider JoinSeedNode(string address, int port)
{
return new SeedNodeClusterProvider(new SeedNodeClusterProviderOptions((address, port)));
}

[PublicAPI]
public static IClusterProvider StartSeedNode()
{
return new SeedNodeClusterProvider();
}

public static async Task<IClusterProvider> StartSeedNodeAsync(ISeedNodeDiscovery discovery)
{
var options = new SeedNodeClusterProviderOptions(discovery);
var provider = new SeedNodeClusterProvider(options);
return provider;
}

private static readonly ILogger Logger = Log.CreateLogger<SeedNodeClusterProvider>();
private readonly CancellationTokenSource _cts = new();

Expand Down Expand Up @@ -56,6 +68,15 @@ public async Task StartMemberAsync(Cluster cluster)
default:
throw new Exception("Failed to join any seed node");
}

if (_options.Discovery != null)
{
var (selfHost, selfPort) = _cluster.System.GetAddress();

await _options.Discovery.Register(_cluster.System.Id, selfHost, selfPort);
Logger.LogInformation("Registering self in SeedNode Discovery {Id} {Host}:{Port}",
cluster.System.Id, selfHost, selfPort);
}
}

public async Task StartClientAsync(Cluster cluster)
Expand All @@ -81,6 +102,13 @@ public async Task ShutdownAsync(bool graceful)
{
await _cluster.System.Root.StopAsync(_pid).ConfigureAwait(false);
}
if (_options.Discovery is not null)
{
var (selfHost, selfPort) = _cluster.System.GetAddress();
await _options.Discovery.Remove(_cluster!.System.Id);
Logger.LogInformation("Removing self from SeedNode Discovery {Id} {Host}:{Port}",
_cluster.System.Id, selfHost, selfPort);
}

_cts.Cancel();
}
Expand Down
10 changes: 9 additions & 1 deletion src/Proto.Cluster/Seed/SeedNodeClusterProviderOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,16 @@ public record SeedNodeClusterProviderOptions
public SeedNodeClusterProviderOptions(params (string, int)[] seeds)
{
SeedNodes = seeds.ToImmutableList();
Discovery = null;
}

public SeedNodeClusterProviderOptions(ISeedNodeDiscovery discovery)
{
Discovery = discovery;
}

public ISeedNodeDiscovery? Discovery { get; }

public ImmutableList<(string host, int port)> SeedNodes { get; init; } =
public ImmutableList<(string host, int port)> SeedNodes { get; } =
ImmutableList<(string host, int port)>.Empty;
}