Skip to content

Commit

Permalink
fixed the first set of issues related to #1544
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb authored and Danthar committed May 31, 2018
1 parent 82b92b0 commit 98bdbd7
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 49 deletions.
85 changes: 51 additions & 34 deletions src/core/Akka.Remote.Tests/RemotingSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ public RemotingSpec(ITestOutputHelper helper) : base(GetConfig(), helper)

var conf = c2.WithFallback(c1); //ConfigurationFactory.ParseString(GetOtherRemoteSysConfig());

remoteSystem = ActorSystem.Create("remote-sys", conf);
Deploy(Sys, new Deploy(@"/gonk", new RemoteScope(Addr(remoteSystem, "tcp"))));
Deploy(Sys, new Deploy(@"/zagzag", new RemoteScope(Addr(remoteSystem, "udp"))));
_remoteSystem = ActorSystem.Create("remote-sys", conf);
Deploy(Sys, new Deploy(@"/gonk", new RemoteScope(Addr(_remoteSystem, "tcp"))));
Deploy(Sys, new Deploy(@"/zagzag", new RemoteScope(Addr(_remoteSystem, "udp"))));

remote = remoteSystem.ActorOf(Props.Create<Echo2>(), "echo");
here = Sys.ActorSelection("akka.test://remote-sys@localhost:12346/user/echo");
_remote = _remoteSystem.ActorOf(Props.Create<Echo2>(), "echo");
_here = Sys.ActorSelection("akka.test://remote-sys@localhost:12346/user/echo");
}

private static string GetConfig()
Expand Down Expand Up @@ -132,14 +132,14 @@ protected string GetOtherRemoteSysConfig()
}";
}

private ActorSystem remoteSystem;
private ICanTell remote;
private ICanTell here;
private readonly ActorSystem _remoteSystem;
private ICanTell _remote;
private readonly ICanTell _here;


protected override void AfterAll()
{
remoteSystem.Terminate();
Shutdown(_remoteSystem, RemainingOrDefault);
AssociationRegistry.Clear();
base.AfterAll();
}
Expand All @@ -151,7 +151,7 @@ protected override void AfterAll()
[Fact]
public void Remoting_must_support_remote_lookups()
{
here.Tell("ping", TestActor);
_here.Tell("ping", TestActor);
ExpectMsg(Tuple.Create("pong", TestActor), TimeSpan.FromSeconds(1.5));
}

Expand All @@ -161,7 +161,7 @@ public async Task Remoting_must_support_Ask()
//TODO: using smaller numbers for the cancellation here causes a bug.
//the remoting layer uses some "initialdelay task.delay" for 4 seconds.
//so the token is cancelled before the delay completed..
var msg = await here.Ask<Tuple<string, IActorRef>>("ping", TimeSpan.FromSeconds(1.5));
var msg = await _here.Ask<Tuple<string, IActorRef>>("ping", TimeSpan.FromSeconds(1.5));
Assert.Equal("pong", msg.Item1);
Assert.IsType<FutureActorRef>(msg.Item2);
}
Expand All @@ -172,19 +172,19 @@ public async Task Ask_does_not_deadlock()
// see https://github.com/akkadotnet/akka.net/issues/2546

// the configure await causes the continuation (== the second ask) to be scheduled on the HELIOS worker thread
var msg = await here.Ask<Tuple<string, IActorRef>>("ping", TimeSpan.FromSeconds(1.5)).ConfigureAwait(false);
var msg = await _here.Ask<Tuple<string, IActorRef>>("ping", TimeSpan.FromSeconds(1.5)).ConfigureAwait(false);
Assert.Equal("pong", msg.Item1);

// the .Result here blocks the helios worker thread, deadlocking the whole system.
var msg2 = here.Ask<Tuple<string, IActorRef>>("ping", TimeSpan.FromSeconds(1.5)).Result;
var msg2 = _here.Ask<Tuple<string, IActorRef>>("ping", TimeSpan.FromSeconds(1.5)).Result;
Assert.Equal("pong", msg2.Item1);
}

