Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make JoinAsync and JoinSeedNodesAsync more robust by checking cluster UP status #6033

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
407fb02
Make JoinAsync and JoinSeedNodesAsync more robust by using an async s…
Arkatufus Jul 5, 2022
81c2d9c
Merge branch 'dev' into cluster/fix_JoinAsync_and_JoinSeedNodesAsync
Arkatufus Jul 5, 2022
12f6c16
Merge branch 'dev' into cluster/fix_JoinAsync_and_JoinSeedNodesAsync
Arkatufus Jul 6, 2022
8addeaf
Merge branch 'dev' into cluster/fix_JoinAsync_and_JoinSeedNodesAsync
Aaronontheweb Jul 6, 2022
ed586f2
Update how join state is being handled
Arkatufus Jul 14, 2022
e49465c
Merge branch 'dev' into cluster/fix_JoinAsync_and_JoinSeedNodesAsync
Arkatufus Jul 14, 2022
15ba469
Fix missing exception
Arkatufus Jul 14, 2022
dfea2e8
Merge branch 'cluster/fix_JoinAsync_and_JoinSeedNodesAsync' of github…
Arkatufus Jul 14, 2022
54e4d85
Update unit test
Arkatufus Jul 14, 2022
79d8f77
Update API Verify list
Arkatufus Jul 14, 2022
5bd8b66
update IsUp check
Arkatufus Jul 14, 2022
093ff5b
Merge branch 'dev' into cluster/fix_JoinAsync_and_JoinSeedNodesAsync
Arkatufus Jul 19, 2022
be804a9
Change IsUp implementation to check SelfMember instead
Arkatufus Jul 22, 2022
0f90d06
Remove state handling code
Arkatufus Jul 22, 2022
b5d6c59
Merge branch 'dev' into cluster/fix_JoinAsync_and_JoinSeedNodesAsync
Arkatufus Jul 22, 2022
2b55eef
Remove spec
Arkatufus Jul 22, 2022
3b2472e
Merge branch 'cluster/fix_JoinAsync_and_JoinSeedNodesAsync' of github…
Arkatufus Jul 22, 2022
080256c
Revert state changes
Arkatufus Jul 22, 2022
10e6a8e
Merge branch 'dev' into cluster/fix_JoinAsync_and_JoinSeedNodesAsync
Aaronontheweb Jul 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace Akka.Cluster
public Akka.Cluster.IDowningProvider DowningProvider { get; }
public Akka.Remote.DefaultFailureDetectorRegistry<Akka.Actor.Address> FailureDetector { get; }
public bool IsTerminated { get; }
public bool IsUp { get; }
public Akka.Actor.Address SelfAddress { get; }
public Akka.Cluster.Member SelfMember { get; }
public System.Collections.Immutable.ImmutableHashSet<string> SelfRoles { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace Akka.Cluster
public Akka.Cluster.IDowningProvider DowningProvider { get; }
public Akka.Remote.DefaultFailureDetectorRegistry<Akka.Actor.Address> FailureDetector { get; }
public bool IsTerminated { get; }
public bool IsUp { get; }
public Akka.Actor.Address SelfAddress { get; }
public Akka.Cluster.Member SelfMember { get; }
public System.Collections.Immutable.ImmutableHashSet<string> SelfRoles { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace Akka.Cluster
public Akka.Cluster.IDowningProvider DowningProvider { get; }
public Akka.Remote.DefaultFailureDetectorRegistry<Akka.Actor.Address> FailureDetector { get; }
public bool IsTerminated { get; }
public bool IsUp { get; }
public Akka.Actor.Address SelfAddress { get; }
public Akka.Cluster.Member SelfMember { get; }
public System.Collections.Immutable.ImmutableHashSet<string> SelfRoles { get; }
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka.Cluster.Tests/ClusterSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Event;
using Akka.TestKit;
using Akka.TestKit.Extensions;
using Akka.Util.Internal;
Expand Down
45 changes: 34 additions & 11 deletions src/core/Akka.Cluster/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,16 @@ public void Join(Address address)
/// <returns>Task which completes, once current cluster node reaches <see cref="MemberStatus.Up"/> state.</returns>
public Task JoinAsync(Address address, CancellationToken token = default)
{
if (_isTerminated)
throw new ClusterJoinFailedException("Cluster has already been terminated");

if (IsUp)
return Task.CompletedTask;
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved

var completion = new TaskCompletionSource<NotUsed>(TaskCreationOptions.RunContinuationsAsynchronously);

var timeout = Settings.RetryUnsuccessfulJoinAfter ?? TimeSpan.FromSeconds(10);
var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(token);
timeoutCts.CancelAfter(timeout);
timeoutCts.CancelAfter(Settings.SeedNodeTimeout);
timeoutCts.Token.Register(() =>
{
timeoutCts.Dispose();
Expand All @@ -281,10 +286,10 @@ public Task JoinAsync(Address address, CancellationToken token = default)
timeoutCts.Dispose();
completion.TrySetResult(NotUsed.Instance);
});

Join(address);

return completion.Task.WithCancellation(token);
return completion.Task;
}

private Address FillLocal(Address address)
Expand Down Expand Up @@ -333,28 +338,35 @@ public void JoinSeedNodes(IEnumerable<Address> seedNodes)
/// </summary>
/// <param name="seedNodes">TBD</param>
/// <param name="token">TBD</param>
public Task JoinSeedNodesAsync(IEnumerable<Address> seedNodes, CancellationToken token = default(CancellationToken))
public Task JoinSeedNodesAsync(IEnumerable<Address> seedNodes, CancellationToken token = default)
{
if (_isTerminated)
throw new ClusterJoinFailedException("Cluster has already been terminated");

if (IsUp)
return Task.CompletedTask;

var completion = new TaskCompletionSource<NotUsed>(TaskCreationOptions.RunContinuationsAsynchronously);
Arkatufus marked this conversation as resolved.
Show resolved Hide resolved
var nodes = seedNodes.ToList();

var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(token);
timeoutCts.CancelAfter(Settings.SeedNodeTimeout);
timeoutCts.Token.Register(() =>
{
timeoutCts.Dispose();
completion.TrySetException(new ClusterJoinFailedException(
$"Node has not managed to join the cluster using provided seed node addresses: {string.Join(", ", seedNodes)}."));
$"Node has not managed to join the cluster using provided seed node addresses: {string.Join(", ", nodes)}."));
});

RegisterOnMemberUp(() =>
{
timeoutCts.Dispose();
completion.TrySetResult(NotUsed.Instance);
});

JoinSeedNodes(nodes);

JoinSeedNodes(seedNodes);

return completion.Task.WithCancellation(token);
return completion.Task;
}

