Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster LeaveAsync() with CancellationToken support #2501

Merged
merged 11 commits into from
Mar 25, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace Akka.Cluster
public void JoinSeedNodes(System.Collections.Generic.IEnumerable<Akka.Actor.Address> seedNodes) { }
public void Leave(Akka.Actor.Address address) { }
public System.Threading.Tasks.Task LeaveAsync() { }
public System.Threading.Tasks.Task LeaveAsync(System.Threading.CancellationToken cancellationToken) { }
public void RegisterOnMemberRemoved(System.Action callback) { }
public void RegisterOnMemberUp(System.Action callback) { }
public Akka.Actor.ActorPath RemotePathOf(Akka.Actor.IActorRef actorRef) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5120,6 +5120,7 @@ namespace Akka.Util.Internal
public class static TaskExtensions
{
public static System.Threading.Tasks.Task<TResult> CastTask<TTask, TResult>(this System.Threading.Tasks.Task<TTask> task) { }
public static System.Threading.Tasks.Task WithCancellation(this System.Threading.Tasks.Task task, System.Threading.CancellationToken cancellationToken) { }
}
}
namespace Akka.Util.Internal.Collections
Expand Down
71 changes: 71 additions & 0 deletions src/core/Akka.Cluster.Tests/ClusterSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using Akka.Actor;
using Akka.Configuration;
using Akka.TestKit;
Expand Down Expand Up @@ -192,6 +193,76 @@ public void A_cluster_must_complete_LeaveAsync_task_upon_being_removed()
_cluster.LeaveAsync().IsCompleted.Should().BeTrue();
}

[Fact]
public void A_cluster_must_return_completed_LeaveAsync_task_if_member_already_removed()
{
// Join cluster
_cluster.Join(_selfAddress);
LeaderActions(); // Joining -> Up

// Subscribe to MemberRemoved and wait for confirmation
_cluster.Subscribe(TestActor, typeof(ClusterEvent.MemberRemoved));
ExpectMsg<ClusterEvent.CurrentClusterState>();

// Leave the cluster prior to calling LeaveAsync()
_cluster.Leave(_selfAddress);

Within(TimeSpan.FromSeconds(10), () =>
{
LeaderActions(); // Leaving --> Exiting
LeaderActions(); // Exiting --> Removed

// Member should leave
ExpectMsg<ClusterEvent.MemberRemoved>().Member.Address.Should().Be(_selfAddress);
});

// LeaveAsync() task expected to complete immediately
_cluster.LeaveAsync().IsCompleted.Should().BeTrue();
}

[Fact]
public void A_cluster_must_cancel_LeaveAsync_task_if_CancellationToken_fired_before_node_left()
{
// Join cluster
_cluster.Join(_selfAddress);
LeaderActions(); // Joining -> Up

// Subscribe to MemberRemoved and wait for confirmation
_cluster.Subscribe(TestActor, typeof(ClusterEvent.MemberRemoved));
ExpectMsg<ClusterEvent.CurrentClusterState>();

// Requesting leave with cancellation token
var cts = new CancellationTokenSource();
var task1 = _cluster.LeaveAsync(cts.Token);

// Requesting another leave without cancellation
var task2 = _cluster.LeaveAsync(new CancellationTokenSource().Token);

// Cancelling the first task
cts.Cancel();
task1.Should(t => t.IsCanceled, "Task should be cancelled.");

Within(TimeSpan.FromSeconds(10), () =>
{
// Second task should continue awaiting for cluster leave
task2.IsCompleted.Should().BeFalse();

// Waiting for leave
LeaderActions(); // Leaving --> Exiting
LeaderActions(); // Exiting --> Removed

// Member should leave even a task was cancelled
ExpectMsg<ClusterEvent.MemberRemoved>().Member.Address.Should().Be(_selfAddress);

// Second task should complete (not cancelled)
task2.Should(t => t.IsCompleted && !t.IsCanceled, "Task should be completed, but not cancelled.");
});

// Subsequent LeaveAsync() tasks expected to complete immediately (not cancelled)
var task3 = _cluster.LeaveAsync();
task3.Should(t => t.IsCompleted && !t.IsCanceled, "Task should be completed, but not cancelled.");
}