[Fact]
public void Resolve_does_not_deadlock()
{
// here is really an ActorSelection
var actorSelection = (ActorSelection)here;
var actorSelection = (ActorSelection)_here;
var actorRef = actorSelection.ResolveOne(TimeSpan.FromSeconds(10)).Result;
// the only test is that the ResolveOne works, so if we got here, the test passes
}
Expand All @@ -195,7 +195,7 @@ public void Resolve_does_not_deadlock_GuiApplication()
AsyncContext.Run(() =>
{
// here is really an ActorSelection
var actorSelection = (ActorSelection)here;
var actorSelection = (ActorSelection)_here;
var actorRef = actorSelection.ResolveOne(TimeSpan.FromSeconds(10)).Result;
// the only test is that the ResolveOne works, so if we got here, the test passes
return Task.Delay(0);
Expand All @@ -205,15 +205,15 @@ public void Resolve_does_not_deadlock_GuiApplication()
[Fact]
public void Remoting_must_not_send_remote_recreated_actor_with_same_name()
{
var echo = remoteSystem.ActorOf(Props.Create(() => new Echo1()), "otherEcho1");
var echo = _remoteSystem.ActorOf(Props.Create(() => new Echo1()), "otherEcho1");
echo.Tell(71);
ExpectMsg(71);
echo.Tell(PoisonPill.Instance);
ExpectMsg("postStop");
echo.Tell(72);
ExpectNoMsg(TimeSpan.FromSeconds(1));

var echo2 = remoteSystem.ActorOf(Props.Create(() => new Echo1()), "otherEcho1");
var echo2 = _remoteSystem.ActorOf(Props.Create(() => new Echo1()), "otherEcho1");
echo2.Tell(73);
ExpectMsg(73);

Expand All @@ -222,7 +222,7 @@ public void Remoting_must_not_send_remote_recreated_actor_with_same_name()
echo.Tell(74);
ExpectNoMsg(TimeSpan.FromSeconds(1));

remoteSystem.ActorSelection("/user/otherEcho1").Tell(75);
_remoteSystem.ActorSelection("/user/otherEcho1").Tell(75);
ExpectMsg(75);

Sys.ActorSelection("akka.test://remote-sys@localhost:12346/user/otherEcho1").Tell(76);
Expand Down Expand Up @@ -284,6 +284,23 @@ public void Remoting_must_lookup_actors_across_node_boundaries()
ExpectMsg(47);
}

//[Fact]
//public void Remoting_must_allow_ActorSelection_of_RemoteRoutees()
//{

// Within(TimeSpan.FromMinutes(4), () =>
// {
// var l = Sys.ActorOf(Props.Create<Echo1>().WithDeploy(new Deploy(new RemoteScope(RARP.For(_remoteSystem).Provider.DefaultAddress))), "deployed");

// l.Tell(new Identify("idReq1"));
// ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq1")).Subject.ShouldBe(l);

// Sys.ActorSelection(l.Path).Tell(new Identify("idReq2"));
// ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq2")).Subject.ShouldBe(l);
// });

//}

[Fact]
public void Remoting_must_select_actors_across_node_boundaries()
{
Expand Down Expand Up @@ -326,36 +343,36 @@ public void Remoting_must_select_actors_across_node_boundaries()
Sys.ActorSelection("/user/looker/child").Tell(new Identify("idReq1"));
ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq1")).Subject.ShouldBe(child);
//TODO see #1544
//Sys.ActorSelection(child.Path).Tell(new Identify("idReq2"));
//ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq2")).Subject.ShouldBe(child);
Sys.ActorSelection(child.Path).Tell(new Identify("idReq2"));
ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq2")).Subject.ShouldBe(child);
Sys.ActorSelection("/user/looker/*").Tell(new Identify("idReq3"));
ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq3")).Subject.ShouldBe(child);

Sys.ActorSelection("/user/looker/child/grandchild").Tell(new Identify("idReq4"));
ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq4")).Subject.ShouldBe(grandchild);
//TODO see #1544
//Sys.ActorSelection(child.Path / "grandchild").Tell(new Identify("idReq5"));
//ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq5")).Subject.ShouldBe(grandchild);
Sys.ActorSelection(child.Path / "grandchild").Tell(new Identify("idReq5"));
ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq5")).Subject.ShouldBe(grandchild);
Sys.ActorSelection("/user/looker/*/grandchild").Tell(new Identify("idReq6"));
ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq6")).Subject.ShouldBe(grandchild);
Sys.ActorSelection("/user/looker/child/*").Tell(new Identify("idReq7"));
ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq7")).Subject.ShouldBe(grandchild);
//TODO see #1544
//Sys.ActorSelection(child.Path / "*").Tell(new Identify("idReq8"));
//ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq8")).Subject.ShouldBe(grandchild);
Sys.ActorSelection(child.Path / "*").Tell(new Identify("idReq8"));
ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq8")).Subject.ShouldBe(grandchild);

Sys.ActorSelection("/user/looker/child/grandchild/grandgrandchild").Tell(new Identify("idReq9"));
ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq9")).Subject.ShouldBe(grandgrandchild);
//TODO see #1544
//Sys.ActorSelection(child.Path / "grandchild/grandgrandchild").Tell(new Identify("idReq10"));
//ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq10")).Subject.ShouldBe(grandgrandchild);
Sys.ActorSelection(child.Path / "grandchild/grandgrandchild").Tell(new Identify("idReq10"));
ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq10"), TimeSpan.FromMinutes(4)).Subject.ShouldBe(grandgrandchild);
Sys.ActorSelection("/user/looker/child/*/grandgrandchild").Tell(new Identify("idReq11"));
ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq11")).Subject.ShouldBe(grandgrandchild);
Sys.ActorSelection("/user/looker/child/*/*").Tell(new Identify("idReq12"));
ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq12")).Subject.ShouldBe(grandgrandchild);
//TODO see #1544
//Sys.ActorSelection(child.Path / "*/grandgrandchild").Tell(new Identify("idReq13"));
//ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq13")).Subject.ShouldBe(grandgrandchild);
Sys.ActorSelection(child.Path / "*/grandgrandchild").Tell(new Identify("idReq13"));
ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq13")).Subject.ShouldBe(grandgrandchild);

//ActorSelection doesn't support ToSerializationFormat directly
//var sel1 = Sys.ActorSelection("/user/looker/child/grandchild/grandgrandchild");
Expand All @@ -373,8 +390,8 @@ public void Remoting_must_select_actors_across_node_boundaries()
child2.Tell(new Identify("idReq15"));
ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq15")).Subject.ShouldBe(child2);
//TODO see #1544
//Sys.ActorSelection(child.Path).Tell("idReq16");
//ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq16")).Subject.ShouldBe(child2);
Sys.ActorSelection(child.Path).Tell("idReq16");
ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq16")).Subject.ShouldBe(child2);
child.Tell(new Identify("idReq17"));
ExpectMsg<ActorIdentity>(i => i.MessageId.Equals("idReq17")).Subject.ShouldBe(null);

Expand Down Expand Up @@ -435,7 +452,7 @@ public async Task Bug_884_Remoting_must_support_reply_to_Routee()

//have one of the routees send the message
var targetRoutee = routees.Members.Cast<ActorRefRoutee>().Select(x => x.Actor).First();
here.Tell("ping", targetRoutee);
_here.Tell("ping", targetRoutee);
var msg = ExpectMsg<Tuple<string, IActorRef>>(TimeSpan.FromSeconds(1.5));
Assert.Equal("pong", msg.Item1);
Assert.Equal(targetRoutee, msg.Item2);
Expand All @@ -451,7 +468,7 @@ public async Task Bug_884_Remoting_must_support_reply_to_child_of_Routee()
//have one of the routees send the message
var targetRoutee = routees.Members.Cast<ActorRefRoutee>().Select(x => x.Actor).First();
var reporter = await targetRoutee.Ask<IActorRef>(new NestedDeployer.GetNestedReporter());
here.Tell("ping", reporter);
_here.Tell("ping", reporter);
var msg = ExpectMsg<Tuple<string, IActorRef>>(TimeSpan.FromSeconds(1.5));
Assert.Equal("pong", msg.Item1);
Assert.Equal(reporter, msg.Item2);
Expand Down Expand Up @@ -485,7 +502,7 @@ public void Drop_received_messages_over_payload_size()
[Fact]
public void Nobody_should_be_converted_back_to_its_singleton()
{
here.Tell(ActorRefs.Nobody, TestActor);
_here.Tell(ActorRefs.Nobody, TestActor);
ExpectMsg(ActorRefs.Nobody, TimeSpan.FromSeconds(1.5));
}

Expand Down Expand Up @@ -537,7 +554,7 @@ private static ByteString ByteStringOfSize(int size)
private void VerifySend(object msg, Action afterSend)
{
var bigBounceId = $"bigBounce-{ThreadLocalRandom.Current.Next()}";
var bigBounceOther = remoteSystem.ActorOf(Props.Create<Bouncer>().WithDeploy(Actor.Deploy.Local),
var bigBounceOther = _remoteSystem.ActorOf(Props.Create<Bouncer>().WithDeploy(Actor.Deploy.Local),
bigBounceId);

var bigBounceHere =
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka.Remote/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ internal class DefaultMessageDispatcher : IInboundMessageDispatcher
/// <param name="log">TBD</param>
public DefaultMessageDispatcher(ActorSystem system, IRemoteActorRefProvider provider, ILoggingAdapter log)
{
this._system = system;
this._provider = provider;
this._log = log;
_system = system;
_provider = provider;
_log = log;
_remoteDaemon = provider.RemoteDaemon;
_settings = provider.RemoteSettings;
}
Expand Down
62 changes: 52 additions & 10 deletions src/core/Akka.Remote/RemoteSystemDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Akka.Event;
using Akka.Util;
using Akka.Util.Internal;
using Akka.Util.Internal.Collections;

namespace Akka.Remote
{
Expand Down Expand Up @@ -124,7 +125,53 @@ protected override void TellInternal(object message, IActorRef sender)
Log.Debug("Received command [{0}] to RemoteSystemDaemon on [{1}]", message, Path.Address);
if (message is DaemonMsgCreate) HandleDaemonMsgCreate((DaemonMsgCreate)message);
}
else if (message is ActorSelectionMessage sel)
{
var iter = sel.Elements.Iterator();

Tuple<IEnumerable<string>, object> Rec(IImmutableList<string> acc)
{
while (true)
{
if (iter.IsEmpty())
return Tuple.Create(acc.Reverse(), sel.Message);

// find child elements, and the message to send, which is a remaining ActorSelectionMessage
// in case of SelectChildPattern, otherwise the actual message of the selection
switch (iter.Next())
{
case SelectChildName n:
acc = ImmutableList.Create(n.Name).AddRange(acc);
continue;
case SelectParent p when !acc.Any():
continue;
case SelectParent p:
acc = acc.Skip(1).ToImmutableList();
continue;
case SelectChildPattern pat:
return Tuple.Create<IEnumerable<string>, object>(acc.Reverse(), sel.Copy(elements: new[] { pat }.Concat(iter.ToVector()).ToArray()));
default: // compiler ceremony - should never be hit
throw new InvalidOperationException("Unknown ActorSelectionPart []");
}
}
}

var t = Rec(ImmutableList<string>.Empty);
var concatenatedChildNames = t.Item1;
var m = t.Item2;

var child = GetChild(concatenatedChildNames);
if (child.IsNobody())
{
var emptyRef = new EmptyLocalActorRef(_system.Provider,
Path / sel.Elements.Select(el => el.ToString()), _system.EventStream);
emptyRef.Tell(sel, sender);
}
else
{
child.Tell(m, sender);
}
}
//Remote ActorSystem on another process / machine has died.
//Need to clean up any references to remote deployments here.
else if (message is AddressTerminated)
Expand All @@ -136,9 +183,8 @@ protected override void TellInternal(object message, IActorRef sender)
if (@ref.Parent.Path.Address == addressTerminated.Address) _system.Stop(@ref);
});
}
else if (message is Identify)
else if (message is Identify identify)
{
var identify = message as Identify;
sender.Tell(new ActorIdentity(identify.MessageId, this));
}
else if (message is TerminationHook)
Expand All @@ -157,11 +203,9 @@ protected override void TellInternal(object message, IActorRef sender)
/// <param name="message">TBD</param>
public override void SendSystemMessage(ISystemMessage message)
{
if (message is DeathWatchNotification)
if (message is DeathWatchNotification deathWatchNotification)
{
var deathWatchNotification = message as DeathWatchNotification;
var child = deathWatchNotification.Actor as ActorRefWithCell;
if (child != null)
if (deathWatchNotification.Actor is ActorRefWithCell child)
{
if (child.IsLocal)
{
Expand All @@ -183,13 +227,11 @@ public override void SendSystemMessage(ISystemMessage message)
else
{
var parent = deathWatchNotification.Actor;
var parentWithScope = parent as IActorRefScope;
if (parentWithScope != null && !parentWithScope.IsLocal)
if (parent is IActorRefScope parentWithScope && !parentWithScope.IsLocal)
{
_terminating.Locked(() =>
{
IImmutableSet<IActorRef> children;
if (_parent2Children.TryRemove(parent, out children))
if (_parent2Children.TryRemove(parent, out var children))
{
foreach (var c in children)
{
Expand Down
Loading

0 comments on commit 98bdbd7

Please sign in to comment.