-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cluster LeaveAsync() with CancellationToken support #2501
Changes from all commits
80a3692
27ab5f3
e2c1d4a
4ab859a
a8e2eb3
482d58f
b8bbed6
29c6012
0526f91
9f0008e
70d8759
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -265,7 +265,12 @@ public void JoinSeedNodes(IEnumerable<Address> seedNodes) | |
/// <param name="address">The address of the node leaving the cluster.</param> | ||
public void Leave(Address address) | ||
{ | ||
ClusterCore.Tell(new ClusterUserAction.Leave(FillLocal(address))); | ||
if (FillLocal(address) == SelfAddress) | ||
{ | ||
LeaveSelf(); | ||
} | ||
else | ||
ClusterCore.Tell(new ClusterUserAction.Leave(FillLocal(address))); | ||
} | ||
|
||
/// <summary> | ||
|
@@ -274,20 +279,45 @@ public void Leave(Address address) | |
/// Once the returned <see cref="Task"/> completes, it means that the member has successfully been removed | ||
/// from the cluster. | ||
/// </summary> | ||
/// <returns>A <see cref="Task"/> that will return true upon the current node being removed from the cluster.</returns> | ||
/// <returns>A <see cref="Task"/> that will return upon the current node being removed from the cluster.</returns> | ||
public Task LeaveAsync() | ||
{ | ||
var tcs = _leaveTask.Value; | ||
return LeaveSelf(); | ||
} | ||
|
||
/// <summary> | ||
/// Causes the CURRENT node, i.e. the one calling this function, to leave the cluster. | ||
/// | ||
/// Once the returned <see cref="Task"/> completes in completed or cancelled state, it means that the member has successfully been removed | ||
/// from the cluster or cancellation token cancelled the task. | ||
/// </summary> | ||
/// <param name="cancellationToken">The cancellation token to cancel awaiting.</param> | ||
/// <returns>A <see cref="Task"/> that will return upon the current node being removed from the cluster, or if await was cancelled.</returns> | ||
/// <remarks> | ||
/// The cancellation token doesn't cancel leave from the cluster, it only lets to give up on awating (by timeout for example). | ||
/// </remarks> | ||
public Task LeaveAsync(CancellationToken cancellationToken) | ||
{ | ||
return LeaveSelf().WithCancellation(cancellationToken); | ||
} | ||
|
||
private Task _leaveTask; | ||
|
||
private Task LeaveSelf() | ||
{ | ||
var tcs = new TaskCompletionSource<object>(); | ||
var leaveTask = Interlocked.CompareExchange(ref _leaveTask, tcs.Task, null); | ||
|
||
// short-circuit - check to see if we've already successfully left. | ||
if (tcs.Task.IsCompleted) | ||
return tcs.Task; | ||
// It's assumed here that once the member left the cluster, it won't get back again. | ||
// So, the member removal event being memoized in TaskCompletionSource and never reset. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense |
||
if (leaveTask != null) | ||
return leaveTask; | ||
|
||
// Register it such that our TCS is automatically completed when we're removed | ||
_clusterDaemons.Tell(new InternalClusterAction.AddOnMemberRemovedListener(() => tcs.TrySetResult(true))); | ||
// Subscribe to MemberRemoved events | ||
_clusterDaemons.Tell(new InternalClusterAction.AddOnMemberRemovedListener(() => tcs.TrySetResult(null))); | ||
|
||
// Issue the leave command | ||
Leave(SelfAddress); | ||
// Send leave message | ||
ClusterCore.Tell(new ClusterUserAction.Leave(SelfAddress)); | ||
|
||
return tcs.Task; | ||
} | ||
|
@@ -403,7 +433,6 @@ public ImmutableHashSet<string> SelfRoles | |
/// </summary> | ||
public DefaultFailureDetectorRegistry<Address> FailureDetector { get { return _failureDetector; } } | ||
|
||
private Lazy<TaskCompletionSource<bool>> _leaveTask = new Lazy<TaskCompletionSource<bool>>(() => new TaskCompletionSource<bool>(), LazyThreadSafetyMode.ExecutionAndPublication); | ||
/// <summary> | ||
/// TBD | ||
/// </summary> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,13 +6,14 @@ | |
//----------------------------------------------------------------------- | ||
|
||
using System; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
||
namespace Akka.Util.Internal | ||
{ | ||
/// <summary> | ||
/// INTERNAL API | ||
/// | ||
/// | ||
/// Extensions for working with <see cref="Task"/> types | ||
/// </summary> | ||
public static class TaskExtensions | ||
|
@@ -50,5 +51,26 @@ public static Task<TResult> CastTask<TTask, TResult>(this Task<TTask> task) | |
}, TaskContinuationOptions.ExecuteSynchronously); | ||
return tcs.Task; | ||
} | ||
|
||
/// <summary> | ||
/// Returns the task which completes with result of original task if cancellation token not canceled it before completion. | ||
/// </summary> | ||
/// <param name="task">The original task.</param> | ||
/// <param name="cancellationToken">The cancellation token.</param> | ||
/// <returns>The task which completes with result of original task or with cancelled state.</returns> | ||
public static Task WithCancellation(this Task task, CancellationToken cancellationToken) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice, I like it, although maybe we should mark this as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Is the name There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kostrse actually it's probably fine to leave this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
We are using [InternalVisibleToAttribute]. So, It does not matter that these classes are situated in different assemblies |
||
{ | ||
if (task.IsCompleted || !cancellationToken.CanBeCanceled) | ||
return task; | ||
|
||
var tcs = new TaskCompletionSource<object>(); | ||
var r = cancellationToken.Register(() => { tcs.SetCanceled(); }, false); | ||
|
||
return Task.WhenAny(task, tcs.Task) | ||
// Dispose subscription to cancellation token | ||
.ContinueWith(t => { r.Dispose(); }, TaskContinuationOptions.ExecuteSynchronously) | ||
// Check cancellation, to return task in cancelled state instead of completed | ||
.ContinueWith(t => { }, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice solution