/// <summary>
Expand Down Expand Up @@ -421,7 +433,10 @@ private Task LeaveSelf()
return leaveTask;

// Subscribe to MemberRemoved events
_clusterDaemons.Tell(new InternalClusterAction.AddOnMemberRemovedListener(() => tcs.TrySetResult(null)));
_clusterDaemons.Tell(new InternalClusterAction.AddOnMemberRemovedListener(() =>
{
tcs.TrySetResult(null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

}));

// Send leave message
ClusterCore.Tell(new ClusterUserAction.Leave(SelfAddress));
Expand Down Expand Up @@ -451,7 +466,10 @@ public void Down(Address address)
/// <param name="callback">The callback that is run whenever the current member achieves a status of <see cref="MemberStatus.Up"/></param>
public void RegisterOnMemberUp(Action callback)
{
_clusterDaemons.Tell(new InternalClusterAction.AddOnMemberUpListener(callback));
if (IsUp)
callback();
else
_clusterDaemons.Tell(new InternalClusterAction.AddOnMemberUpListener(callback));
}

/// <summary>
Expand Down Expand Up @@ -524,6 +542,11 @@ public ImmutableHashSet<string> SelfRoles
/// </summary>
public bool IsTerminated { get { return _isTerminated.Value; } }

/// <summary>
/// Determine whether the cluster is in the UP state.
/// </summary>
public bool IsUp => SelfMember.Status == MemberStatus.Up || SelfMember.Status == MemberStatus.WeaklyUp;

/// <summary>
/// The underlying <see cref="ActorSystem"/> supported by this plugin.
/// </summary>
Expand Down