Skip to content

Commit

Permalink
clean up seed node process (#4975)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb authored Apr 23, 2021
1 parent 321d0e4 commit d178b63
Showing 1 changed file with 73 additions and 63 deletions.
136 changes: 73 additions & 63 deletions src/core/Akka.Cluster/ClusterDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2691,6 +2691,7 @@ internal sealed class JoinSeedNodeProcess : UntypedActor
private readonly ILoggingAdapter _log = Context.GetLogger();

private readonly ImmutableList<Address> _seeds;
private readonly ImmutableList<Address> _otherSeeds;
private readonly Address _selfAddress;
private int _attempts = 0;

Expand All @@ -2706,6 +2707,7 @@ public JoinSeedNodeProcess(ImmutableList<Address> seeds)
{
_selfAddress = Cluster.Get(Context.System).SelfAddress;
_seeds = seeds;
_otherSeeds = _seeds.Remove(_selfAddress);
if (seeds.IsEmpty || seeds.Head() == _selfAddress)
throw new ArgumentException("Join seed node should not be done");
Context.SetReceiveTimeout(Cluster.Get(Context.System).Settings.SeedNodeTimeout);
Expand All @@ -2725,46 +2727,53 @@ protected override void PreStart()
/// <param name="message">TBD</param>
protected override void OnReceive(object message)
{
if (message is InternalClusterAction.JoinSeenNode)
switch (message)
{
//send InitJoin to all seed nodes (except myself)
foreach (var path in _seeds.Where(x => x != _selfAddress)
.Select(y => Context.ActorSelection(Context.Parent.Path.ToStringWithAddress(y))))
case InternalClusterAction.JoinSeenNode _:
{
path.Tell(new InternalClusterAction.InitJoin());
//send InitJoin to all seed nodes (except myself)
foreach (var path in _otherSeeds
.Select(y => Context.ActorSelection(Context.Parent.Path.ToStringWithAddress(y))))
{
path.Tell(new InternalClusterAction.InitJoin());
}
_attempts++;
break;
}
_attempts++;
}
else if (message is InternalClusterAction.InitJoinAck)
{
//first InitJoinAck reply
var initJoinAck = (InternalClusterAction.InitJoinAck)message;
Context.Parent.Tell(new ClusterUserAction.JoinTo(initJoinAck.Address));
Context.Become(Done);
}
else if (message is InternalClusterAction.InitJoinNack) { } //that seed was uninitialized
else if (message is ReceiveTimeout)
{
if (_attempts >= 2)
_log.Warning(
"Couldn't join seed nodes after [{0}] attempts, will try again. seed-nodes=[{1}]",
_attempts, string.Join(",", _seeds.Where(x => !x.Equals(_selfAddress))));
//no InitJoinAck received - try again
Self.Tell(new InternalClusterAction.JoinSeenNode());
}
else
{
Unhandled(message);
case InternalClusterAction.InitJoinAck initJoinAck:
//first InitJoinAck reply
Context.Parent.Tell(new ClusterUserAction.JoinTo(initJoinAck.Address));
Context.Become(Done);
break;
case InternalClusterAction.InitJoinNack _:
break; //that seed was uninitialized
case ReceiveTimeout _:
{
if (_attempts >= 2)
_log.Warning(
"Couldn't join seed nodes after [{0}] attempts, will try again. seed-nodes=[{1}]",
_attempts, string.Join(",", _seeds.Where(x => !x.Equals(_selfAddress))));
//no InitJoinAck received - try again
Self.Tell(new InternalClusterAction.JoinSeenNode());
break;
}
default:
Unhandled(message);
break;
}
}

private void Done(object message)
{
if (message is InternalClusterAction.InitJoinAck)
switch (message)
{
//already received one, skip the rest
case InternalClusterAction.InitJoinAck _:
//already received one, skip the rest
break;
case ReceiveTimeout _:
Context.Stop(Self);
break;
}
else if (message is ReceiveTimeout) Context.Stop(Self);
}
}

Expand All @@ -2784,16 +2793,17 @@ internal sealed class FirstSeedNodeProcess : UntypedActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();

private readonly ImmutableList<Address> _seeds;
private ImmutableList<Address> _remainingSeeds;
private readonly Address _selfAddress;
private readonly Cluster _cluster;
private readonly Deadline _timeout;
private readonly ICancelable _retryTaskToken;

/// <summary>
/// TBD
/// Launches a new instance of the "first seed node" joining process.
/// </summary>
/// <param name="seeds">TBD</param>
/// <param name="seeds">The set of seed nodes to join.</param>
/// <exception cref="ArgumentException">
/// This exception is thrown when either the number of specified <paramref name="seeds"/> is less than or equal to 1
/// or the first listed seed is a reference to the <see cref="IActorContext.System">IUntypedActorContext.System</see>'s address.
Expand All @@ -2806,63 +2816,63 @@ public FirstSeedNodeProcess(ImmutableList<Address> seeds)
if (seeds.Count <= 1 || seeds.Head() != _selfAddress)
throw new ArgumentException("Join seed node should not be done");

_seeds = seeds;
_remainingSeeds = seeds.Remove(_selfAddress);
_timeout = Deadline.Now + _cluster.Settings.SeedNodeTimeout;
_retryTaskToken = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), Self, new InternalClusterAction.JoinSeenNode(), Self);
Self.Tell(new InternalClusterAction.JoinSeenNode());
}

