diff --git a/src/benchmark/PingPong/ClientAsyncActor.cs b/src/benchmark/PingPong/ClientAsyncActor.cs new file mode 100644 index 00000000000..8f1d2be0290 --- /dev/null +++ b/src/benchmark/PingPong/ClientAsyncActor.cs @@ -0,0 +1,48 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2015 Typesafe Inc. +// Copyright (C) 2013-2015 Akka.NET project +// +//----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using Akka.Actor; + +#pragma warning disable 1998 //async method lacks an await + +namespace PingPong +{ + public class ClientAsyncActor : ReceiveActor + { + public ClientAsyncActor(IActorRef actor, long repeat, TaskCompletionSource latch) + { + var received = 0L; + var sent = 0L; + Receive(async m => + { + received++; + if (sent < repeat) + { + actor.Tell(m); + sent++; + } + else if (received >= repeat) + { + latch.SetResult(true); + } + }); + Receive(r => + { + var msg = new Messages.Msg(); + for (int i = 0; i < Math.Min(1000, repeat); i++) + { + actor.Tell(msg); + sent++; + } + }); + Receive(s => Sender.Tell(s)); + } + } +} + diff --git a/src/benchmark/PingPong/PingPong.csproj b/src/benchmark/PingPong/PingPong.csproj index f8419541041..0dd29091916 100644 --- a/src/benchmark/PingPong/PingPong.csproj +++ b/src/benchmark/PingPong/PingPong.csproj @@ -64,6 +64,7 @@ + diff --git a/src/benchmark/PingPong/Program.cs b/src/benchmark/PingPong/Program.cs index 03ad57c0610..01c0d4b37f8 100644 --- a/src/benchmark/PingPong/Program.cs +++ b/src/benchmark/PingPong/Program.cs @@ -9,6 +9,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Globalization; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Akka.Actor; @@ -39,12 +40,19 @@ public static uint CpuSpeed() private static void Main(params string[] args) { - uint timesToRun = args.Length == 1 ? uint.Parse(args[0]) : 1u; - Start(timesToRun); + uint timesToRun; + if (args.Length == 0 || !uint.TryParse(args[0], out timesToRun)) + { + timesToRun = 1u; + } + + bool testAsync = args.Contains("--async"); + + Start(timesToRun, testAsync); Console.ReadKey(); } - private static async void Start(uint timesToRun) + private static async void Start(uint timesToRun, bool testAsync) { const int repeatFactor = 500; const long repeat = 30000L * repeatFactor; @@ -77,16 +85,38 @@ private static async void Start(uint timesToRun) Console.Write("ReceiveActor first start time: "); await Benchmark(1, 1, 1, PrintStats.StartTimeOnly, -1, -1); Console.WriteLine(" ms"); + + if (testAsync) + { + Console.Write("AsyncActor first start time: "); + await Benchmark(1, 1, 1, PrintStats.StartTimeOnly, -1, -1); + Console.WriteLine(" ms"); + } + + Console.WriteLine(); + + Console.Write(" ActorBase ReceiveActor"); + if (testAsync) + { + Console.Write(" AsyncActor"); + } + Console.WriteLine(); + + Console.Write("Throughput, Msgs/sec, Start [ms], Total [ms], Msgs/sec, Start [ms], Total [ms]"); + if (testAsync) + { + Console.Write(", Msgs/sec, Start [ms], Total [ms]"); + } Console.WriteLine(); - Console.WriteLine(" ActorBase ReceiveActor"); - Console.WriteLine("Throughput, Msgs/sec, Start [ms], Total [ms], Msgs/sec, Start [ms], Total [ms]"); for(var i = 0; i < timesToRun; i++) { var redCountActorBase=0; var redCountReceiveActor=0; + var redCountAsyncActor = 0; var bestThroughputActorBase=0L; var bestThroughputReceiveActor=0L; + var bestThroughputAsyncActor = 0L; foreach(var throughput in GetThroughputSettings()) { var result1 = await Benchmark(throughput, processorCount, repeat, PrintStats.LineStart | PrintStats.Stats, bestThroughputActorBase, redCountActorBase); @@ -96,6 +126,15 @@ private static async void Start(uint timesToRun) var result2 = await Benchmark(throughput, processorCount, repeat, PrintStats.Stats, bestThroughputReceiveActor, redCountReceiveActor); bestThroughputReceiveActor = result2.Item2; redCountReceiveActor = result2.Item3; + + if (testAsync) + { + Console.Write(", "); + var result3 = await Benchmark(throughput, processorCount, repeat, PrintStats.Stats, bestThroughputAsyncActor, redCountAsyncActor); + bestThroughputAsyncActor = result3.Item2; + redCountAsyncActor = result3.Item3; + } + Console.WriteLine(); } } diff --git a/src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs b/src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs index 53d49d538ed..7706e9e03cb 100644 --- a/src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs +++ b/src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs @@ -6,6 +6,7 @@ //----------------------------------------------------------------------- using System; +using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.TestKit; @@ -25,7 +26,7 @@ public ReceiveTimeoutAsyncActor() Receive(async s => { _replyTo = Sender; - + await Task.Delay(TimeSpan.FromMilliseconds(100)); SetReceiveTimeout(TimeSpan.FromMilliseconds(100)); }); @@ -35,7 +36,7 @@ class AsyncActor : ReceiveActor { public AsyncActor() { - Receive( async s => + Receive(async s => { await Task.Yield(); await Task.Delay(TimeSpan.FromMilliseconds(100)); @@ -210,9 +211,9 @@ public AsyncTplActor() Receive(m => { //this is also safe, all tasks complete in the actor context - RunTask(() => + RunTask(async () => { - Task.Delay(TimeSpan.FromSeconds(1)) + await Task.Delay(TimeSpan.FromSeconds(1)) .ContinueWith(t => { Sender.Tell("done"); }); }); }); @@ -228,11 +229,11 @@ public AsyncTplExceptionActor(IActorRef callback) _callback = callback; Receive(m => { - RunTask(() => + RunTask(async () => { - Task.Delay(TimeSpan.FromSeconds(1)) + await Task.Delay(TimeSpan.FromSeconds(1)) .ContinueWith(t => { throw new Exception("foo"); }); - }); + }); }); } @@ -243,6 +244,16 @@ protected override void PostRestart(Exception reason) } } + public class RestartMessage + { + public object Message { get; private set; } + + public RestartMessage(object message) + { + Message = message; + } + } + public class ActorAsyncAwaitSpec : AkkaSpec { [Fact] @@ -280,8 +291,8 @@ public async Task Actors_should_be_able_to_async_await_ask_message_loop() [Fact] public async Task Actors_should_be_able_to_block_ask_message_loop() { - var actor = Sys.ActorOf(Props.Create().WithDispatcher("akka.actor.task-dispatcher"),"Worker"); - var asker =Sys.ActorOf(Props.Create(() => new BlockingAsker(actor)).WithDispatcher("akka.actor.task-dispatcher"),"Asker"); + var actor = Sys.ActorOf(Props.Create().WithDispatcher("akka.actor.task-dispatcher"), "Worker"); + var asker = Sys.ActorOf(Props.Create(() => new BlockingAsker(actor)).WithDispatcher("akka.actor.task-dispatcher"), "Asker"); var task = asker.Ask("start", TimeSpan.FromSeconds(5)); actor.Tell(123, ActorRefs.NoSender); var res = await task; @@ -291,7 +302,7 @@ public async Task Actors_should_be_able_to_block_ask_message_loop() [Fact(Skip = "Maybe not possible to solve")] public async Task Actors_should_be_able_to_block_ask_self_message_loop() { - var asker = Sys.ActorOf(Props.Create(() => new BlockingAskSelf()),"Asker"); + var asker = Sys.ActorOf(Props.Create(() => new BlockingAskSelf()), "Asker"); var task = asker.Ask("start", TimeSpan.FromSeconds(5)); var res = await task; Assert.Equal("done", res); @@ -344,7 +355,6 @@ public async Task Actor_should_be_able_to_resume_suspend() res.ShouldBe("done"); } - [Fact] public void Actor_should_be_able_to_ReceiveTimeout_after_async_operation() { @@ -353,6 +363,131 @@ public void Actor_should_be_able_to_ReceiveTimeout_after_async_operation() actor.Tell("hello"); ExpectMsg(m => m == "GotIt"); } + + public class AsyncExceptionCatcherActor : ReceiveActor + { + private string _lastMessage; + + public AsyncExceptionCatcherActor() + { + Receive(async m => + { + _lastMessage = m; + try + { + // Throw an exception in the ActorTaskScheduler + await Task.Factory.StartNew(() => + { + throw new Exception("should not restart"); + }); + } + catch (Exception) + { + } + }); + + Receive(_ => Sender.Tell(_lastMessage, Self)); + } + } + + [Fact] + public async Task Actor_should_not_restart_if_exception_is_catched() + { + var actor = Sys.ActorOf(); + + actor.Tell("hello"); + + var lastMessage = await actor.Ask(123); + + lastMessage.ShouldBe("hello"); + } + + public class AsyncFailingActor : ReceiveActor + { + public AsyncFailingActor() + { + Receive(async m => + { + ThrowException(); + }); + } + + protected override void PreRestart(Exception reason, object message) + { + Sender.Tell(new RestartMessage(message), Self); + + base.PreRestart(reason, message); + } + + private static void ThrowException() + { + throw new Exception("foo"); + } + } + + [Fact] + public void Actor_PreRestart_should_give_the_failing_message() + { + var actor = Sys.ActorOf(); + + actor.Tell("hello"); + + ExpectMsg(m => "hello".Equals(m.Message)); + } + + public class AsyncPipeToDelayActor : ReceiveActor + { + public AsyncPipeToDelayActor() + { + Receive(async msg => + { + Task.Run(() => + { + Thread.Sleep(10); + return msg; + }).PipeTo(Sender, Self); //LogicalContext is lost?!? + + Thread.Sleep(3000); + }); + } + } + + public class AsyncReentrantActor : ReceiveActor + { + public AsyncReentrantActor() + { + Receive(async msg => + { + var sender = Sender; + Task.Run(() => + { + //Sleep to make sure the task is not completed when ContinueWith is called + Thread.Sleep(100); + return msg; + }).ContinueWith(_ => sender.Tell(msg)); // ContinueWith will schedule with the implicit ActorTaskScheduler + + Thread.Sleep(3000); + }); + } + } + + [Fact] + public void ActorTaskScheduler_reentrancy_should_not_be_possible() + { + var actor = Sys.ActorOf(); + actor.Tell("hello"); + + ExpectNoMsg(1000); + } + + [Fact] + public void Actor_PipeTo_should_not_be_delayed_by_async_receive() + { + var actor = Sys.ActorOf(); + + actor.Tell("hello"); + ExpectMsg(m => "hello".Equals(m), TimeSpan.FromMilliseconds(1000)); + } } } diff --git a/src/core/Akka/Actor/ActorCell.DefaultMessages.cs b/src/core/Akka/Actor/ActorCell.DefaultMessages.cs index dc56e895449..50c1a4f12dd 100644 --- a/src/core/Akka/Actor/ActorCell.DefaultMessages.cs +++ b/src/core/Akka/Actor/ActorCell.DefaultMessages.cs @@ -165,7 +165,7 @@ public void SystemInvoke(Envelope envelope) { var m = envelope.Message; - if (m is CompleteTask) HandleCompleteTask(m as CompleteTask); + if (m is ActorTaskSchedulerMessage) HandleActorTaskSchedulerMessage(m as ActorTaskSchedulerMessage); else if (m is Failed) HandleFailed(m as Failed); else if (m is DeathWatchNotification) { @@ -203,12 +203,17 @@ public void SystemInvoke(Envelope envelope) } } - private void HandleCompleteTask(CompleteTask task) + private void HandleActorTaskSchedulerMessage(ActorTaskSchedulerMessage m) { - CurrentMessage = task.State.Message; - Sender = task.State.Sender; - task.SetResult(); + if (m.Exception != null) + { + HandleInvokeFailure(m.Exception); + return; + } + + m.ExecuteTask(); } + public void SwapMailbox(DeadLetterMailbox mailbox) { Mailbox.DebugPrint("{0} Swapping mailbox to DeadLetterMailbox", Self); diff --git a/src/core/Akka/Actor/ActorCell.cs b/src/core/Akka/Actor/ActorCell.cs index 40dfdfdc9f3..acbd4cbdc40 100644 --- a/src/core/Akka/Actor/ActorCell.cs +++ b/src/core/Akka/Actor/ActorCell.cs @@ -30,6 +30,7 @@ public partial class ActorCell : IUntypedActorContext, ICell private bool _actorHasBeenCleared; private Mailbox _mailbox; private readonly ActorSystemImpl _systemImpl; + private ActorTaskScheduler _taskScheduler; public ActorCell(ActorSystemImpl system, IInternalActorRef self, Props props, MessageDispatcher dispatcher, IInternalActorRef parent) @@ -66,6 +67,20 @@ internal static ActorCell Current internal bool ActorHasBeenCleared { get { return _actorHasBeenCleared; } } internal static Props TerminatedProps { get { return terminatedProps; } } + public ActorTaskScheduler TaskScheduler + { + get + { + var taskScheduler = Volatile.Read(ref _taskScheduler); + + if (taskScheduler != null) + return taskScheduler; + + taskScheduler = new ActorTaskScheduler(this); + return Interlocked.CompareExchange(ref _taskScheduler, taskScheduler, null) ?? taskScheduler; + } + } + public void Init(bool sendSupervise, Func createMailbox /*, MailboxType mailboxType*/) //TODO: switch from Func createMailbox to MailboxType mailboxType { var mailbox = createMailbox(); //Akka: dispatcher.createMailbox(this, mailboxType) diff --git a/src/core/Akka/Actor/PipeToSupport.cs b/src/core/Akka/Actor/PipeToSupport.cs index ea1eb5cd79f..19ac16ed247 100644 --- a/src/core/Akka/Actor/PipeToSupport.cs +++ b/src/core/Akka/Actor/PipeToSupport.cs @@ -6,6 +6,7 @@ //----------------------------------------------------------------------- using System; +using System.Threading; using System.Threading.Tasks; namespace Akka.Actor @@ -33,7 +34,7 @@ public static Task PipeTo(this Task taskToPipe, ICanTell recipient, IActor recipient.Tell(success != null ? success(tresult.Result) : tresult.Result, sender); - }, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.AttachedToParent); + }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } /// @@ -47,7 +48,7 @@ public static Task PipeTo(this Task taskToPipe, ICanTell recipient, IActorRef se { if (tresult.IsCanceled || tresult.IsFaulted) recipient.Tell(new Status.Failure(tresult.Exception), sender); - }, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.AttachedToParent); + }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } } } diff --git a/src/core/Akka/Dispatch/ActorTaskScheduler.cs b/src/core/Akka/Dispatch/ActorTaskScheduler.cs index 550d664c496..e53c898c85e 100644 --- a/src/core/Akka/Dispatch/ActorTaskScheduler.cs +++ b/src/core/Akka/Dispatch/ActorTaskScheduler.cs @@ -7,7 +7,6 @@ using System; using System.Collections.Generic; -using System.Runtime.Remoting.Messaging; using System.Threading; using System.Threading.Tasks; using Akka.Actor; @@ -15,29 +14,18 @@ namespace Akka.Dispatch { - public class AmbientState - { - public IActorRef Self { get; set; } - public IActorRef Sender { get; set; } - public object Message { get; set; } - } - public class ActorTaskScheduler : TaskScheduler { - public static readonly TaskScheduler Instance = new ActorTaskScheduler(); - public static readonly TaskFactory TaskFactory = new TaskFactory(Instance); - public static readonly string StateKey = "akka.state"; - private const string Faulted = "faulted"; - private static readonly object Outer = new object(); + private readonly ActorCell _actorCell; - public static void SetCurrentState(IActorRef self, IActorRef sender, object message) + internal ActorTaskScheduler(ActorCell actorCell) { - CallContext.LogicalSetData(StateKey, new AmbientState - { - Sender = sender, - Self = self, - Message = message - }); + _actorCell = actorCell; + } + + public override int MaximumConcurrencyLevel + { + get { return 1; } } protected override IEnumerable GetScheduledTasks() @@ -47,46 +35,41 @@ protected override IEnumerable GetScheduledTasks() protected override void QueueTask(Task task) { - var s = CallContext.LogicalGetData(StateKey) as AmbientState; - if (task.AsyncState == Outer || s == null) + if ((task.CreationOptions & TaskCreationOptions.LongRunning) == TaskCreationOptions.LongRunning) { - TryExecuteTask(task); + // Executing a LongRunning task in an ActorTaskScheduler is bad practice, it will potentially + // hang the actor and starve the ThreadPool + + // The best thing we can do here is force a rescheduling to at least not execute the task inline. + ScheduleTask(task); return; } - //we get here if the task needs to be marshalled back to the mailbox - //e.g. if previous task was an IO completion - s = CallContext.LogicalGetData(StateKey) as AmbientState; - - s.Self.Tell(new CompleteTask(s, () => + // Schedule the task execution, run inline if we are already in the actor context. + if (ActorCell.Current == _actorCell) { - SetCurrentState(s.Self,s.Sender,s.Message); TryExecuteTask(task); - if (task.IsFaulted) - Rethrow(task, null); + } + else + { + ScheduleTask(task); + } + } + + private void ScheduleTask(Task task) + { + _actorCell.Self.Tell(new ActorTaskSchedulerMessage(this, task), ActorRefs.NoSender); + } - }), ActorRefs.NoSender); + internal void ExecuteTask(Task task) + { + TryExecuteTask(task); } protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { - if (taskWasPreviouslyQueued) - return false; - - var s = CallContext.LogicalGetData(StateKey) as AmbientState; - var cell = ActorCell.Current; - - //Is the current cell and the current state the same? - if (cell != null && - s != null && - Equals(cell.Self, s.Self) && - Equals(cell.Sender, s.Sender) && - cell.CurrentMessage == s.Message) - { - var res = TryExecuteTask(task); - return res; - } - + // Prevent inline execution, it will execute inline anyway in QueueTask if we + // are already in the actor context. return false; } @@ -99,47 +82,62 @@ public static void RunTask(Action action) }); } - public static void RunTask(Func action) + public static void RunTask(Func asyncAction) { var context = ActorCell.Current; + + if (context == null) + throw new InvalidOperationException("RunTask must be call from an actor context."); + var mailbox = context.Mailbox; //suspend the mailbox mailbox.Suspend(MailboxSuspendStatus.AwaitingTask); - SetCurrentState(context.Self, context.Sender, null); - - //wrap our action inside a task, so that everything executing - //directly or indirectly from the action is executed on our task scheduler + ActorTaskScheduler actorScheduler = context.TaskScheduler; + + Task.Factory.StartNew(() => asyncAction(), CancellationToken.None, TaskCreationOptions.None, actorScheduler) + .Unwrap() + .ContinueWith(parent => + { + Exception exception = GetTaskException(parent); + + if (exception == null) + { + mailbox.Resume(MailboxSuspendStatus.AwaitingTask); + context.CheckReceiveTimeout(); + } + else + { + context.Self.Tell(new ActorTaskSchedulerMessage(exception), ActorRefs.NoSender); + } + + }, actorScheduler); + } - Task.Factory.StartNew(async _ => + private static Exception GetTaskException(Task task) + { + switch (task.Status) { + case TaskStatus.Canceled: + return new TaskCanceledException(); + + case TaskStatus.Faulted: + return TryUnwrapAggregateException(task.Exception); + } - //start executing our action and potential promise style - //tasks - await action() - //we need to use ContinueWith so that any exception is - //thrown inside the actor context. - //this is needed for IO completion tasks that execute out of context - .ContinueWith( - Rethrow, - Faulted, - TaskContinuationOptions.None); - - //if mailbox was suspended, make sure we re-enable message processing again - mailbox.Resume(MailboxSuspendStatus.AwaitingTask); - context.CheckReceiveTimeout(); - }, - Outer, - CancellationToken.None, - TaskCreationOptions.None, - Instance); + return null; } - private static void Rethrow(Task x, object s) + private static Exception TryUnwrapAggregateException(AggregateException aggregateException) { - //this just rethrows the exception the task contains - x.Wait(); + if (aggregateException == null) + return null; + + if (aggregateException.InnerExceptions.Count == 1) + return aggregateException.InnerExceptions[0]; + + return aggregateException; } } } diff --git a/src/core/Akka/Dispatch/SysMsg/ISystemMessage.cs b/src/core/Akka/Dispatch/SysMsg/ISystemMessage.cs index b0b112b8395..0d6c66b80a5 100644 --- a/src/core/Akka/Dispatch/SysMsg/ISystemMessage.cs +++ b/src/core/Akka/Dispatch/SysMsg/ISystemMessage.cs @@ -243,30 +243,39 @@ public ActorTask(Task task) public Task Task { get; private set; } } - public sealed class CompleteTask : ISystemMessage + internal sealed class ActorTaskSchedulerMessage : ISystemMessage { + private readonly ActorTaskScheduler _scheduler; + private readonly Task _task; + /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// - /// - /// The action. - public CompleteTask(AmbientState state, Action action) + public ActorTaskSchedulerMessage(ActorTaskScheduler scheduler, Task task) { - State = state; - SetResult = action; + _scheduler = scheduler; + _task = task; } - public AmbientState State { get; private set; } - /// - /// Gets the set result. + /// Initializes a new instance of the class. /// - /// The set result. - public Action SetResult { get; private set; } + /// The exception. + public ActorTaskSchedulerMessage(Exception exception) + { + Exception = exception; + } + + public Exception Exception { get; private set; } + + public void ExecuteTask() + { + _scheduler.ExecuteTask(_task); + } public override string ToString() { - return "CompleteTask - AmbientState: " + State; + return ""; } }