Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ask interface should be clean #3220

Merged
merged 12 commits into from
Jan 30, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -388,13 +388,13 @@ public static Task<T> AskEx<T>(this ICanTell self, Func<IActorRef, object> messa
return self.AskEx<T>(messageFactory, null, cancellationToken);
}

public static Task<T> AskEx<T>(this ICanTell self, Func<IActorRef, object> messageFactory, TimeSpan? timeout, CancellationToken cancellationToken)
public static async Task<T> AskEx<T>(this ICanTell self, Func<IActorRef, object> messageFactory, TimeSpan? timeout, CancellationToken cancellationToken)
{
IActorRefProvider provider = ResolveProvider(self);
if (provider == null)
throw new ArgumentException("Unable to resolve the target Provider", nameof(self));

return AskEx(self, messageFactory, provider, timeout, cancellationToken).CastTask<object, T>();
return (T)await AskEx(self, messageFactory, provider, timeout, cancellationToken);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this CastTask at all. All we need to do here is to take await and cast the value from object to T.

The rest will be managed by the .net itself. All the exceptions will be propagated as is.

}
internal static IActorRefProvider ResolveProvider(ICanTell self)
{
Expand All @@ -410,49 +410,60 @@ internal static IActorRefProvider ResolveProvider(ICanTell self)
return null;
}

private static Task<object> AskEx(ICanTell self, Func<IActorRef, object> messageFactory, IActorRefProvider provider, TimeSpan? timeout, CancellationToken cancellationToken)
private static async Task<object> AskEx(ICanTell self, Func<IActorRef, object> messageFactory, IActorRefProvider provider, TimeSpan? timeout, CancellationToken cancellationToken)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this method I did a few changes:

  1. Setting the right exception in case of Timeout
  2. With the help of the async/await - removing one level of indirection(unregister callback is gone) - this logic happens in the try/finaly
  3. Right order of disposing things in the finally block