/// <summary>
/// TBD
/// </summary>
protected override void PostStop()
{
_retryTaskToken.Cancel();
}

/// <summary>
/// TBD
/// </summary>
/// <param name="message">TBD</param>
protected override void OnReceive(object message)
{
if (message is InternalClusterAction.JoinSeenNode)
switch (message)
{
if (_timeout.HasTimeLeft)
case InternalClusterAction.JoinSeenNode _ when _timeout.HasTimeLeft:
{
// send InitJoin to remaining seed nodes (except myself)
foreach (var seed in _remainingSeeds.Select(
x => Context.ActorSelection(Context.Parent.Path.ToStringWithAddress(x))))
x => Context.ActorSelection(Context.Parent.Path.ToStringWithAddress(x))))
seed.Tell(new InternalClusterAction.InitJoin());
break;
}
else
case InternalClusterAction.JoinSeenNode _:
{
if (_log.IsDebugEnabled)
{
_log.Debug("Couldn't join other seed nodes, will join myself. seed-nodes=[{0}]", string.Join(",", _seeds));
}
// no InitJoinAck received, initialize new cluster by joining myself
Context.Parent.Tell(new ClusterUserAction.JoinTo(_selfAddress));
Context.Stop(Self);
break;
}
}
else if (message is InternalClusterAction.InitJoinAck)
{
// first InitJoinAck reply, join existing cluster
var initJoinAck = (InternalClusterAction.InitJoinAck)message;
Context.Parent.Tell(new ClusterUserAction.JoinTo(initJoinAck.Address));
Context.Stop(Self);
}
else if (message is InternalClusterAction.InitJoinNack)
{
var initJoinNack = (InternalClusterAction.InitJoinNack)message;
_remainingSeeds = _remainingSeeds.Remove(initJoinNack.Address);
if (_remainingSeeds.IsEmpty)
{
// initialize new cluster by joining myself when nacks from all other seed nodes
Context.Parent.Tell(new ClusterUserAction.JoinTo(_selfAddress));
case InternalClusterAction.InitJoinAck initJoinAck:
_log.Info("Received InitJoinAck message from [{0}] to [{1}]", initJoinAck.Address, _selfAddress);
// first InitJoinAck reply, join existing cluster
Context.Parent.Tell(new ClusterUserAction.JoinTo(initJoinAck.Address));
Context.Stop(Self);
break;
case InternalClusterAction.InitJoinNack initJoinNack:
{
_log.Info("Received InitJoinNack message from [{0}] to [{1}]", initJoinNack.Address, _selfAddress);
_remainingSeeds = _remainingSeeds.Remove(initJoinNack.Address);
if (_remainingSeeds.IsEmpty)
{
// initialize new cluster by joining myself when nacks from all other seed nodes
Context.Parent.Tell(new ClusterUserAction.JoinTo(_selfAddress));
Context.Stop(Self);
}

break;
}
}
else
{
Unhandled(message);
default:
Unhandled(message);
break;
}
}
}
Expand Down

0 comments on commit d178b63

Please sign in to comment.