Skip to content

Commit

Permalink
Revert "Ask should push unhandled answers into deadletter (#5221)" (#…
Browse files Browse the repository at this point in the history
…5254)

This reverts commit cac54cb.
  • Loading branch information
Aaronontheweb authored Sep 6, 2021
1 parent f09a227 commit be057ab
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -414,31 +414,29 @@ private static async Task<object> AskEx(ICanTell self, Func<IActorRef, object> m

CancellationTokenSource timeoutCancellation = null;
timeout = timeout ?? provider.Settings.AskTimeout;

CancellationTokenRegistration? ctr1 = null;
CancellationTokenRegistration? ctr2 = null;
var ctrList = new List<CancellationTokenRegistration>(2);

if (timeout != Timeout.InfiniteTimeSpan && timeout.Value > default(TimeSpan))
{
timeoutCancellation = new CancellationTokenSource();

ctr1 = timeoutCancellation.Token.Register(() =>
ctrList.Add(timeoutCancellation.Token.Register(() =>
{
result.TrySetException(new AskTimeoutException($"Timeout after {timeout} seconds"));
});
}));

timeoutCancellation.CancelAfter(timeout.Value);
}

if (cancellationToken.CanBeCanceled)
{
ctr2 = cancellationToken.Register(() => result.TrySetCanceled());
ctrList.Add(cancellationToken.Register(() => result.TrySetCanceled()));
}

//create a new tempcontainer path
var path = provider.TempPath();
ActorPath path = provider.TempPath();

var future = new FutureActorRef<object>(result, path, provider);
var future = new FutureActorRef<object>(result, t => { }, path);
//The future actor needs to be registered in the temp container
provider.RegisterTempActor(future, path);

Expand All @@ -454,9 +452,15 @@ private static async Task<object> AskEx(ICanTell self, Func<IActorRef, object> m

provider.UnregisterTempActor(path);

ctr1?.Dispose();
ctr2?.Dispose();
timeoutCancellation?.Dispose();
for (var i = 0; i < ctrList.Count; i++)
{
ctrList[i].Dispose();
}

if (timeoutCancellation != null)
{
timeoutCancellation.Dispose();
}
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -862,11 +862,12 @@ namespace Akka.Actor
public Failures() { }
public System.Collections.Generic.List<Akka.Actor.Failure> Entries { get; }
}
public sealed class FutureActorRef<T> : Akka.Actor.MinimalActorRef
public class FutureActorRef<T> : Akka.Actor.MinimalActorRef
{
public FutureActorRef(System.Threading.Tasks.TaskCompletionSource<T> result, Akka.Actor.ActorPath path, Akka.Actor.IActorRefProvider provider) { }
public FutureActorRef(System.Threading.Tasks.TaskCompletionSource<T> result, System.Action<System.Threading.Tasks.Task> unregister, Akka.Actor.ActorPath path) { }
public override Akka.Actor.ActorPath Path { get; }
public override Akka.Actor.IActorRefProvider Provider { get; }
public override void SendSystemMessage(Akka.Dispatch.SysMsg.ISystemMessage message) { }
protected override void TellInternal(object message, Akka.Actor.IActorRef sender) { }
}
public class static Futures
Expand Down
64 changes: 0 additions & 64 deletions src/core/Akka.Tests/Actor/AskSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,6 @@ protected override void OnReceive(object message)
{
Sender.Tell("answer");
}

if (message.Equals("delay"))
{
Thread.Sleep(3000);
Sender.Tell("answer");
}

if (message.Equals("many"))
{
Sender.Tell("answer1");
Sender.Tell("answer2");
Sender.Tell("answer2");
}

if (message.Equals("invalid"))
{
Sender.Tell(123);
}
}
}

Expand Down Expand Up @@ -132,52 +114,6 @@ public async Task Can_get_timeout_when_asking_actor()
await Assert.ThrowsAsync<AskTimeoutException>(async () => await actor.Ask<string>("timeout", TimeSpan.FromSeconds(3)));
}

[Fact]
public async Task Ask_should_put_timeout_answer_into_deadletter()
{
var actor = Sys.ActorOf<SomeActor>();

await EventFilter.DeadLetter<object>().ExpectOneAsync(TimeSpan.FromSeconds(5), async () =>
{
await Assert.ThrowsAsync<AskTimeoutException>(async () => await actor.Ask<string>("delay", TimeSpan.FromSeconds(1)));
});
}

[Fact]
public async Task Ask_should_put_too_many_answers_into_deadletter()
{
var actor = Sys.ActorOf<SomeActor>();

await EventFilter.DeadLetter<object>().ExpectAsync(2, async () =>
{
var result = await actor.Ask<string>("many", TimeSpan.FromSeconds(1));
result.ShouldBe("answer1");
});
}

[Fact]
public async Task Ask_should_not_put_canceled_answer_into_deadletter()
{
var actor = Sys.ActorOf<SomeActor>();

await EventFilter.DeadLetter<object>().ExpectAsync(0, async () =>
{
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)))
await Assert.ThrowsAsync<TaskCanceledException>(async () => await actor.Ask<string>("delay", Timeout.InfiniteTimeSpan, cts.Token));
});
}

[Fact]
public async Task Ask_should_put_invalid_answer_into_deadletter()
{
var actor = Sys.ActorOf<SomeActor>();

await EventFilter.DeadLetter<object>().ExpectOne(async () =>
{
await Assert.ThrowsAsync<ArgumentException>(async () => await actor.Ask<string>("invalid", TimeSpan.FromSeconds(1)));
});
}