{
var result = new TaskCompletionSource<object>();

CancellationTokenSource timeoutCancellation = null;
timeout = timeout ?? provider.Settings.AskTimeout;
List<CancellationTokenRegistration> ctrList = new List<CancellationTokenRegistration>(2);
var ctrList = new List<CancellationTokenRegistration>(2);

if (timeout != System.Threading.Timeout.InfiniteTimeSpan && timeout.Value > default(TimeSpan))
if (timeout != Timeout.InfiniteTimeSpan && timeout.Value > default(TimeSpan))
{
timeoutCancellation = new CancellationTokenSource();
ctrList.Add(timeoutCancellation.Token.Register(() => result.TrySetCanceled()));

ctrList.Add(timeoutCancellation.Token.Register(() =>
{
result.TrySetException(new AskTimeoutException($"Timeout after {timeout} seconds"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this much better. Makes more sense than what we'd been doing historically since the Task isn't actually being cancelled, as you point out. It's timing out. It's a bit of an API change but I think an improvement on the clarity of what's going on internally inside Ask.

}));

timeoutCancellation.CancelAfter(timeout.Value);
}

if (cancellationToken.CanBeCanceled)
{
ctrList.Add(cancellationToken.Register(() => result.TrySetCanceled()));
}

//create a new tempcontainer path
ActorPath path = provider.TempPath();
//callback to unregister from tempcontainer
Action unregister =
() =>
{
// cancelling timeout (if any) in order to prevent memory leaks
// (a reference to 'result' variable in CancellationToken's callback)
if (timeoutCancellation != null)
{
timeoutCancellation.Cancel();
timeoutCancellation.Dispose();
}
for (var i = 0; i < ctrList.Count; i++)
{
ctrList[i].Dispose();
}
provider.UnregisterTempActor(path);
};

var future = new FutureActorRef(result, unregister, path);
var future = new FutureActorRef(result, () => { }, path);
//The future actor needs to be registered in the temp container
provider.RegisterTempActor(future, path);

self.Tell(messageFactory(future), future);
return result.Task;

try
{
return await result.Task;
}
finally
{
//callback to unregister from tempcontainer

provider.UnregisterTempActor(path);

for (var i = 0; i < ctrList.Count; i++)
{
ctrList[i].Dispose();
}

if (timeoutCancellation != null)
{
timeoutCancellation.Dispose();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4892,7 +4892,6 @@ namespace Akka.Util.Internal
[Akka.Annotations.InternalApiAttribute()]
public class static TaskExtensions
{
public static System.Threading.Tasks.Task<TResult> CastTask<TTask, TResult>(this System.Threading.Tasks.Task<TTask> task) { }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was internal.

public static System.Threading.Tasks.Task WithCancellation(this System.Threading.Tasks.Task task, System.Threading.CancellationToken cancellationToken) { }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,7 @@ public async Task Should_Ask_Clustered_Group_Router_and_with_no_routees_and_time
var router = Sys.ActorOf(Props.Empty.WithRouter(FromConfig.Instance), "router3");
Assert.IsType<RoutedActorRef>(router);

try
{
var result = await router.Ask<string>("foo");
}
catch (Exception ex)
{
Assert.IsType<TaskCanceledException>(ex);
}
await Assert.ThrowsAsync<AskTimeoutException>(async () => await router.Ask<int>("foo"));
}
}
}
108 changes: 85 additions & 23 deletions src/core/Akka.Tests/Actor/AskSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@
using Akka.Actor;
using System;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;

namespace Akka.Tests.Actor
{

public class AskSpec : AkkaSpec
{
public AskSpec()
: base(@"akka.actor.ask-timeout = 3000ms")
{ }

public class SomeActor : UntypedActor
{
protected override void OnReceive(object message)
Expand All @@ -24,6 +29,7 @@ protected override void OnReceive(object message)
{
Thread.Sleep(5000);
}

if (message.Equals("answer"))
{
Sender.Tell("answer");
Expand All @@ -39,9 +45,9 @@ public WaitActor(IActorRef replyActor, IActorRef testActor)
_testActor = testActor;
}

private IActorRef _replyActor;
private readonly IActorRef _replyActor;

private IActorRef _testActor;
private readonly IActorRef _testActor;

protected override void OnReceive(object message)
{
Expand All @@ -66,52 +72,108 @@ protected override void OnReceive(object message)
}

[Fact]
public void Can_Ask_actor()
public async Task Can_Ask_actor()
{
var actor = Sys.ActorOf<SomeActor>();
actor.Ask<string>("answer").Result.ShouldBe("answer");
var res = await actor.Ask<string>("answer");
res.ShouldBe("answer");
}

[Fact]
public void Can_Ask_actor_with_timeout()
public async Task Can_Ask_actor_with_timeout()
{
var actor = Sys.ActorOf<SomeActor>();
actor.Ask<string>("answer",TimeSpan.FromSeconds(10)).Result.ShouldBe("answer");
var res = await actor.Ask<string>("answer", TimeSpan.FromSeconds(10));
res.ShouldBe("answer");
}

[Fact]
public void Can_get_timeout_when_asking_actor()
public async Task Can_get_timeout_when_asking_actor()
{
var actor = Sys.ActorOf<SomeActor>();
Assert.Throws<AggregateException>(() => { actor.Ask<string>("timeout", TimeSpan.FromSeconds(3)).Wait(); });
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here because we didn't use the async await syntax - we started to catch AggregateException and never actually checked the underneath type. And this probably caused us picking the wrong type of exception to throw(TaskCancelledException instead of AskTimeoutException)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aaronontheweb Look at the code above. It's not actually "to be modern", it's just to help not to hide things.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see

await Assert.ThrowsAsync<AskTimeoutException>(async () => await actor.Ask<string>("timeout", TimeSpan.FromSeconds(3)));
}

[Fact]
public void Can_cancel_when_asking_actor()
{
public async Task Can_cancel_when_asking_actor()
{
var actor = Sys.ActorOf<SomeActor>();
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
Assert.Throws<AggregateException>(() => { actor.Ask<string>("timeout", Timeout.InfiniteTimeSpan, cts.Token).Wait(); });
Assert.True(cts.IsCancellationRequested);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to test CancellationToken source here - it working just fine.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree

using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)))
{
await Assert.ThrowsAsync<TaskCanceledException>(async () => await actor.Ask<string>("timeout", Timeout.InfiniteTimeSpan, cts.Token));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No mess with the AggregateExceptions and unwrapping... clear well defined expectation!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good

}
}

[Fact]
public void Cancelled_ask_with_null_timeout_should_remove_temp_actor()
public async Task Ask_should_honor_config_specified_timeout()
{
var actor = Sys.ActorOf<SomeActor>();
var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
Assert.Throws<AggregateException>(() => { actor.Ask<string>("cancel", cts.Token).Wait(); });
Assert.True(cts.IsCancellationRequested);
try
{
await actor.Ask<string>("timeout");
Assert.True(false, "the ask should have timed out with default timeout");
}
catch (AskTimeoutException e)
{
Assert.Equal("Timeout after 00:00:03 seconds", e.Message);
}
}

[Fact]
public async Task Cancelled_ask_with_null_timeout_should_remove_temp_actor()
{
var actor = Sys.ActorOf<SomeActor>();

using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)))
{
await Assert.ThrowsAsync<TaskCanceledException>(async () => await actor.Ask<string>("cancel", cts.Token));
}

Are_Temp_Actors_Removed(actor);
}

[Fact]
public void Cancelled_ask_with_timeout_should_remove_temp_actor()
public async Task Cancelled_ask_with_timeout_should_remove_temp_actor()
{
var actor = Sys.ActorOf<SomeActor>();
var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
Assert.Throws<AggregateException>(() => { actor.Ask<string>("cancel", TimeSpan.FromSeconds(30), cts.Token).Wait(); });
Assert.True(cts.IsCancellationRequested);
using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)))
{
await Assert.ThrowsAsync<TaskCanceledException>(async () => await actor.Ask<string>("cancel", TimeSpan.FromSeconds(30), cts.Token));
}

