Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clean up seed node process #4975

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 73 additions & 63 deletions src/core/Akka.Cluster/ClusterDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2691,6 +2691,7 @@ internal sealed class JoinSeedNodeProcess : UntypedActor
private readonly ILoggingAdapter _log = Context.GetLogger();

private readonly ImmutableList<Address> _seeds;
private readonly ImmutableList<Address> _otherSeeds;
private readonly Address _selfAddress;
private int _attempts = 0;

Expand All @@ -2706,6 +2707,7 @@ public JoinSeedNodeProcess(ImmutableList<Address> seeds)
{
_selfAddress = Cluster.Get(Context.System).SelfAddress;
_seeds = seeds;
_otherSeeds = _seeds.Remove(_selfAddress);
if (seeds.IsEmpty || seeds.Head() == _selfAddress)
throw new ArgumentException("Join seed node should not be done");
Context.SetReceiveTimeout(Cluster.Get(Context.System).Settings.SeedNodeTimeout);
Expand All @@ -2725,46 +2727,53 @@ protected override void PreStart()
/// <param name="message">TBD</param>
protected override void OnReceive(object message)
{
if (message is InternalClusterAction.JoinSeenNode)
switch (message)
{
//send InitJoin to all seed nodes (except myself)
foreach (var path in _seeds.Where(x => x != _selfAddress)
.Select(y => Context.ActorSelection(Context.Parent.Path.ToStringWithAddress(y))))
case InternalClusterAction.JoinSeenNode _:
{
path.Tell(new InternalClusterAction.InitJoin());
//send InitJoin to all seed nodes (except myself)
foreach (var path in _otherSeeds
.Select(y => Context.ActorSelection(Context.Parent.Path.ToStringWithAddress(y))))
{
path.Tell(new InternalClusterAction.InitJoin());
}
_attempts++;
break;
}
_attempts++;
}
else if (message is InternalClusterAction.InitJoinAck)
{
//first InitJoinAck reply
var initJoinAck = (InternalClusterAction.InitJoinAck)message;
Context.Parent.Tell(new ClusterUserAction.JoinTo(initJoinAck.Address));
Context.Become(Done);
}
else if (message is InternalClusterAction.InitJoinNack) { } //that seed was uninitialized
else if (message is ReceiveTimeout)
{
if (_attempts >= 2)
_log.Warning(
"Couldn't join seed nodes after [{0}] attempts, will try again. seed-nodes=[{1}]",
_attempts, string.Join(",", _seeds.Where(x => !x.Equals(_selfAddress))));
//no InitJoinAck received - try again
Self.Tell(new InternalClusterAction.JoinSeenNode());
}
else
{
Unhandled(message);
case InternalClusterAction.InitJoinAck initJoinAck:
//first InitJoinAck reply
Context.Parent.Tell(new ClusterUserAction.JoinTo(initJoinAck.Address));
Context.Become(Done);
break;
case InternalClusterAction.InitJoinNack _:
break; //that seed was uninitialized
case ReceiveTimeout _:
{
if (_attempts >= 2)
_log.Warning(
"Couldn't join seed nodes after [{0}] attempts, will try again. seed-nodes=[{1}]",
_attempts, string.Join(",", _seeds.Where(x => !x.Equals(_selfAddress))));
//no InitJoinAck received - try again
Self.Tell(new InternalClusterAction.JoinSeenNode());
break;
}
default:
Unhandled(message);
break;
}
}

private void Done(object message)
{
if (message is InternalClusterAction.InitJoinAck)
switch (message)
{
//already received one, skip the rest
case InternalClusterAction.InitJoinAck _:
//already received one, skip the rest
break;
case ReceiveTimeout _:
Context.Stop(Self);
break;
}
else if (message is ReceiveTimeout) Context.Stop(Self);
}
}

Expand All @@ -2784,16 +2793,17 @@ internal sealed class FirstSeedNodeProcess : UntypedActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();

private readonly ImmutableList<Address> _seeds;
private ImmutableList<Address> _remainingSeeds;
private readonly Address _selfAddress;
private readonly Cluster _cluster;
private readonly Deadline _timeout;
private readonly ICancelable _retryTaskToken;

