From b1eb68818dae0a95f9086676fdea837e4bb46df2 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 28 Jul 2022 00:18:08 +0700 Subject: [PATCH] Make JoinAsync and JoinSeedNodesAsync more robust by checking cluster UP status (#6033) * Make JoinAsync and JoinSeedNodesAsync more robust by using an async state * Update how join state is being handled * Fix missing exception * Update unit test * Update API Verify list * update IsUp check * Change IsUp implementation to check SelfMember instead * Remove state handling code * Remove spec * Revert state changes Co-authored-by: Aaron Stannard --- ...reAPISpec.ApproveCluster.Core.verified.txt | 1 + ...APISpec.ApproveCluster.DotNet.verified.txt | 1 + ...oreAPISpec.ApproveCluster.Net.verified.txt | 1 + src/core/Akka.Cluster.Tests/ClusterSpec.cs | 1 + src/core/Akka.Cluster/Cluster.cs | 45 ++++++++++++++----- 5 files changed, 38 insertions(+), 11 deletions(-) diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Core.verified.txt index f904b9eea13..4e1ea162b01 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Core.verified.txt @@ -29,6 +29,7 @@ namespace Akka.Cluster public Akka.Cluster.IDowningProvider DowningProvider { get; } public Akka.Remote.DefaultFailureDetectorRegistry FailureDetector { get; } public bool IsTerminated { get; } + public bool IsUp { get; } public Akka.Actor.Address SelfAddress { get; } public Akka.Cluster.Member SelfMember { get; } public System.Collections.Immutable.ImmutableHashSet SelfRoles { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.DotNet.verified.txt index d1d950052ac..6b738c2f7d5 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.DotNet.verified.txt @@ -29,6 +29,7 @@ namespace Akka.Cluster public Akka.Cluster.IDowningProvider DowningProvider { get; } public Akka.Remote.DefaultFailureDetectorRegistry FailureDetector { get; } public bool IsTerminated { get; } + public bool IsUp { get; } public Akka.Actor.Address SelfAddress { get; } public Akka.Cluster.Member SelfMember { get; } public System.Collections.Immutable.ImmutableHashSet SelfRoles { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Net.verified.txt index f904b9eea13..4e1ea162b01 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Net.verified.txt @@ -29,6 +29,7 @@ namespace Akka.Cluster public Akka.Cluster.IDowningProvider DowningProvider { get; } public Akka.Remote.DefaultFailureDetectorRegistry FailureDetector { get; } public bool IsTerminated { get; } + public bool IsUp { get; } public Akka.Actor.Address SelfAddress { get; } public Akka.Cluster.Member SelfMember { get; } public System.Collections.Immutable.ImmutableHashSet SelfRoles { get; } diff --git a/src/core/Akka.Cluster.Tests/ClusterSpec.cs b/src/core/Akka.Cluster.Tests/ClusterSpec.cs index 2d29b71ee83..8f9a4bb6959 100644 --- a/src/core/Akka.Cluster.Tests/ClusterSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterSpec.cs @@ -12,6 +12,7 @@ using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; +using Akka.Event; using Akka.TestKit; using Akka.TestKit.Extensions; using Akka.Util.Internal; diff --git a/src/core/Akka.Cluster/Cluster.cs b/src/core/Akka.Cluster/Cluster.cs index 0da2a90b3c4..a6d2d980821 100644 --- a/src/core/Akka.Cluster/Cluster.cs +++ b/src/core/Akka.Cluster/Cluster.cs @@ -264,11 +264,16 @@ public void Join(Address address) /// Task which completes, once current cluster node reaches state. public Task JoinAsync(Address address, CancellationToken token = default) { + if (_isTerminated) + throw new ClusterJoinFailedException("Cluster has already been terminated"); + + if (IsUp) + return Task.CompletedTask; + var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var timeout = Settings.RetryUnsuccessfulJoinAfter ?? TimeSpan.FromSeconds(10); var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(token); - timeoutCts.CancelAfter(timeout); + timeoutCts.CancelAfter(Settings.SeedNodeTimeout); timeoutCts.Token.Register(() => { timeoutCts.Dispose(); @@ -281,10 +286,10 @@ public Task JoinAsync(Address address, CancellationToken token = default) timeoutCts.Dispose(); completion.TrySetResult(NotUsed.Instance); }); - + Join(address); - return completion.Task.WithCancellation(token); + return completion.Task; } private Address FillLocal(Address address) @@ -333,9 +338,16 @@ public void JoinSeedNodes(IEnumerable
seedNodes) /// /// TBD /// TBD - public Task JoinSeedNodesAsync(IEnumerable
seedNodes, CancellationToken token = default(CancellationToken)) + public Task JoinSeedNodesAsync(IEnumerable
seedNodes, CancellationToken token = default) { + if (_isTerminated) + throw new ClusterJoinFailedException("Cluster has already been terminated"); + + if (IsUp) + return Task.CompletedTask; + var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var nodes = seedNodes.ToList(); var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(token); timeoutCts.CancelAfter(Settings.SeedNodeTimeout); @@ -343,7 +355,7 @@ public void JoinSeedNodes(IEnumerable
seedNodes) { timeoutCts.Dispose(); completion.TrySetException(new ClusterJoinFailedException( - $"Node has not managed to join the cluster using provided seed node addresses: {string.Join(", ", seedNodes)}.")); + $"Node has not managed to join the cluster using provided seed node addresses: {string.Join(", ", nodes)}.")); }); RegisterOnMemberUp(() => @@ -351,10 +363,10 @@ public void JoinSeedNodes(IEnumerable
seedNodes) timeoutCts.Dispose(); completion.TrySetResult(NotUsed.Instance); }); + + JoinSeedNodes(nodes); - JoinSeedNodes(seedNodes); - - return completion.Task.WithCancellation(token); + return completion.Task; } /// @@ -421,7 +433,10 @@ private Task LeaveSelf() return leaveTask; // Subscribe to MemberRemoved events - _clusterDaemons.Tell(new InternalClusterAction.AddOnMemberRemovedListener(() => tcs.TrySetResult(null))); + _clusterDaemons.Tell(new InternalClusterAction.AddOnMemberRemovedListener(() => + { + tcs.TrySetResult(null); + })); // Send leave message ClusterCore.Tell(new ClusterUserAction.Leave(SelfAddress)); @@ -451,7 +466,10 @@ public void Down(Address address) /// The callback that is run whenever the current member achieves a status of public void RegisterOnMemberUp(Action callback) { - _clusterDaemons.Tell(new InternalClusterAction.AddOnMemberUpListener(callback)); + if (IsUp) + callback(); + else + _clusterDaemons.Tell(new InternalClusterAction.AddOnMemberUpListener(callback)); } /// @@ -524,6 +542,11 @@ public ImmutableHashSet SelfRoles /// public bool IsTerminated { get { return _isTerminated.Value; } } + /// + /// Determine whether the cluster is in the UP state. + /// + public bool IsUp => SelfMember.Status == MemberStatus.Up || SelfMember.Status == MemberStatus.WeaklyUp; + /// /// The underlying supported by this plugin. ///