Skip to content

Commit

Permalink
Add abortable lock
Browse files Browse the repository at this point in the history
  • Loading branch information
Tiru Srikantha committed May 28, 2015
1 parent 3341bb7 commit 3a9c0ea
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 21 deletions.
41 changes: 37 additions & 4 deletions Consul.Test/LockTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void Lock_EphemeralAcquireRelease()
{
var c = ClientTest.MakeClient();
var s = c.Session.Create(new SessionEntry { Behavior = SessionBehavior.Delete });
using (var l = c.AcquireLock(new LockOptions("test/ephemerallock") { Session = s.Response }))
using (var l = c.AcquireLock(new LockOptions("test/ephemerallock") { Session = s.Response }, CancellationToken.None))
{
Assert.IsTrue(l.IsHeld);
c.Session.Destroy(s.Response);
Expand All @@ -96,14 +96,14 @@ public void Lock_EphemeralAcquireRelease()
[TestMethod]
public void Lock_AcquireWaitRelease()
{
var _lockOptions = new LockOptions("test/lock")
var lockOptions = new LockOptions("test/lock")
{
SessionName = "test_locksession",
SessionTTL = TimeSpan.FromSeconds(10)
};
var c = ClientTest.MakeClient();

var l = c.CreateLock(_lockOptions);
var l = c.CreateLock(lockOptions);

l.Acquire(CancellationToken.None);

Expand Down Expand Up @@ -243,7 +243,40 @@ public void Lock_RunAction()
});
}));
}

[TestMethod]
public void Lock_AbortAction()
{
var c = ClientTest.MakeClient();
using (var cts = new CancellationTokenSource())
{
try
{
string ls = c.Session.Create(new SessionEntry() { TTL = TimeSpan.FromSeconds(10) }).Response;
c.Session.RenewPeriodic(TimeSpan.FromSeconds(10), ls, cts.Token);
Task.Delay(1000).ContinueWith((t1) => { c.Session.Destroy(ls); });
c.ExecuteAbortableLocked(new LockOptions("test/lock") { Session = ls }, CancellationToken.None, () =>
{
Thread.Sleep(60000);
});
}
catch (TimeoutException ex)
{
Assert.IsInstanceOfType(ex, typeof(TimeoutException));
}
cts.Cancel();
}
using (var cts = new CancellationTokenSource())
{
string ls = c.Session.Create(new SessionEntry() { TTL = TimeSpan.FromSeconds(10) }).Response;
c.Session.RenewPeriodic(TimeSpan.FromSeconds(10), ls, cts.Token);
c.ExecuteAbortableLocked(new LockOptions("test/lock") { Session = ls }, CancellationToken.None, () =>
{
Thread.Sleep(1000);
Assert.IsTrue(true);
});
cts.Cancel();
}
}
[TestMethod]
public void Lock_ReclaimLock()
{
Expand Down
5 changes: 2 additions & 3 deletions Consul.Test/SemaphoreTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,14 @@ public void Semaphore_AcquireRelease()
[TestMethod]
public void Semaphore_AcquireWaitRelease()
{
const int Limit = 1;
var _semaphoreOptions = new SemaphoreOptions("test/semaphore", Limit)
var semaphoreOptions = new SemaphoreOptions("test/semaphore", 1)
{
SessionName = "test_semaphoresession",
SessionTTL = TimeSpan.FromSeconds(10)
};
var c = ClientTest.MakeClient();

var s = c.Semaphore(_semaphoreOptions);
var s = c.Semaphore(semaphoreOptions);

s.Acquire(CancellationToken.None);

Expand Down
192 changes: 178 additions & 14 deletions Consul/Lock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -496,11 +496,11 @@ private KVPair LockEntry(string session)
public class DisposableLock : Lock, IDisposable
{
public CancellationToken CancellationToken { get; private set; }
internal DisposableLock(Client client, LockOptions opts)
internal DisposableLock(Client client, LockOptions opts, CancellationToken ct)
: base(client)
{
Opts = opts;
CancellationToken = Acquire();
CancellationToken = Acquire(ct);
}

/// <summary>
Expand Down Expand Up @@ -573,6 +573,7 @@ public Lock CreateLock(LockOptions opts)
}
return new Lock(this) { Opts = opts };
}

/// <summary>
/// AcquireLock creates a lock that is already pre-acquired and implements IDisposable to be used in a "using" block
/// </summary>
Expand All @@ -584,53 +585,216 @@ public DisposableLock AcquireLock(string key)
{
throw new ArgumentNullException("key");
}
return AcquireLock(new LockOptions(key));
return AcquireLock(new LockOptions(key), CancellationToken.None);
}

/// <summary>
/// AcquireLock creates a lock that is already pre-acquired and implements IDisposable to be used in a "using" block
/// </summary>
/// <param name="key"></param>
/// <param name="ct"></param>
/// <returns></returns>
public DisposableLock AcquireLock(string key, CancellationToken ct)
{
if (string.IsNullOrEmpty(key))
{
throw new ArgumentNullException("key");
}
return AcquireLock(new LockOptions(key), ct);
}


/// <summary>
/// AcquireLock creates a lock that is already pre-acquired and implements IDisposable to be used in a "using" block
/// </summary>
/// <param name="opts"></param>
/// <param name="ct"></param>
/// <returns></returns>
public DisposableLock AcquireLock(LockOptions opts)
{
return new DisposableLock(this, opts);
if (opts == null)
{
throw new ArgumentNullException("opts");
}
return new DisposableLock(this, opts, CancellationToken.None);
}

