Skip to content

Commit

Permalink
Merge pull request #1484 from JeffCyr/ActorTaskScheduler-refactoring
Browse files Browse the repository at this point in the history
ActorTaskScheduler refactoring with benchmark
  • Loading branch information
rogeralsing committed Dec 4, 2015
2 parents eb2067d + be0db4a commit e724d60
Show file tree
Hide file tree
Showing 9 changed files with 364 additions and 113 deletions.
48 changes: 48 additions & 0 deletions src/benchmark/PingPong/ClientAsyncActor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//-----------------------------------------------------------------------
// <copyright file="ClientReceiveActor.cs" company="Akka.NET Project">
// Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
// Copyright (C) 2013-2015 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;

#pragma warning disable 1998 //async method lacks an await

namespace PingPong
{
public class ClientAsyncActor : ReceiveActor
{
public ClientAsyncActor(IActorRef actor, long repeat, TaskCompletionSource<bool> latch)
{
var received = 0L;
var sent = 0L;
Receive<Messages.Msg>(async m =>
{
received++;
if (sent < repeat)
{
actor.Tell(m);
sent++;
}
else if (received >= repeat)
{
latch.SetResult(true);
}
});
Receive<Messages.Run>(r =>
{
var msg = new Messages.Msg();
for (int i = 0; i < Math.Min(1000, repeat); i++)
{
actor.Tell(msg);
sent++;
}
});
Receive<Messages.Started>(s => Sender.Tell(s));
}
}
}

1 change: 1 addition & 0 deletions src/benchmark/PingPong/PingPong.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
<Reference Include="System.Management" />
</ItemGroup>
<ItemGroup>
<Compile Include="ClientAsyncActor.cs" />
<Compile Include="ClientReceiveActor.cs" />
<Compile Include="ClientActorBase.cs" />
<Compile Include="Program.cs" />
Expand Down
49 changes: 44 additions & 5 deletions src/benchmark/PingPong/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
Expand Down Expand Up @@ -39,12 +40,19 @@ public static uint CpuSpeed()

private static void Main(params string[] args)
{
uint timesToRun = args.Length == 1 ? uint.Parse(args[0]) : 1u;
Start(timesToRun);
uint timesToRun;
if (args.Length == 0 || !uint.TryParse(args[0], out timesToRun))
{
timesToRun = 1u;
}

bool testAsync = args.Contains("--async");

Start(timesToRun, testAsync);
Console.ReadKey();
}

