diff --git a/src/core/Akka.Remote.Tests/RemotingSpec.cs b/src/core/Akka.Remote.Tests/RemotingSpec.cs index 6aaec76c4ff..75c0e4eecbf 100644 --- a/src/core/Akka.Remote.Tests/RemotingSpec.cs +++ b/src/core/Akka.Remote.Tests/RemotingSpec.cs @@ -34,12 +34,12 @@ public RemotingSpec(ITestOutputHelper helper) : base(GetConfig(), helper) var conf = c2.WithFallback(c1); //ConfigurationFactory.ParseString(GetOtherRemoteSysConfig()); - remoteSystem = ActorSystem.Create("remote-sys", conf); - Deploy(Sys, new Deploy(@"/gonk", new RemoteScope(Addr(remoteSystem, "tcp")))); - Deploy(Sys, new Deploy(@"/zagzag", new RemoteScope(Addr(remoteSystem, "udp")))); + _remoteSystem = ActorSystem.Create("remote-sys", conf); + Deploy(Sys, new Deploy(@"/gonk", new RemoteScope(Addr(_remoteSystem, "tcp")))); + Deploy(Sys, new Deploy(@"/zagzag", new RemoteScope(Addr(_remoteSystem, "udp")))); - remote = remoteSystem.ActorOf(Props.Create(), "echo"); - here = Sys.ActorSelection("akka.test://remote-sys@localhost:12346/user/echo"); + _remote = _remoteSystem.ActorOf(Props.Create(), "echo"); + _here = Sys.ActorSelection("akka.test://remote-sys@localhost:12346/user/echo"); } private static string GetConfig() @@ -132,14 +132,14 @@ protected string GetOtherRemoteSysConfig() }"; } - private ActorSystem remoteSystem; - private ICanTell remote; - private ICanTell here; + private readonly ActorSystem _remoteSystem; + private ICanTell _remote; + private readonly ICanTell _here; protected override void AfterAll() { - remoteSystem.Terminate(); + Shutdown(_remoteSystem, RemainingOrDefault); AssociationRegistry.Clear(); base.AfterAll(); } @@ -151,7 +151,7 @@ protected override void AfterAll() [Fact] public void Remoting_must_support_remote_lookups() { - here.Tell("ping", TestActor); + _here.Tell("ping", TestActor); ExpectMsg(Tuple.Create("pong", TestActor), TimeSpan.FromSeconds(1.5)); } @@ -161,7 +161,7 @@ public async Task Remoting_must_support_Ask() //TODO: using smaller numbers for the cancellation here causes a bug. //the remoting layer uses some "initialdelay task.delay" for 4 seconds. //so the token is cancelled before the delay completed.. - var msg = await here.Ask>("ping", TimeSpan.FromSeconds(1.5)); + var msg = await _here.Ask>("ping", TimeSpan.FromSeconds(1.5)); Assert.Equal("pong", msg.Item1); Assert.IsType(msg.Item2); } @@ -172,11 +172,11 @@ public async Task Ask_does_not_deadlock() // see https://github.com/akkadotnet/akka.net/issues/2546 // the configure await causes the continuation (== the second ask) to be scheduled on the HELIOS worker thread - var msg = await here.Ask>("ping", TimeSpan.FromSeconds(1.5)).ConfigureAwait(false); + var msg = await _here.Ask>("ping", TimeSpan.FromSeconds(1.5)).ConfigureAwait(false); Assert.Equal("pong", msg.Item1); // the .Result here blocks the helios worker thread, deadlocking the whole system. - var msg2 = here.Ask>("ping", TimeSpan.FromSeconds(1.5)).Result; + var msg2 = _here.Ask>("ping", TimeSpan.FromSeconds(1.5)).Result; Assert.Equal("pong", msg2.Item1); } @@ -184,7 +184,7 @@ public async Task Ask_does_not_deadlock() public void Resolve_does_not_deadlock() { // here is really an ActorSelection - var actorSelection = (ActorSelection)here; + var actorSelection = (ActorSelection)_here; var actorRef = actorSelection.ResolveOne(TimeSpan.FromSeconds(10)).Result; // the only test is that the ResolveOne works, so if we got here, the test passes } @@ -195,7 +195,7 @@ public void Resolve_does_not_deadlock_GuiApplication() AsyncContext.Run(() => { // here is really an ActorSelection - var actorSelection = (ActorSelection)here; + var actorSelection = (ActorSelection)_here; var actorRef = actorSelection.ResolveOne(TimeSpan.FromSeconds(10)).Result; // the only test is that the ResolveOne works, so if we got here, the test passes return Task.Delay(0); @@ -205,7 +205,7 @@ public void Resolve_does_not_deadlock_GuiApplication() [Fact] public void Remoting_must_not_send_remote_recreated_actor_with_same_name() { - var echo = remoteSystem.ActorOf(Props.Create(() => new Echo1()), "otherEcho1"); + var echo = _remoteSystem.ActorOf(Props.Create(() => new Echo1()), "otherEcho1"); echo.Tell(71); ExpectMsg(71); echo.Tell(PoisonPill.Instance); @@ -213,7 +213,7 @@ public void Remoting_must_not_send_remote_recreated_actor_with_same_name() echo.Tell(72); ExpectNoMsg(TimeSpan.FromSeconds(1)); - var echo2 = remoteSystem.ActorOf(Props.Create(() => new Echo1()), "otherEcho1"); + var echo2 = _remoteSystem.ActorOf(Props.Create(() => new Echo1()), "otherEcho1"); echo2.Tell(73); ExpectMsg(73); @@ -222,7 +222,7 @@ public void Remoting_must_not_send_remote_recreated_actor_with_same_name() echo.Tell(74); ExpectNoMsg(TimeSpan.FromSeconds(1)); - remoteSystem.ActorSelection("/user/otherEcho1").Tell(75); + _remoteSystem.ActorSelection("/user/otherEcho1").Tell(75); ExpectMsg(75); Sys.ActorSelection("akka.test://remote-sys@localhost:12346/user/otherEcho1").Tell(76); @@ -284,6 +284,23 @@ public void Remoting_must_lookup_actors_across_node_boundaries() ExpectMsg(47); } + //[Fact] + //public void Remoting_must_allow_ActorSelection_of_RemoteRoutees() + //{ + + // Within(TimeSpan.FromMinutes(4), () => + // { + // var l = Sys.ActorOf(Props.Create().WithDeploy(new Deploy(new RemoteScope(RARP.For(_remoteSystem).Provider.DefaultAddress))), "deployed"); + + // l.Tell(new Identify("idReq1")); + // ExpectMsg(i => i.MessageId.Equals("idReq1")).Subject.ShouldBe(l); + + // Sys.ActorSelection(l.Path).Tell(new Identify("idReq2")); + // ExpectMsg(i => i.MessageId.Equals("idReq2")).Subject.ShouldBe(l); + // }); + + //} + [Fact] public void Remoting_must_select_actors_across_node_boundaries() { @@ -326,36 +343,36 @@ public void Remoting_must_select_actors_across_node_boundaries() Sys.ActorSelection("/user/looker/child").Tell(new Identify("idReq1")); ExpectMsg(i => i.MessageId.Equals("idReq1")).Subject.ShouldBe(child); //TODO see #1544 - //Sys.ActorSelection(child.Path).Tell(new Identify("idReq2")); - //ExpectMsg(i => i.MessageId.Equals("idReq2")).Subject.ShouldBe(child); + Sys.ActorSelection(child.Path).Tell(new Identify("idReq2")); + ExpectMsg(i => i.MessageId.Equals("idReq2")).Subject.ShouldBe(child); Sys.ActorSelection("/user/looker/*").Tell(new Identify("idReq3")); ExpectMsg(i => i.MessageId.Equals("idReq3")).Subject.ShouldBe(child); Sys.ActorSelection("/user/looker/child/grandchild").Tell(new Identify("idReq4")); ExpectMsg(i => i.MessageId.Equals("idReq4")).Subject.ShouldBe(grandchild); //TODO see #1544 - //Sys.ActorSelection(child.Path / "grandchild").Tell(new Identify("idReq5")); - //ExpectMsg(i => i.MessageId.Equals("idReq5")).Subject.ShouldBe(grandchild); + Sys.ActorSelection(child.Path / "grandchild").Tell(new Identify("idReq5")); + ExpectMsg(i => i.MessageId.Equals("idReq5")).Subject.ShouldBe(grandchild); Sys.ActorSelection("/user/looker/*/grandchild").Tell(new Identify("idReq6")); ExpectMsg(i => i.MessageId.Equals("idReq6")).Subject.ShouldBe(grandchild); Sys.ActorSelection("/user/looker/child/*").Tell(new Identify("idReq7")); ExpectMsg(i => i.MessageId.Equals("idReq7")).Subject.ShouldBe(grandchild); //TODO see #1544 - //Sys.ActorSelection(child.Path / "*").Tell(new Identify("idReq8")); - //ExpectMsg(i => i.MessageId.Equals("idReq8")).Subject.ShouldBe(grandchild); + Sys.ActorSelection(child.Path / "*").Tell(new Identify("idReq8")); + ExpectMsg(i => i.MessageId.Equals("idReq8")).Subject.ShouldBe(grandchild); Sys.ActorSelection("/user/looker/child/grandchild/grandgrandchild").Tell(new Identify("idReq9")); ExpectMsg(i => i.MessageId.Equals("idReq9")).Subject.ShouldBe(grandgrandchild); //TODO see #1544 - //Sys.ActorSelection(child.Path / "grandchild/grandgrandchild").Tell(new Identify("idReq10")); - //ExpectMsg(i => i.MessageId.Equals("idReq10")).Subject.ShouldBe(grandgrandchild); + Sys.ActorSelection(child.Path / "grandchild/grandgrandchild").Tell(new Identify("idReq10")); + ExpectMsg(i => i.MessageId.Equals("idReq10"), TimeSpan.FromMinutes(4)).Subject.ShouldBe(grandgrandchild); Sys.ActorSelection("/user/looker/child/*/grandgrandchild").Tell(new Identify("idReq11")); ExpectMsg(i => i.MessageId.Equals("idReq11")).Subject.ShouldBe(grandgrandchild); Sys.ActorSelection("/user/looker/child/*/*").Tell(new Identify("idReq12")); ExpectMsg(i => i.MessageId.Equals("idReq12")).Subject.ShouldBe(grandgrandchild); //TODO see #1544 - //Sys.ActorSelection(child.Path / "*/grandgrandchild").Tell(new Identify("idReq13")); - //ExpectMsg(i => i.MessageId.Equals("idReq13")).Subject.ShouldBe(grandgrandchild); + Sys.ActorSelection(child.Path / "*/grandgrandchild").Tell(new Identify("idReq13")); + ExpectMsg(i => i.MessageId.Equals("idReq13")).Subject.ShouldBe(grandgrandchild); //ActorSelection doesn't support ToSerializationFormat directly //var sel1 = Sys.ActorSelection("/user/looker/child/grandchild/grandgrandchild"); @@ -373,8 +390,8 @@ public void Remoting_must_select_actors_across_node_boundaries() child2.Tell(new Identify("idReq15")); ExpectMsg(i => i.MessageId.Equals("idReq15")).Subject.ShouldBe(child2); //TODO see #1544 - //Sys.ActorSelection(child.Path).Tell("idReq16"); - //ExpectMsg(i => i.MessageId.Equals("idReq16")).Subject.ShouldBe(child2); + Sys.ActorSelection(child.Path).Tell("idReq16"); + ExpectMsg(i => i.MessageId.Equals("idReq16")).Subject.ShouldBe(child2); child.Tell(new Identify("idReq17")); ExpectMsg(i => i.MessageId.Equals("idReq17")).Subject.ShouldBe(null); @@ -435,7 +452,7 @@ public async Task Bug_884_Remoting_must_support_reply_to_Routee() //have one of the routees send the message var targetRoutee = routees.Members.Cast().Select(x => x.Actor).First(); - here.Tell("ping", targetRoutee); + _here.Tell("ping", targetRoutee); var msg = ExpectMsg>(TimeSpan.FromSeconds(1.5)); Assert.Equal("pong", msg.Item1); Assert.Equal(targetRoutee, msg.Item2); @@ -451,7 +468,7 @@ public async Task Bug_884_Remoting_must_support_reply_to_child_of_Routee() //have one of the routees send the message var targetRoutee = routees.Members.Cast().Select(x => x.Actor).First(); var reporter = await targetRoutee.Ask(new NestedDeployer.GetNestedReporter()); - here.Tell("ping", reporter); + _here.Tell("ping", reporter); var msg = ExpectMsg>(TimeSpan.FromSeconds(1.5)); Assert.Equal("pong", msg.Item1); Assert.Equal(reporter, msg.Item2); @@ -485,7 +502,7 @@ public void Drop_received_messages_over_payload_size() [Fact] public void Nobody_should_be_converted_back_to_its_singleton() { - here.Tell(ActorRefs.Nobody, TestActor); + _here.Tell(ActorRefs.Nobody, TestActor); ExpectMsg(ActorRefs.Nobody, TimeSpan.FromSeconds(1.5)); } @@ -537,7 +554,7 @@ private static ByteString ByteStringOfSize(int size) private void VerifySend(object msg, Action afterSend) { var bigBounceId = $"bigBounce-{ThreadLocalRandom.Current.Next()}"; - var bigBounceOther = remoteSystem.ActorOf(Props.Create().WithDeploy(Actor.Deploy.Local), + var bigBounceOther = _remoteSystem.ActorOf(Props.Create().WithDeploy(Actor.Deploy.Local), bigBounceId); var bigBounceHere = diff --git a/src/core/Akka.Remote/Endpoint.cs b/src/core/Akka.Remote/Endpoint.cs index acd88fd2d81..5fe201b138e 100644 --- a/src/core/Akka.Remote/Endpoint.cs +++ b/src/core/Akka.Remote/Endpoint.cs @@ -64,9 +64,9 @@ internal class DefaultMessageDispatcher : IInboundMessageDispatcher /// TBD public DefaultMessageDispatcher(ActorSystem system, IRemoteActorRefProvider provider, ILoggingAdapter log) { - this._system = system; - this._provider = provider; - this._log = log; + _system = system; + _provider = provider; + _log = log; _remoteDaemon = provider.RemoteDaemon; _settings = provider.RemoteSettings; } diff --git a/src/core/Akka.Remote/RemoteSystemDaemon.cs b/src/core/Akka.Remote/RemoteSystemDaemon.cs index 27e007cdacb..fcbaac9f246 100644 --- a/src/core/Akka.Remote/RemoteSystemDaemon.cs +++ b/src/core/Akka.Remote/RemoteSystemDaemon.cs @@ -16,6 +16,7 @@ using Akka.Event; using Akka.Util; using Akka.Util.Internal; +using Akka.Util.Internal.Collections; namespace Akka.Remote { @@ -124,7 +125,53 @@ protected override void TellInternal(object message, IActorRef sender) Log.Debug("Received command [{0}] to RemoteSystemDaemon on [{1}]", message, Path.Address); if (message is DaemonMsgCreate) HandleDaemonMsgCreate((DaemonMsgCreate)message); } + else if (message is ActorSelectionMessage sel) + { + var iter = sel.Elements.Iterator(); + + Tuple, object> Rec(IImmutableList acc) + { + while (true) + { + if (iter.IsEmpty()) + return Tuple.Create(acc.Reverse(), sel.Message); + + // find child elements, and the message to send, which is a remaining ActorSelectionMessage + // in case of SelectChildPattern, otherwise the actual message of the selection + switch (iter.Next()) + { + case SelectChildName n: + acc = ImmutableList.Create(n.Name).AddRange(acc); + continue; + case SelectParent p when !acc.Any(): + continue; + case SelectParent p: + acc = acc.Skip(1).ToImmutableList(); + continue; + case SelectChildPattern pat: + return Tuple.Create, object>(acc.Reverse(), sel.Copy(elements: new[] { pat }.Concat(iter.ToVector()).ToArray())); + default: // compiler ceremony - should never be hit + throw new InvalidOperationException("Unknown ActorSelectionPart []"); + } + } + } + var t = Rec(ImmutableList.Empty); + var concatenatedChildNames = t.Item1; + var m = t.Item2; + + var child = GetChild(concatenatedChildNames); + if (child.IsNobody()) + { + var emptyRef = new EmptyLocalActorRef(_system.Provider, + Path / sel.Elements.Select(el => el.ToString()), _system.EventStream); + emptyRef.Tell(sel, sender); + } + else + { + child.Tell(m, sender); + } + } //Remote ActorSystem on another process / machine has died. //Need to clean up any references to remote deployments here. else if (message is AddressTerminated) @@ -136,9 +183,8 @@ protected override void TellInternal(object message, IActorRef sender) if (@ref.Parent.Path.Address == addressTerminated.Address) _system.Stop(@ref); }); } - else if (message is Identify) + else if (message is Identify identify) { - var identify = message as Identify; sender.Tell(new ActorIdentity(identify.MessageId, this)); } else if (message is TerminationHook) @@ -157,11 +203,9 @@ protected override void TellInternal(object message, IActorRef sender) /// TBD public override void SendSystemMessage(ISystemMessage message) { - if (message is DeathWatchNotification) + if (message is DeathWatchNotification deathWatchNotification) { - var deathWatchNotification = message as DeathWatchNotification; - var child = deathWatchNotification.Actor as ActorRefWithCell; - if (child != null) + if (deathWatchNotification.Actor is ActorRefWithCell child) { if (child.IsLocal) { @@ -183,13 +227,11 @@ public override void SendSystemMessage(ISystemMessage message) else { var parent = deathWatchNotification.Actor; - var parentWithScope = parent as IActorRefScope; - if (parentWithScope != null && !parentWithScope.IsLocal) + if (parent is IActorRefScope parentWithScope && !parentWithScope.IsLocal) { _terminating.Locked(() => { - IImmutableSet children; - if (_parent2Children.TryRemove(parent, out children)) + if (_parent2Children.TryRemove(parent, out var children)) { foreach (var c in children) { diff --git a/src/core/Akka/Actor/ActorSelection.cs b/src/core/Akka/Actor/ActorSelection.cs index 4196bbba1d1..4447749f69c 100644 --- a/src/core/Akka/Actor/ActorSelection.cs +++ b/src/core/Akka/Actor/ActorSelection.cs @@ -296,7 +296,7 @@ public override string ToString() } /// - /// Class ActorSelectionMessage. + /// Used to deliver messages via . /// public class ActorSelectionMessage : IAutoReceivedMessage, IPossiblyHarmful { @@ -324,7 +324,7 @@ public ActorSelectionMessage(object message, SelectionPathElement[] elements, bo public SelectionPathElement[] Elements { get; } /// - /// TBD + /// When true, indicates that this includes wildcards. /// public bool WildCardFanOut { get; } @@ -334,6 +334,19 @@ public override string ToString() var elements = string.Join("/", Elements); return $"ActorSelectionMessage - Message: {Message} - WildCartFanOut: {WildCardFanOut} - Elements: {elements}"; } + + /// + /// Creates a deep copy of the with the provided properties. + /// + /// Optional. The new message to deliver. + /// Optional. The new elements on the actor selection. + /// Optional. Indicates whether or not we're delivering a wildcard . + /// A new . + public ActorSelectionMessage Copy(object message = null, SelectionPathElement[] elements = null, + bool? wildCardFanOut = null) + { + return new ActorSelectionMessage(message ?? Message, elements ?? Elements, wildCardFanOut ?? WildCardFanOut); + } } ///