From d34c3ddef19bb4d5acd46cf1d5271dd8b54e113b Mon Sep 17 00:00:00 2001 From: Maxim Cherednik Date: Thu, 21 Dec 2017 09:23:30 +0100 Subject: [PATCH 1/8] Tests should be precise - in temrs of what to expect --- src/core/Akka.Tests/Actor/AskSpec.cs | 53 +++++++++++++++++----------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/src/core/Akka.Tests/Actor/AskSpec.cs b/src/core/Akka.Tests/Actor/AskSpec.cs index 7fcb46eea47..3844ea4c315 100644 --- a/src/core/Akka.Tests/Actor/AskSpec.cs +++ b/src/core/Akka.Tests/Actor/AskSpec.cs @@ -10,10 +10,10 @@ using Akka.Actor; using System; using System.Threading; +using System.Threading.Tasks; namespace Akka.Tests.Actor { - public class AskSpec : AkkaSpec { public class SomeActor : UntypedActor @@ -66,52 +66,63 @@ protected override void OnReceive(object message) } [Fact] - public void Can_Ask_actor() + public async Task Can_Ask_actor() { var actor = Sys.ActorOf(); - actor.Ask("answer").Result.ShouldBe("answer"); + var res = await actor.Ask("answer"); + res.ShouldBe("answer"); } [Fact] - public void Can_Ask_actor_with_timeout() + public async Task Can_Ask_actor_with_timeout() { var actor = Sys.ActorOf(); - actor.Ask("answer",TimeSpan.FromSeconds(10)).Result.ShouldBe("answer"); + var res = await actor.Ask("answer", TimeSpan.FromSeconds(10)); + res.ShouldBe("answer"); } [Fact] - public void Can_get_timeout_when_asking_actor() + public async Task Can_get_timeout_when_asking_actor() { var actor = Sys.ActorOf(); - Assert.Throws(() => { actor.Ask("timeout", TimeSpan.FromSeconds(3)).Wait(); }); + await Assert.ThrowsAsync(async () => await actor.Ask("timeout", TimeSpan.FromSeconds(3))); } [Fact] - public void Can_cancel_when_asking_actor() - { + public async Task Can_cancel_when_asking_actor() + { var actor = Sys.ActorOf(); - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); - Assert.Throws(() => { actor.Ask("timeout", Timeout.InfiniteTimeSpan, cts.Token).Wait(); }); - Assert.True(cts.IsCancellationRequested); + using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3))) + { + await Assert.ThrowsAsync(async () => await actor.Ask("timeout", Timeout.InfiniteTimeSpan, cts.Token)); + } } + [Fact] - public void Cancelled_ask_with_null_timeout_should_remove_temp_actor() + public async Task Cancelled_ask_with_null_timeout_should_remove_temp_actor() { var actor = Sys.ActorOf(); - var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)); - Assert.Throws(() => { actor.Ask("cancel", cts.Token).Wait(); }); - Assert.True(cts.IsCancellationRequested); + + using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100))) + { + await Assert.ThrowsAsync(async () => await actor.Ask("cancel", cts.Token)); + } + Are_Temp_Actors_Removed(actor); } + [Fact] - public void Cancelled_ask_with_timeout_should_remove_temp_actor() + public async Task Cancelled_ask_with_timeout_should_remove_temp_actor() { var actor = Sys.ActorOf(); - var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)); - Assert.Throws(() => { actor.Ask("cancel", TimeSpan.FromSeconds(30), cts.Token).Wait(); }); - Assert.True(cts.IsCancellationRequested); + using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100))) + { + await Assert.ThrowsAsync(async () => await actor.Ask("cancel", TimeSpan.FromSeconds(30), cts.Token)); + } + Are_Temp_Actors_Removed(actor); } + private void Are_Temp_Actors_Removed(IActorRef actor) { var actorCell = actor as ActorRefWithCell; @@ -126,7 +137,7 @@ private void Are_Temp_Actors_Removed(IActorRef actor) container.ForEachChild(x => childCounter++); Assert.True(childCounter == 0, "Temp actors not all removed."); }); - + } /// From 5d7e0bb3fc4a34ef6b738391d240d87648b63bc2 Mon Sep 17 00:00:00 2001 From: Maxim Cherednik Date: Tue, 9 Jan 2018 20:55:44 +0100 Subject: [PATCH 2/8] Ask interface refined #3220 --- .../AsyncWriteProxyEx.cs | 61 ++++++++++-------- src/core/Akka.Tests/Actor/AskSpec.cs | 9 +++ src/core/Akka/Actor/Futures.cs | 62 ++++++++++++------- src/core/Akka/Util/Internal/TaskExtensions.cs | 34 ---------- 4 files changed, 83 insertions(+), 83 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/AsyncWriteProxyEx.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/AsyncWriteProxyEx.cs index 4d5d1e5bf39..4ac2269a7e5 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/AsyncWriteProxyEx.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/AsyncWriteProxyEx.cs @@ -388,13 +388,13 @@ public static Task AskEx(this ICanTell self, Func messa return self.AskEx(messageFactory, null, cancellationToken); } - public static Task AskEx(this ICanTell self, Func messageFactory, TimeSpan? timeout, CancellationToken cancellationToken) + public static async Task AskEx(this ICanTell self, Func messageFactory, TimeSpan? timeout, CancellationToken cancellationToken) { IActorRefProvider provider = ResolveProvider(self); if (provider == null) throw new ArgumentException("Unable to resolve the target Provider", nameof(self)); - return AskEx(self, messageFactory, provider, timeout, cancellationToken).CastTask(); + return (T)await AskEx(self, messageFactory, provider, timeout, cancellationToken); } internal static IActorRefProvider ResolveProvider(ICanTell self) { @@ -410,49 +410,60 @@ internal static IActorRefProvider ResolveProvider(ICanTell self) return null; } - private static Task AskEx(ICanTell self, Func messageFactory, IActorRefProvider provider, TimeSpan? timeout, CancellationToken cancellationToken) + private static async Task AskEx(ICanTell self, Func messageFactory, IActorRefProvider provider, TimeSpan? timeout, CancellationToken cancellationToken) { var result = new TaskCompletionSource(); CancellationTokenSource timeoutCancellation = null; timeout = timeout ?? provider.Settings.AskTimeout; - List ctrList = new List(2); + var ctrList = new List(2); - if (timeout != System.Threading.Timeout.InfiniteTimeSpan && timeout.Value > default(TimeSpan)) + if (timeout != Timeout.InfiniteTimeSpan && timeout.Value > default(TimeSpan)) { timeoutCancellation = new CancellationTokenSource(); - ctrList.Add(timeoutCancellation.Token.Register(() => result.TrySetCanceled())); + + ctrList.Add(timeoutCancellation.Token.Register(() => + { + result.TrySetException(new AskTimeoutException($"Timeout after {timeout} seconds")); + })); + timeoutCancellation.CancelAfter(timeout.Value); } if (cancellationToken.CanBeCanceled) + { ctrList.Add(cancellationToken.Register(() => result.TrySetCanceled())); + } //create a new tempcontainer path ActorPath path = provider.TempPath(); - //callback to unregister from tempcontainer - Action unregister = - () => - { - // cancelling timeout (if any) in order to prevent memory leaks - // (a reference to 'result' variable in CancellationToken's callback) - if (timeoutCancellation != null) - { - timeoutCancellation.Cancel(); - timeoutCancellation.Dispose(); - } - for (var i = 0; i < ctrList.Count; i++) - { - ctrList[i].Dispose(); - } - provider.UnregisterTempActor(path); - }; - var future = new FutureActorRef(result, unregister, path); + var future = new FutureActorRef(result, () => { }, path); //The future actor needs to be registered in the temp container provider.RegisterTempActor(future, path); + self.Tell(messageFactory(future), future); - return result.Task; + + try + { + return await result.Task; + } + finally + { + //callback to unregister from tempcontainer + + provider.UnregisterTempActor(path); + + for (var i = 0; i < ctrList.Count; i++) + { + ctrList[i].Dispose(); + } + + if (timeoutCancellation != null) + { + timeoutCancellation.Dispose(); + } + } } } } diff --git a/src/core/Akka.Tests/Actor/AskSpec.cs b/src/core/Akka.Tests/Actor/AskSpec.cs index 3844ea4c315..b4972aeb594 100644 --- a/src/core/Akka.Tests/Actor/AskSpec.cs +++ b/src/core/Akka.Tests/Actor/AskSpec.cs @@ -123,6 +123,15 @@ public async Task Cancelled_ask_with_timeout_should_remove_temp_actor() Are_Temp_Actors_Removed(actor); } + [Fact] + public async Task ShouldFailWhenAskExpectsWrongType() + { + var actor = Sys.ActorOf(); + + // expect int, but in fact string + await Assert.ThrowsAsync(async () => await actor.Ask("answer")); + } + private void Are_Temp_Actors_Removed(IActorRef actor) { var actorCell = actor as ActorRefWithCell; diff --git a/src/core/Akka/Actor/Futures.cs b/src/core/Akka/Actor/Futures.cs index ee222d6437f..2dc2c17607b 100644 --- a/src/core/Akka/Actor/Futures.cs +++ b/src/core/Akka/Actor/Futures.cs @@ -106,7 +106,7 @@ public static async Task Ask(this ICanTell self, object message, TimeSpan? if (provider == null) throw new ArgumentException("Unable to resolve the target Provider", nameof(self)); - return (T) await Ask(self, message, provider, timeout, cancellationToken).ConfigureAwait(false); + return (T) await Ask(self, message, provider, timeout, cancellationToken); } /// @@ -132,54 +132,68 @@ internal static IActorRefProvider ResolveProvider(ICanTell self) private static readonly bool isRunContinuationsAsynchronouslyAvailable = Enum.IsDefined(typeof(TaskCreationOptions), RunContinuationsAsynchronously); - private static Task Ask(ICanTell self, object message, IActorRefProvider provider, + private static async Task Ask(ICanTell self, object message, IActorRefProvider provider, TimeSpan? timeout, CancellationToken cancellationToken) { TaskCompletionSource result; if (isRunContinuationsAsynchronouslyAvailable) + { result = new TaskCompletionSource((TaskCreationOptions)RunContinuationsAsynchronously); + } else + { result = new TaskCompletionSource(); + } CancellationTokenSource timeoutCancellation = null; timeout = timeout ?? provider.Settings.AskTimeout; - List ctrList = new List(2); + var ctrList = new List(2); if (timeout != Timeout.InfiniteTimeSpan && timeout.Value > default(TimeSpan)) { timeoutCancellation = new CancellationTokenSource(); - ctrList.Add(timeoutCancellation.Token.Register(() => result.TrySetCanceled())); + + ctrList.Add(timeoutCancellation.Token.Register(() => + { + result.TrySetException(new AskTimeoutException($"Timeout after {timeout} seconds")); + })); + timeoutCancellation.CancelAfter(timeout.Value); } if (cancellationToken.CanBeCanceled) + { ctrList.Add(cancellationToken.Register(() => result.TrySetCanceled())); - + } + //create a new tempcontainer path ActorPath path = provider.TempPath(); - //callback to unregister from tempcontainer - Action unregister = - () => - { - // cancelling timeout (if any) in order to prevent memory leaks - // (a reference to 'result' variable in CancellationToken's callback) - if (timeoutCancellation != null) - { - timeoutCancellation.Cancel(); - timeoutCancellation.Dispose(); - } - for (var i = 0; i < ctrList.Count; i++) - { - ctrList[i].Dispose(); - } - provider.UnregisterTempActor(path); - }; - var future = new FutureActorRef(result, unregister, path, isRunContinuationsAsynchronouslyAvailable); + var future = new FutureActorRef(result, () => { }, path, isRunContinuationsAsynchronouslyAvailable); //The future actor needs to be registered in the temp container provider.RegisterTempActor(future, path); self.Tell(message, future); - return result.Task; + + try + { + return await result.Task; + } + finally + { + //callback to unregister from tempcontainer + + provider.UnregisterTempActor(path); + + for (var i = 0; i < ctrList.Count; i++) + { + ctrList[i].Dispose(); + } + + if (timeoutCancellation != null) + { + timeoutCancellation.Dispose(); + } + } } } diff --git a/src/core/Akka/Util/Internal/TaskExtensions.cs b/src/core/Akka/Util/Internal/TaskExtensions.cs index 968f252894d..dabb714f4f4 100644 --- a/src/core/Akka/Util/Internal/TaskExtensions.cs +++ b/src/core/Akka/Util/Internal/TaskExtensions.cs @@ -20,40 +20,6 @@ namespace Akka.Util.Internal [InternalApi] public static class TaskExtensions { - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - /// TBD - public static Task CastTask(this Task task) - { - if (task.IsCompleted) - return Task.FromResult((TResult) (object)task.Result); - var tcs = new TaskCompletionSource(); - if (task.IsFaulted) - tcs.SetException(task.Exception); - else - task.ContinueWith(_ => - { - if (task.IsFaulted || task.Exception != null) - tcs.SetException(task.Exception); - else if (task.IsCanceled) - tcs.SetCanceled(); - else - try - { - tcs.SetResult((TResult) (object) task.Result); - } - catch (Exception e) - { - tcs.SetException(e); - } - }, TaskContinuationOptions.ExecuteSynchronously); - return tcs.Task; - } - /// /// Returns the task which completes with result of original task if cancellation token not canceled it before completion. /// From bf21760ca21df143367e4dcbdeb6f3d1c053e7fa Mon Sep 17 00:00:00 2001 From: Maxim Cherednik Date: Wed, 27 Dec 2017 16:37:23 +0100 Subject: [PATCH 3/8] ClusterRouter unit test fix #3220 --- .../Routing/ClusterRouterAsk1343BugFixSpec.cs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/core/Akka.Cluster.Tests/Routing/ClusterRouterAsk1343BugFixSpec.cs b/src/core/Akka.Cluster.Tests/Routing/ClusterRouterAsk1343BugFixSpec.cs index 308735aedad..471c233b301 100644 --- a/src/core/Akka.Cluster.Tests/Routing/ClusterRouterAsk1343BugFixSpec.cs +++ b/src/core/Akka.Cluster.Tests/Routing/ClusterRouterAsk1343BugFixSpec.cs @@ -96,14 +96,7 @@ public async Task Should_Ask_Clustered_Group_Router_and_with_no_routees_and_time var router = Sys.ActorOf(Props.Empty.WithRouter(FromConfig.Instance), "router3"); Assert.IsType(router); - try - { - var result = await router.Ask("foo"); - } - catch (Exception ex) - { - Assert.IsType(ex); - } + await Assert.ThrowsAsync(async () => await router.Ask("foo")); } } } From 5bbd12e8ea59bf0c5a95a13e3db3a69bf78a3b13 Mon Sep 17 00:00:00 2001 From: Maxim Cherednik Date: Tue, 2 Jan 2018 19:47:41 +0100 Subject: [PATCH 4/8] Ask deadlock test added #3220 --- .../AsyncContext.SynchronizationContext.cs | 0 .../AsyncContext/AsyncContext.TaskQueue.cs | 0 .../AsyncContext/AsyncContext.TaskScheduler.cs | 0 .../AsyncContext/AsyncContext.cs | 0 .../AsyncContext/AsyncEx.LICENSE | 0 .../AsyncContext/BoundAction.cs | 0 .../AsyncContext/Disposables.LICENSE | 0 .../AsyncContext/ExceptionHelpers.cs | 0 .../AsyncContext/SingleDisposable (of T).cs | 0 .../SynchronizationContextSwitcher.cs | 0 .../AsyncContext/SynchronousTaskExtensions.cs.cs | 0 .../AsyncContext/about.txt | 0 src/core/Akka.Tests/Actor/AskSpec.cs | 16 ++++++++++++++-- 13 files changed, 14 insertions(+), 2 deletions(-) rename src/core/{Akka.Remote.Tests => Akka.Tests.Shared.Internals}/AsyncContext/AsyncContext.SynchronizationContext.cs (100%) rename src/core/{Akka.Remote.Tests => Akka.Tests.Shared.Internals}/AsyncContext/AsyncContext.TaskQueue.cs (100%) rename src/core/{Akka.Remote.Tests => Akka.Tests.Shared.Internals}/AsyncContext/AsyncContext.TaskScheduler.cs (100%) rename src/core/{Akka.Remote.Tests => Akka.Tests.Shared.Internals}/AsyncContext/AsyncContext.cs (100%) rename src/core/{Akka.Remote.Tests => Akka.Tests.Shared.Internals}/AsyncContext/AsyncEx.LICENSE (100%) rename src/core/{Akka.Remote.Tests => Akka.Tests.Shared.Internals}/AsyncContext/BoundAction.cs (100%) rename src/core/{Akka.Remote.Tests => Akka.Tests.Shared.Internals}/AsyncContext/Disposables.LICENSE (100%) rename src/core/{Akka.Remote.Tests => Akka.Tests.Shared.Internals}/AsyncContext/ExceptionHelpers.cs (100%) rename src/core/{Akka.Remote.Tests => Akka.Tests.Shared.Internals}/AsyncContext/SingleDisposable (of T).cs (100%) rename src/core/{Akka.Remote.Tests => Akka.Tests.Shared.Internals}/AsyncContext/SynchronizationContextSwitcher.cs (100%) rename src/core/{Akka.Remote.Tests => Akka.Tests.Shared.Internals}/AsyncContext/SynchronousTaskExtensions.cs.cs (100%) rename src/core/{Akka.Remote.Tests => Akka.Tests.Shared.Internals}/AsyncContext/about.txt (100%) diff --git a/src/core/Akka.Remote.Tests/AsyncContext/AsyncContext.SynchronizationContext.cs b/src/core/Akka.Tests.Shared.Internals/AsyncContext/AsyncContext.SynchronizationContext.cs similarity index 100% rename from src/core/Akka.Remote.Tests/AsyncContext/AsyncContext.SynchronizationContext.cs rename to src/core/Akka.Tests.Shared.Internals/AsyncContext/AsyncContext.SynchronizationContext.cs diff --git a/src/core/Akka.Remote.Tests/AsyncContext/AsyncContext.TaskQueue.cs b/src/core/Akka.Tests.Shared.Internals/AsyncContext/AsyncContext.TaskQueue.cs similarity index 100% rename from src/core/Akka.Remote.Tests/AsyncContext/AsyncContext.TaskQueue.cs rename to src/core/Akka.Tests.Shared.Internals/AsyncContext/AsyncContext.TaskQueue.cs diff --git a/src/core/Akka.Remote.Tests/AsyncContext/AsyncContext.TaskScheduler.cs b/src/core/Akka.Tests.Shared.Internals/AsyncContext/AsyncContext.TaskScheduler.cs similarity index 100% rename from src/core/Akka.Remote.Tests/AsyncContext/AsyncContext.TaskScheduler.cs rename to src/core/Akka.Tests.Shared.Internals/AsyncContext/AsyncContext.TaskScheduler.cs diff --git a/src/core/Akka.Remote.Tests/AsyncContext/AsyncContext.cs b/src/core/Akka.Tests.Shared.Internals/AsyncContext/AsyncContext.cs similarity index 100% rename from src/core/Akka.Remote.Tests/AsyncContext/AsyncContext.cs rename to src/core/Akka.Tests.Shared.Internals/AsyncContext/AsyncContext.cs diff --git a/src/core/Akka.Remote.Tests/AsyncContext/AsyncEx.LICENSE b/src/core/Akka.Tests.Shared.Internals/AsyncContext/AsyncEx.LICENSE similarity index 100% rename from src/core/Akka.Remote.Tests/AsyncContext/AsyncEx.LICENSE rename to src/core/Akka.Tests.Shared.Internals/AsyncContext/AsyncEx.LICENSE diff --git a/src/core/Akka.Remote.Tests/AsyncContext/BoundAction.cs b/src/core/Akka.Tests.Shared.Internals/AsyncContext/BoundAction.cs similarity index 100% rename from src/core/Akka.Remote.Tests/AsyncContext/BoundAction.cs rename to src/core/Akka.Tests.Shared.Internals/AsyncContext/BoundAction.cs diff --git a/src/core/Akka.Remote.Tests/AsyncContext/Disposables.LICENSE b/src/core/Akka.Tests.Shared.Internals/AsyncContext/Disposables.LICENSE similarity index 100% rename from src/core/Akka.Remote.Tests/AsyncContext/Disposables.LICENSE rename to src/core/Akka.Tests.Shared.Internals/AsyncContext/Disposables.LICENSE diff --git a/src/core/Akka.Remote.Tests/AsyncContext/ExceptionHelpers.cs b/src/core/Akka.Tests.Shared.Internals/AsyncContext/ExceptionHelpers.cs similarity index 100% rename from src/core/Akka.Remote.Tests/AsyncContext/ExceptionHelpers.cs rename to src/core/Akka.Tests.Shared.Internals/AsyncContext/ExceptionHelpers.cs diff --git a/src/core/Akka.Remote.Tests/AsyncContext/SingleDisposable (of T).cs b/src/core/Akka.Tests.Shared.Internals/AsyncContext/SingleDisposable (of T).cs similarity index 100% rename from src/core/Akka.Remote.Tests/AsyncContext/SingleDisposable (of T).cs rename to src/core/Akka.Tests.Shared.Internals/AsyncContext/SingleDisposable (of T).cs diff --git a/src/core/Akka.Remote.Tests/AsyncContext/SynchronizationContextSwitcher.cs b/src/core/Akka.Tests.Shared.Internals/AsyncContext/SynchronizationContextSwitcher.cs similarity index 100% rename from src/core/Akka.Remote.Tests/AsyncContext/SynchronizationContextSwitcher.cs rename to src/core/Akka.Tests.Shared.Internals/AsyncContext/SynchronizationContextSwitcher.cs diff --git a/src/core/Akka.Remote.Tests/AsyncContext/SynchronousTaskExtensions.cs.cs b/src/core/Akka.Tests.Shared.Internals/AsyncContext/SynchronousTaskExtensions.cs.cs similarity index 100% rename from src/core/Akka.Remote.Tests/AsyncContext/SynchronousTaskExtensions.cs.cs rename to src/core/Akka.Tests.Shared.Internals/AsyncContext/SynchronousTaskExtensions.cs.cs diff --git a/src/core/Akka.Remote.Tests/AsyncContext/about.txt b/src/core/Akka.Tests.Shared.Internals/AsyncContext/about.txt similarity index 100% rename from src/core/Akka.Remote.Tests/AsyncContext/about.txt rename to src/core/Akka.Tests.Shared.Internals/AsyncContext/about.txt diff --git a/src/core/Akka.Tests/Actor/AskSpec.cs b/src/core/Akka.Tests/Actor/AskSpec.cs index b4972aeb594..81f7bd119cb 100644 --- a/src/core/Akka.Tests/Actor/AskSpec.cs +++ b/src/core/Akka.Tests/Actor/AskSpec.cs @@ -11,6 +11,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using Nito.AsyncEx; namespace Akka.Tests.Actor { @@ -39,9 +40,9 @@ public WaitActor(IActorRef replyActor, IActorRef testActor) _testActor = testActor; } - private IActorRef _replyActor; + private readonly IActorRef _replyActor; - private IActorRef _testActor; + private readonly IActorRef _testActor; protected override void OnReceive(object message) { @@ -132,6 +133,17 @@ public async Task ShouldFailWhenAskExpectsWrongType() await Assert.ThrowsAsync(async () => await actor.Ask("answer")); } + [Fact] + public void AskDoesNotDeadlockWhenWaitForResultInGuiApplication() + { + AsyncContext.Run(() => + { + var actor = Sys.ActorOf(); + var res = actor.Ask("answer").Result; // blocking on purpose + res.ShouldBe("answer"); + }); + } + private void Are_Temp_Actors_Removed(IActorRef actor) { var actorCell = actor as ActorRefWithCell; From 0b57e06fa4bc277781f67da5cb80edcf68c383fd Mon Sep 17 00:00:00 2001 From: Maxim Cherednik Date: Tue, 2 Jan 2018 21:33:36 +0100 Subject: [PATCH 5/8] Handle deadlock by removing the SynchronizationContext #3220 --- src/core/Akka/Actor/Futures.cs | 2 + .../Internal/SynchronizationContextManager.cs | 48 +++++++++++++++++++ 2 files changed, 50 insertions(+) create mode 100644 src/core/Akka/Util/Internal/SynchronizationContextManager.cs diff --git a/src/core/Akka/Actor/Futures.cs b/src/core/Akka/Actor/Futures.cs index 2dc2c17607b..63a87a4abea 100644 --- a/src/core/Akka/Actor/Futures.cs +++ b/src/core/Akka/Actor/Futures.cs @@ -102,6 +102,8 @@ public static Task Ask(this ICanTell self, object message, CancellationTok /// TBD public static async Task Ask(this ICanTell self, object message, TimeSpan? timeout, CancellationToken cancellationToken) { + await SynchronizationContextManager.RemoveContext; + IActorRefProvider provider = ResolveProvider(self); if (provider == null) throw new ArgumentException("Unable to resolve the target Provider", nameof(self)); diff --git a/src/core/Akka/Util/Internal/SynchronizationContextManager.cs b/src/core/Akka/Util/Internal/SynchronizationContextManager.cs new file mode 100644 index 00000000000..1ff16ec2276 --- /dev/null +++ b/src/core/Akka/Util/Internal/SynchronizationContextManager.cs @@ -0,0 +1,48 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Lightbend Inc. +// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using System; +using System.Runtime.CompilerServices; +using System.Threading; + +namespace Akka.Util.Internal +{ + public static class SynchronizationContextManager + { + public static ContextRemover RemoveContext { get; } = new ContextRemover(); + } + + public class ContextRemover : INotifyCompletion + { + public bool IsCompleted => SynchronizationContext.Current == null; + + public void OnCompleted(Action continuation) + { + var prevContext = SynchronizationContext.Current; + + try + { + SynchronizationContext.SetSynchronizationContext(null); + continuation(); + } + finally + { + SynchronizationContext.SetSynchronizationContext(prevContext); + } + } + + public ContextRemover GetAwaiter() + { + return this; + } + + public void GetResult() + { + + } + } +} From 054948a3acf9bbede654f20a9f77d4c810a7efd5 Mon Sep 17 00:00:00 2001 From: Maxim Cherednik Date: Wed, 3 Jan 2018 15:53:36 +0100 Subject: [PATCH 6/8] Fixing ScatterGather router test #3220 --- .../Routing/ScatterGatherFirstCompleted.cs | 39 ++++++++----------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/src/core/Akka/Routing/ScatterGatherFirstCompleted.cs b/src/core/Akka/Routing/ScatterGatherFirstCompleted.cs index 3a6becbf5a8..3140b86262d 100644 --- a/src/core/Akka/Routing/ScatterGatherFirstCompleted.cs +++ b/src/core/Akka/Routing/ScatterGatherFirstCompleted.cs @@ -44,7 +44,7 @@ public ScatterGatherFirstCompletedRoutingLogic(TimeSpan within) /// A that receives the . public override Routee Select(object message, Routee[] routees) { - return new ScatterGatherFirstCompletedRoutees(routees,_within); + return new ScatterGatherFirstCompletedRoutees(routees, _within); } } @@ -77,38 +77,31 @@ public ScatterGatherFirstCompletedRoutees(Routee[] routees, TimeSpan within) /// The actor sending the message. public override void Send(object message, IActorRef sender) { - var tcs = new TaskCompletionSource(); + SendMessage(message).PipeTo(sender); + } + private async Task SendMessage(object message) + { if (_routees.IsNullOrEmpty()) { - tcs.SetResult(new Status.Failure(new AskTimeoutException("Timeout due to no routees"))); + return new Status.Failure(new AskTimeoutException("Timeout due to no routees")); } - else + + try { + var tasks = _routees .Select(routee => routee.Ask(message, _within)) .ToList(); - Task - .WhenAny(tasks) - .ContinueWith(task => - { - if (task.Result.IsCanceled) - { - tcs.SetResult(new Status.Failure(new AskTimeoutException($"Timeout after {_within.TotalSeconds} seconds"))); - } - else if (task.Result.IsFaulted) - { - tcs.SetResult(new Status.Failure(task.Result.Exception)); - } - else - { - tcs.SetResult(task.Result.Result); - } - }); - } + var firstFinishedTask = await Task.WhenAny(tasks); - tcs.Task.PipeTo(sender); + return await firstFinishedTask; + } + catch (Exception e) + { + return new Status.Failure(e); + } } } From bae4ae43e0607c2ccf7d73def7817d138648fbba Mon Sep 17 00:00:00 2001 From: Maxim Cherednik Date: Mon, 8 Jan 2018 22:36:21 +0000 Subject: [PATCH 7/8] Ask interface refined #3220 AskSpecs consolidated Api change approval - internal CastTask removed --- .../CoreAPISpec.ApproveCore.approved.txt | 1 - src/core/Akka.Tests/Actor/AskSpec.cs | 30 +++++++ src/core/Akka.Tests/Actor/AskTimeoutSpec.cs | 82 ------------------- .../Internal/SynchronizationContextManager.cs | 28 ++++++- 4 files changed, 56 insertions(+), 85 deletions(-) delete mode 100644 src/core/Akka.Tests/Actor/AskTimeoutSpec.cs diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 4cd41324af6..1e883f9521a 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -4891,7 +4891,6 @@ namespace Akka.Util.Internal [Akka.Annotations.InternalApiAttribute()] public class static TaskExtensions { - public static System.Threading.Tasks.Task CastTask(this System.Threading.Tasks.Task task) { } public static System.Threading.Tasks.Task WithCancellation(this System.Threading.Tasks.Task task, System.Threading.CancellationToken cancellationToken) { } } } diff --git a/src/core/Akka.Tests/Actor/AskSpec.cs b/src/core/Akka.Tests/Actor/AskSpec.cs index 81f7bd119cb..d33eb1de642 100644 --- a/src/core/Akka.Tests/Actor/AskSpec.cs +++ b/src/core/Akka.Tests/Actor/AskSpec.cs @@ -17,6 +17,10 @@ namespace Akka.Tests.Actor { public class AskSpec : AkkaSpec { + public AskSpec() + : base(@"akka.actor.ask-timeout = 3000ms") + { } + public class SomeActor : UntypedActor { protected override void OnReceive(object message) @@ -25,6 +29,7 @@ protected override void OnReceive(object message) { Thread.Sleep(5000); } + if (message.Equals("answer")) { Sender.Tell("answer"); @@ -99,6 +104,21 @@ public async Task Can_cancel_when_asking_actor() } } + [Fact] + public async Task Ask_should_honor_config_specified_timeout() + { + var actor = Sys.ActorOf(); + try + { + await actor.Ask("timeout"); + Assert.True(false, "the ask should have timed out with default timeout"); + } + catch (AskTimeoutException e) + { + Assert.Equal("Timeout after 00:00:03 seconds", e.Message); + } + } + [Fact] public async Task Cancelled_ask_with_null_timeout_should_remove_temp_actor() { @@ -124,6 +144,16 @@ public async Task Cancelled_ask_with_timeout_should_remove_temp_actor() Are_Temp_Actors_Removed(actor); } + [Fact] + public async Task AskTimeout_with_default_timeout_should_remove_temp_actor() + { + var actor = Sys.ActorOf(); + + await Assert.ThrowsAsync(async () => await actor.Ask("timeout")); + + Are_Temp_Actors_Removed(actor); + } + [Fact] public async Task ShouldFailWhenAskExpectsWrongType() { diff --git a/src/core/Akka.Tests/Actor/AskTimeoutSpec.cs b/src/core/Akka.Tests/Actor/AskTimeoutSpec.cs deleted file mode 100644 index 7cdac3d1b27..00000000000 --- a/src/core/Akka.Tests/Actor/AskTimeoutSpec.cs +++ /dev/null @@ -1,82 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2016 Lightbend Inc. -// Copyright (C) 2013-2016 Akka.NET project -// -//----------------------------------------------------------------------- - -using System; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; - -using Akka.Actor; -using Akka.TestKit; - -using Xunit; - -namespace Akka.Tests.Actor -{ - public class AskTimeoutSpec : AkkaSpec - { - - public class SleepyActor : UntypedActor - { - - protected override void OnReceive(object message) - { - Thread.Sleep(5000); - Sender.Tell(message); - } - - } - - public AskTimeoutSpec() - : base(@"akka.actor.ask-timeout = 100ms") - {} - - [Fact] - public async Task Ask_should_honor_config_specified_timeout() - { - var actor = Sys.ActorOf(); - try - { - await actor.Ask("should time out"); - Assert.True(false, "the ask should have timed out"); - } - catch (Exception e) - { - Assert.True(e is TaskCanceledException); - } - } - - [Fact] - public async Task TimedOut_ask_should_remove_temp_actor() - { - var actor = Sys.ActorOf(); - - var actorCell = actor as ActorRefWithCell; - Assert.NotNull(actorCell); - - var container = actorCell.Provider.TempContainer as VirtualPathContainer; - Assert.NotNull(container); - try - { - await actor.Ask("should time out"); - } - catch (Exception) - { - // Need to spin here, since the continuation function may not execute immediately - AwaitAssert(() => - { - var childCounter = 0; - container.ForEachChild(x => childCounter++); - Assert.True(childCounter == 0, "Number of children in temp container should be 0."); - }); - - } - - } - - } -} diff --git a/src/core/Akka/Util/Internal/SynchronizationContextManager.cs b/src/core/Akka/Util/Internal/SynchronizationContextManager.cs index 1ff16ec2276..3c49cd63ca1 100644 --- a/src/core/Akka/Util/Internal/SynchronizationContextManager.cs +++ b/src/core/Akka/Util/Internal/SynchronizationContextManager.cs @@ -5,18 +5,42 @@ // //----------------------------------------------------------------------- +using Akka.Annotations; using System; using System.Runtime.CompilerServices; using System.Threading; namespace Akka.Util.Internal { - public static class SynchronizationContextManager + /// + /// SynchronizationContextManager controls SynchronizationContext of the async pipeline. + /// Does the same thing as .ConfigureAwait(false) but better - it should be written once only, + /// unlike .ConfigureAwait(false). + /// await SynchronizationContextManager.RemoveContext; + /// Should be used as a very first line inside async public API of the library code + /// + /// + /// This sample shows how to use . + /// + /// class CoolLib + /// { + /// public async Task DoSomething() + /// { + /// await SynchronizationContextManager.RemoveContext; + /// + /// await DoSomethingElse(); + /// } + /// } + /// + /// + [InternalApi] + internal static class SynchronizationContextManager { public static ContextRemover RemoveContext { get; } = new ContextRemover(); } - public class ContextRemover : INotifyCompletion + [InternalApi] + internal class ContextRemover : INotifyCompletion { public bool IsCompleted => SynchronizationContext.Current == null; From 243131abe42ca2c04fb847d5140c5e10c1c87930 Mon Sep 17 00:00:00 2001 From: Maxim Cherednik Date: Sat, 13 Jan 2018 22:18:48 +0100 Subject: [PATCH 8/8] Fixing header #3220 --- .../Akka/Util/Internal/SynchronizationContextManager.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/Akka/Util/Internal/SynchronizationContextManager.cs b/src/core/Akka/Util/Internal/SynchronizationContextManager.cs index 3c49cd63ca1..c58eeb9fcd4 100644 --- a/src/core/Akka/Util/Internal/SynchronizationContextManager.cs +++ b/src/core/Akka/Util/Internal/SynchronizationContextManager.cs @@ -1,5 +1,5 @@ -//----------------------------------------------------------------------- -// +//----------------------------------------------------------------------- +// // Copyright (C) 2009-2016 Lightbend Inc. // Copyright (C) 2013-2016 Akka.NET project // @@ -66,7 +66,7 @@ public ContextRemover GetAwaiter() public void GetResult() { - + // empty on purpose } } }