private static async void Start(uint timesToRun)
private static async void Start(uint timesToRun, bool testAsync)
{
const int repeatFactor = 500;
const long repeat = 30000L * repeatFactor;
Expand Down Expand Up @@ -77,16 +85,38 @@ private static async void Start(uint timesToRun)
Console.Write("ReceiveActor first start time: ");
await Benchmark<ClientReceiveActor>(1, 1, 1, PrintStats.StartTimeOnly, -1, -1);
Console.WriteLine(" ms");

if (testAsync)
{
Console.Write("AsyncActor first start time: ");
await Benchmark<ClientAsyncActor>(1, 1, 1, PrintStats.StartTimeOnly, -1, -1);
Console.WriteLine(" ms");
}

Console.WriteLine();

Console.Write(" ActorBase ReceiveActor");
if (testAsync)
{
Console.Write(" AsyncActor");
}
Console.WriteLine();

Console.Write("Throughput, Msgs/sec, Start [ms], Total [ms], Msgs/sec, Start [ms], Total [ms]");
if (testAsync)
{
Console.Write(", Msgs/sec, Start [ms], Total [ms]");
}
Console.WriteLine();

Console.WriteLine(" ActorBase ReceiveActor");
Console.WriteLine("Throughput, Msgs/sec, Start [ms], Total [ms], Msgs/sec, Start [ms], Total [ms]");
for(var i = 0; i < timesToRun; i++)
{
var redCountActorBase=0;
var redCountReceiveActor=0;
var redCountAsyncActor = 0;
var bestThroughputActorBase=0L;
var bestThroughputReceiveActor=0L;
var bestThroughputAsyncActor = 0L;
foreach(var throughput in GetThroughputSettings())
{
var result1 = await Benchmark<ClientActorBase>(throughput, processorCount, repeat, PrintStats.LineStart | PrintStats.Stats, bestThroughputActorBase, redCountActorBase);
Expand All @@ -96,6 +126,15 @@ private static async void Start(uint timesToRun)
var result2 = await Benchmark<ClientReceiveActor>(throughput, processorCount, repeat, PrintStats.Stats, bestThroughputReceiveActor, redCountReceiveActor);
bestThroughputReceiveActor = result2.Item2;
redCountReceiveActor = result2.Item3;

if (testAsync)
{
Console.Write(", ");
var result3 = await Benchmark<ClientAsyncActor>(throughput, processorCount, repeat, PrintStats.Stats, bestThroughputAsyncActor, redCountAsyncActor);
bestThroughputAsyncActor = result3.Item2;
redCountAsyncActor = result3.Item3;
}

Console.WriteLine();
}
}
Expand Down
157 changes: 146 additions & 11 deletions src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//-----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.TestKit;
Expand All @@ -25,7 +26,7 @@ public ReceiveTimeoutAsyncActor()
Receive<string>(async s =>
{
_replyTo = Sender;

await Task.Delay(TimeSpan.FromMilliseconds(100));
SetReceiveTimeout(TimeSpan.FromMilliseconds(100));
});
Expand All @@ -35,7 +36,7 @@ class AsyncActor : ReceiveActor
{
public AsyncActor()
{
Receive<string>( async s =>
Receive<string>(async s =>
{
await Task.Yield();
await Task.Delay(TimeSpan.FromMilliseconds(100));
Expand Down Expand Up @@ -210,9 +211,9 @@ public AsyncTplActor()
Receive<string>(m =>
{
//this is also safe, all tasks complete in the actor context
RunTask(() =>
RunTask(async () =>
{
Task.Delay(TimeSpan.FromSeconds(1))
await Task.Delay(TimeSpan.FromSeconds(1))
.ContinueWith(t => { Sender.Tell("done"); });
});
});
Expand All @@ -228,11 +229,11 @@ public AsyncTplExceptionActor(IActorRef callback)
_callback = callback;
Receive<string>(m =>
{
RunTask(() =>
RunTask(async () =>
{
Task.Delay(TimeSpan.FromSeconds(1))
await Task.Delay(TimeSpan.FromSeconds(1))
.ContinueWith(t => { throw new Exception("foo"); });
});
});
});
}

Expand All @@ -243,6 +244,16 @@ protected override void PostRestart(Exception reason)
}
}

public class RestartMessage
{
public object Message { get; private set; }

public RestartMessage(object message)
{
Message = message;
}
}

public class ActorAsyncAwaitSpec : AkkaSpec
{
[Fact]
Expand Down Expand Up @@ -280,8 +291,8 @@ public async Task Actors_should_be_able_to_async_await_ask_message_loop()
[Fact]
public async Task Actors_should_be_able_to_block_ask_message_loop()
{
var actor = Sys.ActorOf(Props.Create<AsyncAwaitActor>().WithDispatcher("akka.actor.task-dispatcher"),"Worker");
var asker =Sys.ActorOf(Props.Create(() => new BlockingAsker(actor)).WithDispatcher("akka.actor.task-dispatcher"),"Asker");
var actor = Sys.ActorOf(Props.Create<AsyncAwaitActor>().WithDispatcher("akka.actor.task-dispatcher"), "Worker");
var asker = Sys.ActorOf(Props.Create(() => new BlockingAsker(actor)).WithDispatcher("akka.actor.task-dispatcher"), "Asker");
var task = asker.Ask("start", TimeSpan.FromSeconds(5));
actor.Tell(123, ActorRefs.NoSender);
var res = await task;
Expand All @@ -291,7 +302,7 @@ public async Task Actors_should_be_able_to_block_ask_message_loop()
[Fact(Skip = "Maybe not possible to solve")]
public async Task Actors_should_be_able_to_block_ask_self_message_loop()
{
var asker = Sys.ActorOf(Props.Create(() => new BlockingAskSelf()),"Asker");
var asker = Sys.ActorOf(Props.Create(() => new BlockingAskSelf()), "Asker");
var task = asker.Ask("start", TimeSpan.FromSeconds(5));
var res = await task;
Assert.Equal("done", res);
Expand Down Expand Up @@ -344,7 +355,6 @@ public async Task Actor_should_be_able_to_resume_suspend()
res.ShouldBe("done");
}


[Fact]
public void Actor_should_be_able_to_ReceiveTimeout_after_async_operation()
{
Expand All @@ -353,6 +363,131 @@ public void Actor_should_be_able_to_ReceiveTimeout_after_async_operation()
actor.Tell("hello");
ExpectMsg<string>(m => m == "GotIt");
}

public class AsyncExceptionCatcherActor : ReceiveActor
{
private string _lastMessage;

public AsyncExceptionCatcherActor()
{
Receive<string>(async m =>
{
_lastMessage = m;
try
{
// Throw an exception in the ActorTaskScheduler
await Task.Factory.StartNew(() =>
{
throw new Exception("should not restart");
});
}
catch (Exception)
{
}
});

Receive<int>(_ => Sender.Tell(_lastMessage, Self));
}
}

[Fact]
public async Task Actor_should_not_restart_if_exception_is_catched()
{
var actor = Sys.ActorOf<AsyncExceptionCatcherActor>();

actor.Tell("hello");

var lastMessage = await actor.Ask(123);

lastMessage.ShouldBe("hello");
}

public class AsyncFailingActor : ReceiveActor
{
public AsyncFailingActor()
{
Receive<string>(async m =>
{
ThrowException();
});
}

protected override void PreRestart(Exception reason, object message)
{
Sender.Tell(new RestartMessage(message), Self);

base.PreRestart(reason, message);
}

private static void ThrowException()
{
throw new Exception("foo");
}
}

[Fact]
public void Actor_PreRestart_should_give_the_failing_message()
{
var actor = Sys.ActorOf<AsyncFailingActor>();

actor.Tell("hello");

ExpectMsg<RestartMessage>(m => "hello".Equals(m.Message));
}

public class AsyncPipeToDelayActor : ReceiveActor
{
public AsyncPipeToDelayActor()
{
Receive<string>(async msg =>
{
Task.Run(() =>
{
Thread.Sleep(10);
return msg;
}).PipeTo(Sender, Self); //LogicalContext is lost?!?

Thread.Sleep(3000);
});
}
}

public class AsyncReentrantActor : ReceiveActor
{
public AsyncReentrantActor()
{
Receive<string>(async msg =>
{
var sender = Sender;
Task.Run(() =>
{
//Sleep to make sure the task is not completed when ContinueWith is called
Thread.Sleep(100);
return msg;
}).ContinueWith(_ => sender.Tell(msg)); // ContinueWith will schedule with the implicit ActorTaskScheduler

Thread.Sleep(3000);
});
}
}

[Fact]
public void ActorTaskScheduler_reentrancy_should_not_be_possible()
{
var actor = Sys.ActorOf<AsyncReentrantActor>();
actor.Tell("hello");

ExpectNoMsg(1000);
}

[Fact]
public void Actor_PipeTo_should_not_be_delayed_by_async_receive()
{
var actor = Sys.ActorOf<AsyncPipeToDelayActor>();

actor.Tell("hello");
ExpectMsg<string>(m => "hello".Equals(m), TimeSpan.FromMilliseconds(1000));
}
}
}

Loading

0 comments on commit e724d60

Please sign in to comment.