/// <summary>
/// AcquireLock creates a lock that is already pre-acquired and implements IDisposable to be used in a "using" block
/// </summary>
/// <param name="opts"></param>
/// <param name="ct"></param>
/// <returns></returns>
public DisposableLock AcquireLock(LockOptions opts, CancellationToken ct)
{
if (opts == null)
{
throw new ArgumentNullException("opts");
}
return new DisposableLock(this, opts, ct);
}

/// <summary>
/// ExecuteLock accepts a delegate to execute in the context of a lock, releasing the lock when completed.
/// </summary>
/// <param name="key"></param>
/// <param name="action"></param>
/// <returns></returns>
public void ExecuteLocked(string key, Action action)
{
if (string.IsNullOrEmpty(key))
{
throw new ArgumentNullException("key");
}
ExecuteLocked(new LockOptions(key), CancellationToken.None, action);
}

/// <summary>
/// ExecuteLock accepts a delegate to execute in the context of a lock, releasing the lock when completed.
/// </summary>
/// <param name="opts"></param>
/// <param name="action"></param>
/// <returns></returns>
public void ExecuteLocked(LockOptions opts, Action action)
{
if (opts == null)
{
throw new ArgumentNullException("opts");
}
ExecuteLocked(opts, CancellationToken.None, action);
}
/// <summary>
/// ExecuteLock accepts a delegate to execute in the context of a lock, releasing the lock when completed.
/// </summary>
/// <param name="key"></param>
/// <param name="a"></param>
/// <param name="ct"></param>
/// <param name="action"></param>
/// <returns></returns>
public void ExecuteLocked(string key, Action a)
public void ExecuteLocked(string key, CancellationToken ct, Action action)
{
if (string.IsNullOrEmpty(key))
{
throw new ArgumentNullException("key");
}
ExecuteLocked(new LockOptions(key), a);
if (action == null)
{
throw new ArgumentNullException("action");
}
ExecuteLocked(new LockOptions(key), ct, action);
}

/// <summary>
/// ExecuteLock accepts a delegate to execute in the context of a lock, releasing the lock when completed.
/// </summary>
/// <param name="opts"></param>
/// <param name="a"></param>
/// <param name="ct"></param>
/// <param name="action"></param>
/// <returns></returns>
public void ExecuteLocked(LockOptions opts, Action a)
public void ExecuteLocked(LockOptions opts, CancellationToken ct, Action action)
{
if (opts == null)
{
throw new ArgumentNullException("opts");
}
if (a == null)
if (action == null)
{
throw new ArgumentNullException("a");
throw new ArgumentNullException("action");
}
using (var l = new DisposableLock(this, opts))
using (var l = AcquireLock(opts, ct))
{
if (l.IsHeld)
if (!l.IsHeld)
{
a();
throw new LockNotHeldException("Could not obtain the lock");
}
action();
}
}

/// <summary>
/// Do not use except unless you need this. Executes an action in a new thread under a lock, ABORTING THE THREAD if the lock is lost and the action does not complete within the lock-delay.
/// </summary>
/// <param name="key"></param>
/// <param name="action"></param>
public void ExecuteAbortableLocked(string key, Action action)
{
if (string.IsNullOrEmpty(key))
{
throw new ArgumentNullException("key");
}
if (action == null)
{
throw new ArgumentNullException("action");
}
ExecuteAbortableLocked(new LockOptions(key), CancellationToken.None, action);
}

/// <summary>
/// Do not use except unless you need this. Executes an action in a new thread under a lock, ABORTING THE THREAD if the lock is lost and the action does not complete within the lock-delay.
/// </summary>
/// <param name="opts"></param>
/// <param name="action"></param>
public void ExecuteAbortableLocked(LockOptions opts, Action action)
{
if (opts == null)
{
throw new ArgumentNullException("opts");
}
if (action == null)
{
throw new ArgumentNullException("action");
}
ExecuteAbortableLocked(opts, CancellationToken.None, action);
}

/// <summary>
/// Do not use except unless you need this. Executes an action in a new thread under a lock, ABORTING THE THREAD if the lock is lost and the action does not complete within the lock-delay.
/// </summary>
/// <param name="key"></param>
/// <param name="ct"></param>
/// <param name="action"></param>
public void ExecuteAbortableLocked(string key, CancellationToken ct, Action action)
{
if (string.IsNullOrEmpty(key))
{
throw new ArgumentNullException("key");
}
if (action == null)
{
throw new ArgumentNullException("action");
}
ExecuteAbortableLocked(new LockOptions(key), ct, action);
}

/// <summary>
/// Do not use except unless you need this. Executes an action in a new thread under a lock, ABORTING THE THREAD if the lock is lost and the action does not complete within the lock-delay.
/// </summary>
/// <param name="opts"></param>
/// <param name="ct"></param>
/// <param name="action"></param>
public void ExecuteAbortableLocked(LockOptions opts, CancellationToken ct, Action action)
{
using (var l = AcquireLock(opts, ct))
{
if (!l.IsHeld)
{
throw new LockNotHeldException("Could not obtain the lock");
}
var thread = new Thread(() => { action(); });
thread.Start();
while (!l.CancellationToken.IsCancellationRequested && thread.IsAlive) { }
if (!thread.IsAlive)
{
return;
}
var delayTask = Task.Delay(15000);
while (!delayTask.IsCompleted && thread.IsAlive) { }
if (!thread.IsAlive)
{
return;
}
// Now entering the "zone of danger"
thread.Abort();
throw new TimeoutException("Thread was aborted because the lock was lost and the action did not complete within the lock delay");
}
}
}
Expand Down

0 comments on commit 3a9c0ea

Please sign in to comment.