[Fact]
public void A_cluster_must_be_allowed_to_join_and_leave_with_local_address()
{
Expand Down
51 changes: 40 additions & 11 deletions src/core/Akka.Cluster/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,12 @@ public void JoinSeedNodes(IEnumerable<Address> seedNodes)
/// <param name="address">TBD</param>
public void Leave(Address address)
{
ClusterCore.Tell(new ClusterUserAction.Leave(FillLocal(address)));
if (FillLocal(address) == SelfAddress)
{
LeaveSelf();
}
else
ClusterCore.Tell(new ClusterUserAction.Leave(FillLocal(address)));
}

/// <summary>
Expand All @@ -273,20 +278,45 @@ public void Leave(Address address)
/// Once the returned <see cref="Task"/> completes, it means that the member has successfully been removed
/// from the cluster.
/// </summary>
/// <returns>A <see cref="Task"/> that will return true upon the current node being removed from the cluster.</returns>
/// <returns>A <see cref="Task"/> that will return upon the current node being removed from the cluster.</returns>
public Task LeaveAsync()
{
var tcs = _leaveTask.Value;
return LeaveSelf();
}

/// <summary>
/// Causes the CURRENT node, i.e. the one calling this function, to leave the cluster.
///
/// Once the returned <see cref="Task"/> completes in completed or cancelled state, it means that the member has successfully been removed
/// from the cluster or cancellation token cancelled the task.
/// </summary>
/// <param name="cancellationToken">The cancellation token to cancel awaiting.</param>
/// <returns>A <see cref="Task"/> that will return upon the current node being removed from the cluster, or if await was cancelled.</returns>
/// <remarks>
/// The cancellation token doesn't cancel leave from the cluster, it only lets to give up on awating (by timeout for example).
/// </remarks>
public Task LeaveAsync(CancellationToken cancellationToken)
{
return LeaveSelf().WithCancellation(cancellationToken);
}

private Task _leaveTask;

private Task LeaveSelf()
{
var tcs = new TaskCompletionSource<object>();
var leaveTask = Interlocked.CompareExchange(ref _leaveTask, tcs.Task, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice solution


// short-circuit - check to see if we've already successfully left.
if (tcs.Task.IsCompleted)
return tcs.Task;
// It's assumed here that once the member left the cluster, it won't get back again.
// So, the member removal event being memoized in TaskCompletionSource and never reset.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense

if (leaveTask != null)
return leaveTask;

// Register it such that our TCS is automatically completed when we're removed
_clusterDaemons.Tell(new InternalClusterAction.AddOnMemberRemovedListener(() => tcs.TrySetResult(true)));
// Subscribe to MemberRemoved events
_clusterDaemons.Tell(new InternalClusterAction.AddOnMemberRemovedListener(() => tcs.TrySetResult(null)));

// Issue the leave command
Leave(SelfAddress);
// Send leave message
ClusterCore.Tell(new ClusterUserAction.Leave(SelfAddress));

return tcs.Task;
}
Expand Down Expand Up @@ -400,7 +430,6 @@ public ImmutableHashSet<string> SelfRoles
/// </summary>
public DefaultFailureDetectorRegistry<Address> FailureDetector { get { return _failureDetector; } }

private Lazy<TaskCompletionSource<bool>> _leaveTask = new Lazy<TaskCompletionSource<bool>>(() => new TaskCompletionSource<bool>(), LazyThreadSafetyMode.ExecutionAndPublication);
/// <summary>
/// TBD
/// </summary>
Expand Down
24 changes: 23 additions & 1 deletion src/core/Akka/Util/Internal/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
//-----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Akka.Util.Internal
{
/// <summary>
/// TBD
/// Extension methods for operations on <see cref="Task"/> task types.
/// </summary>
public static class TaskExtensions
{
Expand Down Expand Up @@ -48,5 +49,26 @@ public static Task<TResult> CastTask<TTask, TResult>(this Task<TTask> task)
}, TaskContinuationOptions.ExecuteSynchronously);
return tcs.Task;
}

/// <summary>
/// Returns the task which completes with result of original task if cancellation token not canceled it before completion.
/// </summary>
/// <param name="task">The original task.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The task which completes with result of original task or with cancelled state.</returns>
public static Task WithCancellation(this Task task, CancellationToken cancellationToken)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, I like it, although maybe we should mark this as internal? cc @alexvaluyskiy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TaskExtensions and Cluster are in different assemblies.
I found this extensions class and decided to add the WithCancellation() method here, but I can move it into another place if you know a better location.

Is the name WithCancellation sounds good, BTW?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kostrse actually it's probably fine to leave this public. WithCancellation sounds great 👍

Copy link
Contributor

@alexvaluyskiy alexvaluyskiy Feb 16, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TaskExtensions and Cluster are in different assemblies.

We are using [InternalVisibleToAttribute]. So, It does not matter that these classes are situated in different assemblies

{
if (task.IsCompleted || !cancellationToken.CanBeCanceled)
return task;

var tcs = new TaskCompletionSource<object>();
var r = cancellationToken.Register(() => { tcs.SetCanceled(); }, false);

return Task.WhenAny(task, tcs.Task)
// Dispose subscription to cancellation token
.ContinueWith(t => { r.Dispose(); }, TaskContinuationOptions.ExecuteSynchronously)
// Check cancellation, to return task in cancelled state instead of completed
.ContinueWith(t => { }, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
}
}