Are_Temp_Actors_Removed(actor);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aaronontheweb @Horusiath What is this thing doing here?
We kinda testing Ask here. As far as I can see it should not affect an actor it's used upon.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internally in order to work Ask makes a lightweight temporary actor reference. One of the bugs in the past was that those temporary actors where not recycled after Ask task completed, causing memory leaks.

}

[Fact]
public async Task AskTimeout_with_default_timeout_should_remove_temp_actor()
{
var actor = Sys.ActorOf<SomeActor>();

await Assert.ThrowsAsync<AskTimeoutException>(async () => await actor.Ask<string>("timeout"));

Are_Temp_Actors_Removed(actor);
}

[Fact]
public async Task ShouldFailWhenAskExpectsWrongType()
{
var actor = Sys.ActorOf<SomeActor>();

// expect int, but in fact string
await Assert.ThrowsAsync<InvalidCastException>(async () => await actor.Ask<int>("answer"));
}

[Fact]
public void AskDoesNotDeadlockWhenWaitForResultInGuiApplication()
{
AsyncContext.Run(() =>
{
var actor = Sys.ActorOf<SomeActor>();
var res = actor.Ask<string>("answer").Result; // blocking on purpose
res.ShouldBe("answer");
});
}

private void Are_Temp_Actors_Removed(IActorRef actor)
{
var actorCell = actor as ActorRefWithCell;
Expand All @@ -126,7 +188,7 @@ private void Are_Temp_Actors_Removed(IActorRef actor)
container.ForEachChild(x => childCounter++);
Assert.True(childCounter == 0, "Temp actors not all removed.");
});

}

/// <summary>
Expand Down
82 changes: 0 additions & 82 deletions src/core/Akka.Tests/Actor/AskTimeoutSpec.cs

This file was deleted.

Loading