diff --git a/rd-net/Lifetimes/Lifetimes.csproj b/rd-net/Lifetimes/Lifetimes.csproj
index f39dec2cb..bbd2fe26f 100644
--- a/rd-net/Lifetimes/Lifetimes.csproj
+++ b/rd-net/Lifetimes/Lifetimes.csproj
@@ -59,4 +59,8 @@
+
+
+
+
diff --git a/rd-net/Lifetimes/Lifetimes/LifetimeDefinition.cs b/rd-net/Lifetimes/Lifetimes/LifetimeDefinition.cs
index 9f64a7d6f..602c8959d 100644
--- a/rd-net/Lifetimes/Lifetimes/LifetimeDefinition.cs
+++ b/rd-net/Lifetimes/Lifetimes/LifetimeDefinition.cs
@@ -18,6 +18,9 @@ namespace JetBrains.Lifetimes
/// You can terminate this definition by method (or which is the same).
///
public class LifetimeDefinition : IDisposable
+#if !NET35
+ ,IAsyncDisposable
+#endif
{
#pragma warning disable 420
#region Statics
@@ -27,9 +30,10 @@ public class LifetimeDefinition : IDisposable
[PublicAPI] internal static readonly LifetimeDefinition Eternal = new LifetimeDefinition { Id = nameof(Eternal) };
[PublicAPI] internal static readonly LifetimeDefinition Terminated = new LifetimeDefinition { Id = nameof(Terminated) };
+ private static CancellationToken CancelledToken => new CancellationToken(canceled: true);
+
static LifetimeDefinition()
{
- Terminated.ToCancellationToken(); //to create cts
Terminated.Terminate();
}
@@ -90,6 +94,9 @@ private int ThreadLocalExecuting(int increment = 0)
private const int WaitForExecutingInTerminationTimeoutMsDefault = 500;
[PublicAPI] public static int WaitForExecutingInTerminationTimeoutMs = WaitForExecutingInTerminationTimeoutMsDefault;
+#if !NET35
+ [PublicAPI] public static int WaitForExecutingInAsyncTerminationTimeoutMs = WaitForExecutingInTerminationTimeoutMsDefault;
+#endif
// use real (sealed) types to allow devirtualization
private static readonly IntBitSlice ourExecutingSlice = BitSlice.Int(20);
@@ -359,6 +366,9 @@ private bool IncrementStatusIfEqualsTo(LifetimeStatus status)
#region Termination
public void Dispose() => Terminate();
+#if !NET35
+ public ValueTask DisposeAsync() => TerminateAsync();
+#endif
[Obsolete("Use `Lifetime.IsAlive` or `Status` field instead")]
public bool IsTerminated => Status >= LifetimeStatus.Terminating;
@@ -392,26 +402,94 @@ public void Terminate()
if (ourExecutingSlice[myState] > 0 /*optimization*/ && !SpinWait.SpinUntil(() => ourExecutingSlice[myState] <= ThreadLocalExecuting(), WaitForExecutingInTerminationTimeoutMs))
{
- Log.Warn($"{this}: can't wait for `ExecuteIfAlive` completed on other thread in {WaitForExecutingInTerminationTimeoutMs} ms. Keep termination." + Environment.NewLine
- + "This may happen either because of the ExecuteIfAlive failed to complete in a timely manner. In the case there will be following error messages." + Environment.NewLine
- + "Or this might happen because of garbage collection or when the thread yielded execution in SpinWait.SpinOnce but did not receive execution back in a timely manner. If you are on JetBrains' Slack see the discussion https://jetbrains.slack.com/archives/CAZEUK2R0/p1606236742208100");
-
- ourLogErrorAfterExecution.InterlockedUpdate(ref myState, true);
+ ErrorAfterExecutions();
}
if (!IncrementStatusIfEqualsTo(LifetimeStatus.Canceling))
return;
+
+ DisposeCtsOrExecutionsAwaiter();
Diagnostics(nameof(LifetimeStatus.Terminating));
//Now status is 'Terminating' and we have to wait for all resource modifications to complete. No mutex acquire is possible beyond this point.
if (ourMutexSlice[myState]) //optimization
SpinWaitEx.SpinUntil(() => !ourMutexSlice[myState]);
-
- Destruct();
+
+ Destruct();
Assertion.Assert(Status == LifetimeStatus.Terminated, "{0}: bad status for termination finish", this);
Diagnostics(nameof(LifetimeStatus.Terminated));
}
+#if !NET35
+ ///
+ /// Asynchronously waits for all before terminating
+ /// All nested lifetimes and will be terminated asynchronously
+ ///
+ ///
+ /// if called under
+ [PublicAPI]
+ public ValueTask TerminateAsync()
+ {
+ if (IsEternal || Status > LifetimeStatus.Canceling)
+ return new ValueTask();
+
+ Diagnostics(nameof(LifetimeStatus.Canceling));
+
+ //parent could ask for canceled already
+ MarkCancelingRecursively();
+
+ var supportsTerminationUnderExecuting = AllowTerminationUnderExecution;
+ if (!supportsTerminationUnderExecuting && ThreadLocalExecuting() > 0)
+ throw new InvalidOperationException($"{this}: can't terminate under `ExecuteIfAlive` because termination doesn't support this. Use `{nameof(AllowTerminationUnderExecution)}`.");
+
+ return TerminateAsync(supportsTerminationUnderExecuting);
+ }
+
+ private async ValueTask TerminateAsync(bool supportsTerminationUnderExecuting)
+ {
+ if (!supportsTerminationUnderExecuting && ourExecutingSlice[myState] > 0)
+ {
+ var value = myCtsOrExecutionsAwaiter;
+ if (value is Disposed || value is ExecutionsAwaiter || Status >= LifetimeStatus.Terminating)
+ return; // termination already started
+
+ var awaiter = new ExecutionsAwaiter();
+ while (true)
+ {
+ if (value is Disposed || value is ExecutionsAwaiter || Status >= LifetimeStatus.Terminating)
+ return; // termination already started
+
+ var originalValue = Interlocked.CompareExchange(ref myCtsOrExecutionsAwaiter, awaiter, value);
+ if (originalValue == value) break;
+
+ value = originalValue;
+ }
+
+ if (value is CancellationTokenSource source)
+ source.Cancel();
+
+ if (ourExecutingSlice[myState] > 0)
+ {
+ var succeeded = await awaiter.WaitAsync(WaitForExecutingInAsyncTerminationTimeoutMs);
+ if (!succeeded) ErrorAfterExecutions();
+ }
+ }
+
+ if (!IncrementStatusIfEqualsTo(LifetimeStatus.Canceling))
+ return;
+
+ DisposeCtsOrExecutionsAwaiter();
+
+ Diagnostics(nameof(LifetimeStatus.Terminating));
+ //Now status is 'Terminating' and we have to wait for all resource modifications to complete. No mutex acquire is possible beyond this point.
+ if (ourMutexSlice[myState]) //optimization
+ SpinWaitEx.SpinUntil(() => !ourMutexSlice[myState]);
+
+ await DestructAsync();
+ Assertion.Assert(Status == LifetimeStatus.Terminated, "{0}: bad status for termination finish", this);
+ Diagnostics(nameof(LifetimeStatus.Terminated));
+ }
+#endif
private void MarkCancelingRecursively()
{
@@ -419,8 +497,8 @@ private void MarkCancelingRecursively()
if (!IncrementStatusIfEqualsTo(LifetimeStatus.Alive))
return;
-
- myCts?.Cancel();
+
+ (myCtsOrExecutionsAwaiter as CancellationTokenSource)?.Cancel();
// Some other thread can already begin destructuring
// Then children lifetimes become canceled in their termination
@@ -436,8 +514,76 @@ private void MarkCancelingRecursively()
}
}
+ private void ErrorAfterExecutions()
+ {
+ ourLogErrorAfterExecution.InterlockedUpdate(ref myState, true);
+
+ Log.Warn($"{this}: can't wait for `ExecuteIfAlive` completed on other thread in {WaitForExecutingInTerminationTimeoutMs} ms. Keep termination." + Environment.NewLine
+ + "This may happen either because of the ExecuteIfAlive failed to complete in a timely manner. In the case there will be following error messages." + Environment.NewLine
+ + "Or this might happen because of garbage collection or when the thread yielded execution in SpinWait.SpinOnce but did not receive execution back in a timely manner. If you are on JetBrains' Slack see the discussion https://jetbrains.slack.com/archives/CAZEUK2R0/p1606236742208100");
+ }
+
#if !NET35
[System.Runtime.ExceptionServices.HandleProcessCorruptedStateExceptions]
+ private async ValueTask DestructAsync()
+ {
+ var status = Status;
+ Assertion.Assert(status == LifetimeStatus.Terminating, "{0}: bad status for destructuring start", this);
+ Assertion.Assert(ourMutexSlice[myState] == false, "{0}: mutex must be released in this point", this);
+ //no one can take mutex after this point
+
+ var resources = myResources;
+ Assertion.AssertNotNull(resources, "{0}: `resources` can't be null on destructuring stage", this);
+
+ Assertion.Assert(myCtsOrExecutionsAwaiter == Disposed.Instance, "myCtsOrExecutionsAwaiter == DisposedAwaiter.Instance");
+
+ for (var i = myResCount - 1; i >= 0; i--)
+ {
+ try
+ {
+ switch (resources[i])
+ {
+ case Action a:
+ a();
+ break;
+
+ case LifetimeDefinition ld:
+ await ld.TerminateAsync();
+ break;
+
+ case IAsyncDisposable ad:
+ await ad.DisposeAsync();
+ break;
+
+ case IDisposable d:
+ d.Dispose();
+ break;
+
+ case ITerminationHandler th:
+ th.OnTermination(Lifetime);
+ break;
+
+ default:
+ Log.Error("{0}: unknown type of termination resource: {1}", this, resources[i]);
+ break;
+ }
+ }
+ catch (Exception e)
+ {
+ Log.Error(e, $"{this}: exception on termination of resource[{i}]: ${resources[i]}");
+ }
+ }
+
+ myResources = null;
+ myResCount = 0;
+
+ var statusIncrementedSuccessfully = IncrementStatusIfEqualsTo(LifetimeStatus.Terminating);
+ Assertion.Assert(statusIncrementedSuccessfully, "{0}: bad status for destructuring finish", this);
+ }
+#endif
+
+ #if !NET35
+ [System.Runtime.ExceptionServices.HandleProcessCorruptedStateExceptions]
#endif
private void Destruct()
{
@@ -449,6 +595,8 @@ private void Destruct()
var resources = myResources;
Assertion.AssertNotNull(resources, "{0}: `resources` can't be null on destructuring stage", this);
+ Assertion.Assert(myCtsOrExecutionsAwaiter == Disposed.Instance, "myCtsOrExecutionsAwaiter == DisposedAwaiter.Instance");
+
for (var i = myResCount - 1; i >= 0; i--)
{
try
@@ -485,15 +633,26 @@ private void Destruct()
myResources = null;
myResCount = 0;
- //In fact we shouldn't make cts null, because it should provide stable CancellationToken to finish enclosing tasks in Canceled state (not Faulted)
- //But to avoid memory leaks we must do it. So if you 1) run task with alive lifetime 2) terminate lifetime 3) in task invoke ThrowIfNotAlive() you can obtain `Faulted` state rather than `Canceled`. But it doesn't matter in `async-await` programming.
- if (!ReferenceEquals(this, Terminated))
- myCts = null;
-
var statusIncrementedSuccessfully = IncrementStatusIfEqualsTo(LifetimeStatus.Terminating);
Assertion.Assert(statusIncrementedSuccessfully, "{0}: bad status for destructuring finish", this);
}
+ private void DisposeCtsOrExecutionsAwaiter()
+ {
+ Assertion.Assert(myCtsOrExecutionsAwaiter != Disposed.Instance, "myCtsOrExecutionsAwaiter != Disposed.Instance");
+
+ var originValue = Interlocked.Exchange(ref myCtsOrExecutionsAwaiter, Disposed.Instance);
+ if (originValue is CancellationTokenSource source) source.Cancel();
+#if !NET35
+ else if (originValue is ExecutionsAwaiter awaiter) awaiter.TryFire();
+#endif
+ }
+
+ private class Disposed
+ {
+ public static readonly Disposed Instance = new Disposed();
+ }
+
@@ -662,10 +821,40 @@ private static Result WrapOrThrow([NotNull] Func action, bool wrap)
return wrap ? Result.Wrap(action) : Result.Success(action());
}
+ private class ExecutionsAwaiter
+ {
+#if !NET35
+ private readonly TaskCompletionSource myTcs;
+
+ public ExecutionsAwaiter()
+ {
+ myTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ }
+
+ public Task WaitAsync(int timeoutMs)
+ {
+ var task = myTcs.Task;
+ if (task.IsCompleted)
+ return task;
+
+ var timer = new System.Timers.Timer(timeoutMs) {AutoReset = false};
+ timer.Elapsed += (_, __) => myTcs.TrySetResult(false);
+ timer.Start();
+
+ task.ContinueWith(t => timer.Dispose(), TaskScheduler.Default);
+ return task;
+ }
+
+ public bool TryFire() => myTcs.TrySetResult(true) || myTcs.Task.Result;
+#else
+ public ExecutionsAwaiter() => throw new NotSupportedException("Must not be created for NET35");
+#endif
+ }
+
///
/// Must be used only by
///
- public struct ExecuteIfAliveCookie : IDisposable
+ public readonly struct ExecuteIfAliveCookie : IDisposable
{
[NotNull]
private readonly LifetimeDefinition myDef;
@@ -708,7 +897,7 @@ public void Dispose()
if (!Succeed)
return;
- Interlocked.Decrement(ref myDef.myState);
+ var state = Interlocked.Decrement(ref myDef.myState);
if (!myDisableIncrementThreadLocalExecuting)
myDef.ThreadLocalExecuting(-1);
@@ -716,10 +905,16 @@ public void Dispose()
if (myAllowTerminationUnderExecuting)
ourAllowTerminationUnderExecutionThreadStatic--;
- if (ourLogErrorAfterExecution[myDef.myState])
+ var shouldLogError = ourLogErrorAfterExecution[state];
+#if !NET35
+ if (ourExecutingSlice[state] == 0 && Memory.VolatileRead(ref myDef.myCtsOrExecutionsAwaiter) is ExecutionsAwaiter awaiter)
+ shouldLogError = !awaiter.TryFire() || shouldLogError;
+#endif
+
+ if (shouldLogError)
{
Log.Error($"ExecuteIfAlive after termination of {myDef} took too much time (>{WaitForExecutingInTerminationTimeoutMs}ms)");
- }
+ }
}
}
@@ -897,7 +1092,7 @@ internal T Bracket([NotNull] Func opening, [NotNull] Action closing)
#region Cancellation
- private CancellationTokenSource myCts;
+ private object myCtsOrExecutionsAwaiter;
//Only if state >= Canceling
@@ -907,18 +1102,26 @@ internal T Bracket([NotNull] Func opening, [NotNull] Action closing)
private Result CanceledResult() => Result.Canceled(CanceledException());
- private void CreateCtsLazily()
+ private CancellationToken CreateCancellationToken()
{
- if (myCts != null) return;
-
+ Assertion.Assert(!ReferenceEquals(this, Terminated), "Mustn't reach this point on lifetime `Terminated`");
+
var cts = new CancellationTokenSource();
- Memory.Barrier();
- //to suppress reordering of init and ctor visible from outside
- myCts = cts;
- //But MarkCanceledRecursively may already happen, so we need to help Cancel source
- if (Status != LifetimeStatus.Alive)
- myCts.Cancel();
+ var originalValue = Interlocked.CompareExchange(ref myCtsOrExecutionsAwaiter, cts, null);
+ if (originalValue is CancellationTokenSource source)
+ {
+ cts.Cancel();
+ return source.Token;
+ }
+
+ if (originalValue != null || Status != LifetimeStatus.Alive)
+ {
+ cts.Cancel();
+ return CancelledToken;
+ }
+
+ return cts.Token;
}
///
@@ -933,24 +1136,13 @@ public void ThrowIfNotAlive()
internal CancellationToken ToCancellationToken(bool doNotCreateCts = false)
{
- if (myCts == null)
- {
- if (doNotCreateCts)
- return Terminated.ToCancellationToken();
-
- using (var mutex = new UnderMutexCookie(this, LifetimeStatus.Alive))
- {
- if (!mutex.Success)
- {
- Assertion.Assert(!ReferenceEquals(this, Terminated), "Mustn't reach this point on lifetime `Terminated`");
- return Terminated.ToCancellationToken(); //to get stable CancellationTokenSource (for tasks to finish in Canceling state, rather than Faulted)
- }
-
- CreateCtsLazily();
- }
- }
-
- return myCts.Token;
+ if (myCtsOrExecutionsAwaiter is CancellationTokenSource source)
+ return source.Token;
+
+ if (doNotCreateCts || Status != LifetimeStatus.Alive)
+ return CancelledToken;
+
+ return CreateCancellationToken();
}
diff --git a/rd-net/Test.Lifetimes/Lifetimes/LifetimeTest.cs b/rd-net/Test.Lifetimes/Lifetimes/LifetimeTest.cs
index 08be2f43b..fc20aac9f 100644
--- a/rd-net/Test.Lifetimes/Lifetimes/LifetimeTest.cs
+++ b/rd-net/Test.Lifetimes/Lifetimes/LifetimeTest.cs
@@ -3,11 +3,13 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
+using JetBrains.Collections.Viewable;
using JetBrains.Core;
using JetBrains.Diagnostics;
using JetBrains.Diagnostics.Internal;
using JetBrains.Lifetimes;
using JetBrains.Threading;
+using JetBrains.Util.Internal;
using NUnit.Framework;
namespace Test.Lifetimes.Lifetimes
@@ -257,6 +259,62 @@ void LoggerHandler(LeveledMessage message)
Assert.IsTrue(warningReceived, "Warning `{0}` must have been logged", expectedWarningText);
Assert.IsTrue(exceptionReceived, "Exception `{0}` must have been logged", expectedExceptionText);
}
+
+#if !NET35
+ [Test]
+ public void TestLongTryExecuteAsync()
+ {
+ const string expectedWarningText = "can't wait for `ExecuteIfAlive` completed on other thread";
+ const string expectedExceptionText = "ExecuteIfAlive after termination of";
+ bool warningReceived = false, exceptionReceived = false;
+
+ Lifetime.Using(lifetime =>
+ {
+ void LoggerHandler(LeveledMessage message)
+ {
+ if (message.Level == LoggingLevel.WARN && message.FormattedMessage.Contains(expectedWarningText))
+ warningReceived = true;
+ }
+
+ lifetime.Bracket(
+ () => TestLogger.ExceptionLogger.Handlers += LoggerHandler,
+ () => TestLogger.ExceptionLogger.Handlers -= LoggerHandler
+ );
+
+ var lifetimeDefinition = lifetime.CreateNested();
+ var lifetimeTerminatedEvent = new ManualResetEvent(false);
+ var backgroundThreadIsInTryExecuteEvent = new ManualResetEvent(false);
+ var thread = new Thread(() => lifetimeDefinition.Lifetime.TryExecute(() =>
+ {
+ backgroundThreadIsInTryExecuteEvent.Set();
+ lifetimeTerminatedEvent.WaitOne();
+ }));
+ thread.Start();
+ backgroundThreadIsInTryExecuteEvent.WaitOne();
+
+ var terminationTask = lifetimeDefinition.TerminateAsync();
+ SpinWait.SpinUntil(() => terminationTask.IsCompleted);
+
+ lifetimeTerminatedEvent.Set();
+ thread.Join();
+ try
+ {
+ TestLogger.ExceptionLogger.ThrowLoggedExceptions();
+ }
+ catch (Exception e)
+ {
+ if (!e.Message.Contains(expectedExceptionText))
+ throw;
+
+ exceptionReceived = true;
+ }
+ });
+
+ Assert.IsTrue(warningReceived, "Warning `{0}` must have been logged", expectedWarningText);
+ Assert.IsTrue(exceptionReceived, "Exception `{0}` must have been logged", expectedExceptionText);
+ }
+#endif
+
[Test]
public void TestBracketGood()
@@ -1171,6 +1229,547 @@ public void TestTerminatesAfter()
Thread.Sleep(200);
Assert.True(lf.IsNotAlive);
}
+
+ [Test]
+ public void DoubleAsyncTerminationTest()
+ {
+ {
+ var definition = new LifetimeDefinition();
+ var count = 0;
+ definition.Lifetime.OnTermination(() => count++);
+ definition.Terminate();
+ Assert.AreEqual(1, count);
+ Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully);
+ Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully);
+ definition.Terminate();
+ Assert.AreEqual(1, count);
+ }
+
+ {
+ var definition = new LifetimeDefinition();
+ var count = 0;
+ definition.Lifetime.OnTermination(() => count++);
+ Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully);
+ Assert.AreEqual(1, count);
+ definition.Terminate();
+ definition.Terminate();
+ Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully);
+ Assert.AreEqual(1, count);
+ }
+
+ {
+ var definition = new LifetimeDefinition();
+ var count = 0;
+ definition.Lifetime.OnTermination(() => count++);
+ var e1 = new ManualResetEvent(false);
+ var e2 = new ManualResetEvent(false);
+ var task = TestLifetime.Start(TaskScheduler.Default, () =>
+ {
+ using (definition.Lifetime.UsingExecuteIfAlive())
+ {
+ e1.Set();
+ e2.WaitOne();
+ }
+ });
+
+ e1.WaitOne();
+ var t = definition.TerminateAsync();
+ Assert.IsFalse(t.IsCompleted);
+
+ Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully);
+ e2.Set();
+
+ Assert.IsTrue(task.Wait(TimeSpan.FromSeconds(10)));
+ Assert.IsTrue(t.AsTask().Wait(TimeSpan.FromSeconds(10)));
+
+ Assert.AreEqual(1, count);
+ }
+ }
+
+ [Test]
+ public void ConcurrentDoubleAsyncTermination()
+ {
+ for (var i = 0; i < 10000; i++)
+ {
+ var definition = new LifetimeDefinition();
+ var count = 0;
+ definition.Lifetime.OnTermination(() => count++);
+
+ const int threadsCount = 10;
+ var threads = 0;
+ var tasks = Enumerable.Range(0, threadsCount).Select(_ => TestLifetime.Start(TaskScheduler.Default, () =>
+ {
+ Interlocked.Increment(ref threads);
+ SpinWait.SpinUntil(() => Memory.VolatileRead(ref threads) == threadsCount); // sync threads
+
+ Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully);
+ })).ToArray();
+
+ Assert.IsTrue(Task.WaitAll(tasks, TimeSpan.FromMinutes(1)));
+ Assert.AreEqual(1, count);
+ }
+ }
+
+ [Test]
+ public void SimpleTerminateAsyncTest()
+ {
+ {
+ var definition = new LifetimeDefinition();
+ var currentThread = Thread.CurrentThread;
+ var called = false;
+ definition.Lifetime.OnTermination(() =>
+ {
+ Assert.AreEqual(currentThread, Thread.CurrentThread);
+ called = true;
+ });
+
+ var task = definition.TerminateAsync();
+ Assert.IsTrue(task.IsCompletedSuccessfully);
+ Assert.IsTrue(called);
+ }
+
+ {
+ var scheduler = SingleThreadScheduler.RunOnSeparateThread(TestLifetime, "TestScheduler");
+
+ var definition = new LifetimeDefinition();
+ var called = false;
+ definition.Lifetime.OnTermination(() =>
+ {
+ scheduler.AssertThread();
+ called = true;
+ });
+
+ Task task;
+ var e1 = new SemaphoreSlim(0);
+ var e2 = new SemaphoreSlim(0);
+ using (var cookie = definition.Lifetime.UsingExecuteIfAlive())
+ {
+ Assert.IsTrue(cookie.Succeed);
+ task = TestLifetime.StartAsync(scheduler, async () =>
+ {
+ scheduler.AssertThread();
+ var task = definition.TerminateAsync();
+ Assert.IsFalse(task.IsCompletedSuccessfully);
+ Assert.IsFalse(called);
+
+ scheduler.Queue(() => e1.Release());
+ await e2.WaitAsync();
+
+ scheduler.AssertThread();
+ Assert.IsTrue(task.IsCompletedSuccessfully);
+ Assert.IsTrue(called);
+ });
+
+ e1.Wait();
+ }
+
+ e2.Release();
+ Assert.IsTrue(task.Wait(TimeSpan.FromSeconds(10)));
+
+ Assert.AreEqual(TaskStatus.RanToCompletion, task.Status);
+ Assert.IsTrue(called);
+ }
+ }
+
+ [Test]
+ public void SimpleNestedAsyncTermination()
+ {
+ var def1 = new LifetimeDefinition();
+ var def2 = def1.Lifetime.CreateNested();
+
+ var e1 = new ManualResetEvent(false);
+ var e2 = new ManualResetEvent(false);
+
+ TestLifetime.Start(TaskScheduler.Default, () =>
+ {
+ using (var cookie = def2.UsingExecuteIfAlive())
+ {
+ Assert.IsTrue(cookie.Succeed);
+
+ e1.Set();
+ e2.WaitOne();
+ }
+ });
+
+ e1.WaitOne();
+
+ var task = def1.TerminateAsync();
+ Assert.IsFalse(task.IsCompleted);
+ Assert.AreEqual(LifetimeStatus.Terminating, def1.Status);
+ Assert.AreEqual(LifetimeStatus.Canceling, def2.Status);
+
+ e2.Set();
+
+ Assert.IsTrue(task.AsTask().Wait(TimeSpan.FromSeconds(10)));
+
+ Assert.IsTrue(task.IsCompletedSuccessfully);
+ Assert.AreEqual(LifetimeStatus.Terminated, def1.Status);
+ Assert.AreEqual(LifetimeStatus.Terminated, def2.Status);
+ }
+
+ [Test]
+ public void AsyncDisposableTerminationTest()
+ {
+ var disposable = new TestAsyncDisposable();
+ var definition = new LifetimeDefinition();
+ definition.Lifetime.OnTermination(disposable);
+
+ var scheduler = new SequentialScheduler("TestScheduler", TestLifetime);
+ var task = TestLifetime.StartAsync(scheduler, async () =>
+ {
+ var t = definition.TerminateAsync();
+ Assert.IsFalse(t.IsCompleted);
+ Assert.AreEqual(LifetimeStatus.Terminating, definition.Status);
+
+ Assert.IsTrue(disposable.Disposing);
+ Assert.IsFalse(disposable.Disposed);
+
+ await Task.Yield();
+
+ Assert.IsTrue(disposable.Disposed);
+ Assert.IsTrue(t.IsCompletedSuccessfully);
+ Assert.AreEqual(LifetimeStatus.Terminated, definition.Status);
+ });
+
+ Assert.IsTrue(task.Wait(TimeSpan.FromSeconds(10)));
+ Assert.AreEqual(TaskStatus.RanToCompletion, task.Status);
+ Assert.AreEqual(LifetimeStatus.Terminated, definition.Status);
+ }
+
+ [Test]
+ public void AllowAsyncTerminationTest()
+ {
+ var definition = new LifetimeDefinition {AllowTerminationUnderExecution = true};
+ var called = false;
+ definition.Lifetime.OnTermination(() => called = true);
+ using (definition.Lifetime.UsingExecuteIfAlive())
+ {
+ definition.TerminateAsync();
+ Assert.IsTrue(called);
+ }
+ }
+
+ [Test]
+ public void AsyncTerminationUnderExecutionErrorTest()
+ {
+ var definition = new LifetimeDefinition();
+ var called = false;
+ var thread = Thread.CurrentThread;
+ definition.Lifetime.OnTermination(() =>
+ {
+ Assert.AreEqual(thread, Thread.CurrentThread);
+ called = true;
+ });
+ using (definition.Lifetime.UsingExecuteIfAlive())
+ {
+ Assert.Throws(() => definition.TerminateAsync());
+ Assert.IsFalse(called);
+ }
+ Assert.IsFalse(called);
+
+ Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully);
+ Assert.IsTrue(called);
+ }
+
+ [Test]
+ public void CancellationTokenAsyncTerminationTest()
+ {
+ {
+ var definition = new LifetimeDefinition();
+ var token = definition.Lifetime.ToCancellationToken();
+ definition.TerminateAsync();
+ Assert.IsTrue(token.IsCancellationRequested);
+ }
+
+ {
+ var definition = new LifetimeDefinition();
+ definition.TerminateAsync();
+ var token = definition.Lifetime.ToCancellationToken();
+ Assert.IsTrue(token.IsCancellationRequested);
+ }
+
+ {
+ var definition = new LifetimeDefinition();
+ var token = definition.Lifetime.ToCancellationToken();
+ var e1 = new ManualResetEvent(false);
+ var e2 = new ManualResetEvent(false);
+ TestLifetime.Start(TaskScheduler.Default, () =>
+ {
+ using (definition.Lifetime.UsingExecuteIfAlive())
+ {
+ e1.Set();
+ e2.WaitOne();
+ }
+ });
+
+ var task = definition.TerminateAsync();
+ Assert.IsTrue(token.IsCancellationRequested);
+ e2.Set();
+ Assert.IsTrue(task.AsTask().Wait(TimeSpan.FromSeconds(10)));
+ }
+
+ {
+ var definition = new LifetimeDefinition();
+ var e1 = new ManualResetEvent(false);
+ var e2 = new ManualResetEvent(false);
+ TestLifetime.Start(TaskScheduler.Default, () =>
+ {
+ using (definition.Lifetime.UsingExecuteIfAlive())
+ {
+ e1.Set();
+ e2.WaitOne();
+ }
+ });
+
+ var task = definition.TerminateAsync();
+ var token = definition.Lifetime.ToCancellationToken();
+ Assert.IsTrue(token.IsCancellationRequested);
+ e2.Set();
+ Assert.IsTrue(task.AsTask().Wait(TimeSpan.FromSeconds(10)));
+ }
+
+ {
+ var definition = new LifetimeDefinition();
+ var e1 = new ManualResetEvent(false);
+ var e2 = new ManualResetEvent(false);
+ TestLifetime.Start(TaskScheduler.Default, () =>
+ {
+ using (definition.Lifetime.UsingExecuteIfAlive())
+ {
+ e1.Set();
+ e2.WaitOne();
+ }
+ });
+
+ var task = definition.TerminateAsync();
+ e2.Set();
+ Assert.IsTrue(task.AsTask().Wait(TimeSpan.FromSeconds(10)));
+ var token = definition.Lifetime.ToCancellationToken();
+ Assert.IsTrue(token.IsCancellationRequested);
+ }
+ }
+
+ [Test]
+ public void ConcurrentCancellationTokenAndAsyncTerminationTest()
+ {
+ for (int i = 0; i < 10000; i++)
+ {
+ var definition = new LifetimeDefinition();
+ var e1 = new ManualResetEvent(false);
+ var e2 = new ManualResetEvent(false);
+
+ var executionTask = TestLifetime.Start(TaskScheduler.Default, () =>
+ {
+ using (definition.Lifetime.UsingExecuteIfAlive())
+ {
+ e1.Set();
+ e2.WaitOne();
+ }
+ });
+
+ e1.WaitOne();
+
+ const int threadsCount = 10;
+ var threads = 0;
+
+ var tasks = Enumerable.Range(0, threadsCount).Select(num => TestLifetime.StartAsync(TaskScheduler.Default, async () =>
+ {
+ Interlocked.Increment(ref threads);
+ SpinWait.SpinUntil(() => Memory.VolatileRead(ref threads) == threadsCount);
+
+ if (num % 2 == 0)
+ return definition.ToCancellationToken();
+
+ await definition.TerminateAsync();
+ return definition.ToCancellationToken();
+ })).ToArray();
+
+ e2.Set();
+
+ var whenAllTask = Task.WhenAll(tasks);
+ Assert.IsTrue(whenAllTask.Wait(TimeSpan.FromMinutes(1)));
+
+ var tokens = whenAllTask.Result;
+ Assert.IsTrue(tokens.All(x => x.IsCancellationRequested));
+
+ executionTask.Wait(TimeSpan.FromSeconds(10));
+ }
+ }
+
+ [Test]
+ public void ConcurrentCancellationTokenAndTerminationTest()
+ {
+ for (int i = 0; i < 10000; i++)
+ {
+ var definition = new LifetimeDefinition();
+ var e1 = new ManualResetEvent(false);
+ var e2 = new ManualResetEvent(false);
+
+ var executionTask = TestLifetime.Start(TaskScheduler.Default, () =>
+ {
+ using (definition.Lifetime.UsingExecuteIfAlive())
+ {
+ e1.Set();
+ e2.WaitOne();
+ }
+ });
+
+ e1.WaitOne();
+
+ const int threadsCount = 10;
+ var threads = 0;
+
+ var tasks = Enumerable.Range(0, threadsCount).Select(num => TestLifetime.Start(TaskScheduler.Default, () =>
+ {
+ Interlocked.Increment(ref threads);
+ SpinWait.SpinUntil(() => Memory.VolatileRead(ref threads) == threadsCount);
+
+ if (num % 2 == 0)
+ return definition.ToCancellationToken();
+
+ definition.Terminate();
+ return definition.ToCancellationToken();
+ })).ToArray();
+
+ e2.Set();
+
+ var whenAllTask = Task.WhenAll(tasks);
+ Assert.IsTrue(whenAllTask.Wait(TimeSpan.FromMinutes(1)));
+
+ var tokens = whenAllTask.Result;
+ Assert.IsTrue(tokens.All(x => x.IsCancellationRequested));
+
+ executionTask.Wait(TimeSpan.FromSeconds(10));
+ }
+ }
+
+
+ [Test]
+ public void ConcurrentToCancellationTokenTest()
+ {
+ for (int i = 0; i < 10000; i++)
+ {
+ var definition = new LifetimeDefinition();
+
+ const int threadsCount = 10;
+ var threads = 0;
+
+ var tasks = Enumerable.Range(0, threadsCount).Select(num => TestLifetime.Start(TaskScheduler.Default, () =>
+ {
+ Interlocked.Increment(ref threads);
+ SpinWait.SpinUntil(() => Memory.VolatileRead(ref threads) == threadsCount);
+
+ return definition.ToCancellationToken();
+ })).ToArray();
+
+ var whenAllTask = Task.WhenAll(tasks);
+ Assert.IsTrue(whenAllTask.Wait(TimeSpan.FromMinutes(1)));
+
+ var tokens = whenAllTask.Result;
+ var singleToken = tokens.Distinct().Single();
+ Assert.IsFalse(singleToken.IsCancellationRequested);
+
+ if (i % 2 == 0)
+ Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully);
+ else
+ definition.Terminate();
+
+ Assert.IsTrue(singleToken.IsCancellationRequested);
+ }
+ }
+
+ [Test]
+ public void TerminatedToCancellationToken()
+ {
+ var token = LifetimeDefinition.Terminated.ToCancellationToken();
+ Assert.AreEqual(token, new CancellationToken(true));
+ }
+
+ [Test]
+ public void RecursiveTerminateAsync()
+ {
+ {
+ var definition = new LifetimeDefinition();
+ var count = 0;
+ definition.Lifetime.OnTermination(() => count++);
+ definition.Lifetime.OnTermination(() => definition.Terminate());
+ Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully);
+ Assert.AreEqual(1, count);
+ Assert.AreEqual(LifetimeStatus.Terminated, definition.Status);
+ }
+
+ {
+ var definition = new LifetimeDefinition();
+ var count = 0;
+ definition.Lifetime.OnTermination(() => count++);
+ definition.Lifetime.OnTermination(() =>
+ {
+ Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully);
+ });
+
+ Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully);
+ Assert.AreEqual(1, count);
+ Assert.AreEqual(LifetimeStatus.Terminated, definition.Status);
+ }
+
+ {
+ var definition = new LifetimeDefinition();
+ var nested = definition.Lifetime.CreateNested();
+ var count = 0;
+ definition.Lifetime.OnTermination(() => count++);
+ nested.Lifetime.OnTermination(() =>
+ {
+ Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully);
+ });
+
+ Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully);
+ Assert.AreEqual(1, count);
+ Assert.AreEqual(LifetimeStatus.Terminated, definition.Status);
+ Assert.AreEqual(LifetimeStatus.Terminated, nested.Status);
+ }
+ }
+
+ [Test]
+ public void NonAsyncDisposableTerminationTest()
+ {
+ var definition = new LifetimeDefinition();
+ var disposable = new TestNonAsyncDisposable();
+ definition.Lifetime.OnTermination(disposable);
+ Assert.IsFalse(disposable.Disposed);
+
+ definition.Terminate();
+ Assert.IsTrue(disposable.Disposed);
+ }
+
+ private class TestNonAsyncDisposable : IDisposable, IAsyncDisposable
+ {
+ public bool Disposing { get; private set; }
+ public bool Disposed { get; private set; }
+
+ public void Dispose()
+ {
+ Disposing = true;
+ Disposed = true;
+ }
+
+ public ValueTask DisposeAsync() => throw new NotImplementedException(); // must not be called
+ }
+
+
+ private class TestAsyncDisposable : IDisposable, IAsyncDisposable
+ {
+ public bool Disposing { get; private set; }
+ public bool Disposed { get; private set; }
+
+ public void Dispose() => throw new NotImplementedException(); // must not be called
+
+ public async ValueTask DisposeAsync()
+ {
+ Disposing = true;
+ await Task.Yield();
+ Disposed = true;
+ }
+ }
#endif