[Fact]
public async Task Can_cancel_when_asking_actor()
{
Expand Down
113 changes: 68 additions & 45 deletions src/core/Akka/Actor/ActorRef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,34 +69,51 @@ public interface IRepointableRef : IActorRefScope
///
/// ActorRef implementation used for one-off tasks.
/// </summary>
public sealed class FutureActorRef<T> : MinimalActorRef
public class FutureActorRef<T> : MinimalActorRef
{
private readonly TaskCompletionSource<T> _result;
private readonly ActorPath _path;
private readonly IActorRefProvider _provider;

/// <summary>
/// INTERNAL API
/// </summary>
/// <param name="result">TBD</param>
/// <param name="unregister">TBD</param>
/// <param name="path">TBD</param>
/// <param name="provider">TBD</param>
public FutureActorRef(TaskCompletionSource<T> result, ActorPath path, IActorRefProvider provider)
public FutureActorRef(TaskCompletionSource<T> result, Action<Task> unregister, ActorPath path)
{
if (ActorCell.Current != null)
{
_actorAwaitingResultSender = ActorCell.Current.Sender;
}
_result = result;
_path = path;
_provider = provider;

_result.Task.ContinueWith(unregister);
}

/// <summary>
/// TBD
/// </summary>
public override ActorPath Path => _path;
public override ActorPath Path
{
get { return _path; }
}

/// <summary>
/// TBD
/// </summary>
public override IActorRefProvider Provider => _provider;
/// <exception cref="System.NotImplementedException">TBD</exception>
public override IActorRefProvider Provider
{
get { throw new NotImplementedException(); }
}


private const int INITIATED = 0;
private const int COMPLETED = 1;
private int status = INITIATED;
private readonly IActorRef _actorAwaitingResultSender;

/// <summary>
/// TBD
Expand All @@ -105,37 +122,43 @@ public FutureActorRef(TaskCompletionSource<T> result, ActorPath path, IActorRefP
/// <param name="sender">TBD</param>
protected override void TellInternal(object message, IActorRef sender)
{
var handled = false;

switch (message)
if (message is ISystemMessage sysM) //we have special handling for system messages
{
case ISystemMessage sysM:
SendSystemMessage(sysM); //we have special handling for system messages
handled = true;
break;
case T t:
handled = _result.TrySetResult(t);
break;
case null:
handled = _result.TrySetResult(default);
break;
case Status.Failure f:
handled = _result.TrySetException(f.Cause
?? new TaskCanceledException("Task cancelled by actor via Failure message."));
break;
case Failure f:
handled = _result.TrySetException(f.Exception
?? new TaskCanceledException("Task cancelled by actor via Failure message."));
break;
default:
_ = _result.TrySetException(new ArgumentException(
$"Received message of type [{message.GetType()}] - Ask expected message of type [{typeof(T)}]"));
break;
SendSystemMessage(sysM);
}
else
{
if (Interlocked.Exchange(ref status, COMPLETED) == INITIATED)
{
if (message is T t)
{
_result.TrySetResult(t);
}
else if (message == null) //special case: https://github.com/akkadotnet/akka.net/issues/5204
{
_result.TrySetResult(default);
}
else if (message is Failure f)
{
_result.TrySetException(f.Exception ?? new TaskCanceledException("Task cancelled by actor via Failure message."));
}
else
{
_result.TrySetException(new ArgumentException(
$"Received message of type [{message.GetType()}] - Ask expected message of type [{typeof(T)}]"));
}
}
}

//ignore canceled ask and put unhandled answers into deadletter
if (!handled && !_result.Task.IsCanceled)
_provider.DeadLetters.Tell(message ?? default(T), this);
}

/// <summary>
/// TBD
/// </summary>
/// <param name="message">TBD</param>
public override void SendSystemMessage(ISystemMessage message)
{
base.SendSystemMessage(message);
}
}

Expand Down Expand Up @@ -704,16 +727,16 @@ public abstract class ActorRefWithCell : InternalActorRefBase
private IEnumerable<IActorRef> SelfAndChildren()
{
yield return this;
foreach (var child in Children.SelectMany(x =>
{
switch (x)
{
case ActorRefWithCell cell:
return cell.SelfAndChildren();
default:
return new[] { x };
}
}))
foreach(var child in Children.SelectMany(x =>
{
switch(x)
{
case ActorRefWithCell cell:
return cell.SelfAndChildren();
default:
return new[] { x };
}
}))
{
yield return child;
}
Expand Down
9 changes: 3 additions & 6 deletions src/core/Akka/Actor/Futures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,16 @@ public static Task<T> Ask<T>(this ICanTell self, Func<IActorRef, object> message
}

//create a new tempcontainer path
var path = provider.TempPath();
ActorPath path = provider.TempPath();

var future = new FutureActorRef<T>(result, path, provider);

//The future actor needs to be unregistered in the temp container
_ = result.Task.ContinueWith(t =>
var future = new FutureActorRef<T>(result, t =>
{
provider.UnregisterTempActor(path);
ctr1?.Dispose();
ctr2?.Dispose();
timeoutCancellation?.Dispose();
}, TaskContinuationOptions.ExecuteSynchronously);
}, path);

//The future actor needs to be registered in the temp container
provider.RegisterTempActor(future, path);
Expand Down

0 comments on commit be057ab

Please sign in to comment.