/// <summary>
/// TBD
/// Launches a new instance of the "first seed node" joining process.
/// </summary>
/// <param name="seeds">TBD</param>
/// <param name="seeds">The set of seed nodes to join.</param>
/// <exception cref="ArgumentException">
/// This exception is thrown when either the number of specified <paramref name="seeds"/> is less than or equal to 1
/// or the first listed seed is a reference to the <see cref="IActorContext.System">IUntypedActorContext.System</see>'s address.
Expand All @@ -2806,63 +2816,63 @@ public FirstSeedNodeProcess(ImmutableList<Address> seeds)
if (seeds.Count <= 1 || seeds.Head() != _selfAddress)
throw new ArgumentException("Join seed node should not be done");

_seeds = seeds;
_remainingSeeds = seeds.Remove(_selfAddress);
_timeout = Deadline.Now + _cluster.Settings.SeedNodeTimeout;
_retryTaskToken = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), Self, new InternalClusterAction.JoinSeenNode(), Self);
Self.Tell(new InternalClusterAction.JoinSeenNode());
}

/// <summary>
/// TBD
/// </summary>
protected override void PostStop()
{
_retryTaskToken.Cancel();
}

/// <summary>
/// TBD
/// </summary>
/// <param name="message">TBD</param>
protected override void OnReceive(object message)
{
if (message is InternalClusterAction.JoinSeenNode)
switch (message)
{
if (_timeout.HasTimeLeft)
case InternalClusterAction.JoinSeenNode _ when _timeout.HasTimeLeft:
{
// send InitJoin to remaining seed nodes (except myself)
foreach (var seed in _remainingSeeds.Select(
x => Context.ActorSelection(Context.Parent.Path.ToStringWithAddress(x))))
x => Context.ActorSelection(Context.Parent.Path.ToStringWithAddress(x))))
seed.Tell(new InternalClusterAction.InitJoin());
break;
}
else
case InternalClusterAction.JoinSeenNode _:
{
if (_log.IsDebugEnabled)
{
_log.Debug("Couldn't join other seed nodes, will join myself. seed-nodes=[{0}]", string.Join(",", _seeds));
}
// no InitJoinAck received, initialize new cluster by joining myself
Context.Parent.Tell(new ClusterUserAction.JoinTo(_selfAddress));
Context.Stop(Self);
break;
}
}
else if (message is InternalClusterAction.InitJoinAck)
{
// first InitJoinAck reply, join existing cluster
var initJoinAck = (InternalClusterAction.InitJoinAck)message;
Context.Parent.Tell(new ClusterUserAction.JoinTo(initJoinAck.Address));
Context.Stop(Self);
}
else if (message is InternalClusterAction.InitJoinNack)
{
var initJoinNack = (InternalClusterAction.InitJoinNack)message;
_remainingSeeds = _remainingSeeds.Remove(initJoinNack.Address);
if (_remainingSeeds.IsEmpty)
{
// initialize new cluster by joining myself when nacks from all other seed nodes
Context.Parent.Tell(new ClusterUserAction.JoinTo(_selfAddress));
case InternalClusterAction.InitJoinAck initJoinAck:
_log.Info("Received InitJoinAck message from [{0}] to [{1}]", initJoinAck.Address, _selfAddress);
// first InitJoinAck reply, join existing cluster
Context.Parent.Tell(new ClusterUserAction.JoinTo(initJoinAck.Address));
Context.Stop(Self);
break;
case InternalClusterAction.InitJoinNack initJoinNack:
{
_log.Info("Received InitJoinNack message from [{0}] to [{1}]", initJoinNack.Address, _selfAddress);
_remainingSeeds = _remainingSeeds.Remove(initJoinNack.Address);
if (_remainingSeeds.IsEmpty)
{
// initialize new cluster by joining myself when nacks from all other seed nodes
Context.Parent.Tell(new ClusterUserAction.JoinTo(_selfAddress));
Context.Stop(Self);
}

break;
}
}
else
{
Unhandled(message);
default:
Unhandled(message);
break;
}
}
}
Expand Down