Skip to content

Commit

Permalink
ActorTaskScheduler QueueTask for LongRunning akkadotnet#1410
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb committed Nov 26, 2015
1 parent 873f52b commit 6032af4
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 1 deletion.
91 changes: 91 additions & 0 deletions src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,99 @@ protected override void PostRestart(Exception reason)
}
}

public class AsyncLongRunningActor : ReceiveActor
{
Task longRunning;
System.Threading.CancellationTokenSource cts;

public AsyncLongRunningActor()
{
Receive<string>(m =>
{
if(m.Equals("start"))
{
RunTask(() =>
{
cts = new System.Threading.CancellationTokenSource();
//long running task as side effect
longRunning = Task.Factory.StartNew(() =>
{
cts.Token.WaitHandle.WaitOne();
return "done";
}, TaskCreationOptions.LongRunning)
.PipeTo(Sender);
});
}
else if (m.Equals("stop"))
{
cts.Cancel();
}
});
}
}

public class AsyncContextSupportForTaskActor : ReceiveActor
{
public AsyncContextSupportForTaskActor()
{
Receive<string>(m =>
{
if (m.Equals("start"))
{
AsyncTask().PipeTo(Sender);
}
else if (m.Equals("run"))
{
RunTask(() => AsyncTask().PipeTo(Sender));
}
});
}

private async Task<String> AsyncTask()
{
try
{
var sender1 = Context.Sender;
await Task.Delay(1);
var sender2 = Context.Sender;
if (sender1 != sender2)
return "error";
}
catch (NotSupportedException)
{
return "notsupported";
}
catch(NullReferenceException)
{
return "notsupported";
}
return "done";
}
}

public class ActorAsyncAwaitSpec : AkkaSpec
{
[Fact]
public async Task Actors_should_not_be_able_to_use_context_in_async_task()
{
var actor = Sys.ActorOf(Props.Create<AsyncContextSupportForTaskActor>());
actor.Tell("start");
ExpectMsg("notsupported", TimeSpan.FromSeconds(5));

actor.Tell("run");
ExpectMsg("done", TimeSpan.FromSeconds(5));
}

[Fact]
public async Task Actors_should_be_able_to_start_longrunning_task()
{
var actor = Sys.ActorOf(Props.Create<AsyncLongRunningActor>());
var task = actor.Ask<string>("start", TimeSpan.FromSeconds(5));
actor.Tell("stop", ActorRefs.NoSender);
var res = await task;
Assert.Equal("done", res);
}

[Fact]
public async Task UntypedActors_should_be_able_to_async_await_ask_message_loop()
{
Expand Down
14 changes: 13 additions & 1 deletion src/core/Akka/Dispatch/ActorTaskScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ protected override void QueueTask(Task task)
return;
}

//detached task types
if(task.CreationOptions.HasFlag(TaskCreationOptions.LongRunning)
//|| task.CreationOptions.HasFlag(TaskCreationOptions.AttachedToParent)
)
{
var worker = new Task(() => {
TryExecuteTask(task);
});
worker.Start(TaskScheduler.Default);
return;
}

//we get here if the task needs to be marshalled back to the mailbox
//e.g. if previous task was an IO completion
s = CallContext.LogicalGetData(StateKey) as AmbientState;
Expand All @@ -62,7 +74,7 @@ protected override void QueueTask(Task task)
{
SetCurrentState(s.Self,s.Sender,s.Message);
TryExecuteTask(task);
if (task.IsFaulted)
if (task.IsFaulted || task.IsCanceled)
Rethrow(task, null);

}), ActorRefs.NoSender);
Expand Down

1 comment on commit 6032af4

@Aaronontheweb
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bah, shit, this set me and not @Zetanova as the author when I reset the timestamp so the build server could pick it up :(

Please sign in to comment.