From 96a998baa4b8ea806ffee090519173eff7f5b394 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 14 Mar 2022 15:04:07 -0500 Subject: [PATCH] close #5719 - cleaned up `EventStream` subscription management --- src/core/Akka/Event/EventStream.cs | 113 +++++++++++++---------------- 1 file changed, 50 insertions(+), 63 deletions(-) diff --git a/src/core/Akka/Event/EventStream.cs b/src/core/Akka/Event/EventStream.cs index b7310b5eace..0c35364f12b 100644 --- a/src/core/Akka/Event/EventStream.cs +++ b/src/core/Akka/Event/EventStream.cs @@ -11,6 +11,7 @@ using System.Linq; using Akka.Actor; using Akka.Actor.Internal; +using Akka.Dispatch; using Akka.Util; using Akka.Util.Internal; @@ -29,8 +30,13 @@ public class EventStream : LoggingBus { private readonly bool _debug; - private readonly AtomicReference, IActorRef>> _initiallySubscribedOrUnsubscriber = - new AtomicReference, IActorRef>>(); + // used to uniquely name unsubscribers instances, should there be more than one ActorSystem / EventStream + private static readonly AtomicCounter UnsubscribersCounter = new AtomicCounter(0); + private readonly AtomicReference _unsubscriber = new AtomicReference(ActorRefs.NoSender); + + // in the event that an actor subscribers to the EventStream prior to ActorSystemImpl.Init is called + // we register them here and then move them all + private readonly ConcurrentSet _pendingUnsubscribers = new ConcurrentSet(); /// /// Initializes a new instance of the class. @@ -108,88 +114,69 @@ public override bool Unsubscribe(IActorRef subscriber) } /// - /// TBD + /// Used to start the Unsubscriber actor, responsible for garabage-collecting + /// all expired subscriptions when the subscribed actor terminates. /// /// TBD public void StartUnsubscriber(ActorSystemImpl system) { - EventStreamUnsubscribersProvider.Instance.Start(system, this, _debug); - } - - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - public bool InitUnsubscriber(IActorRef unsubscriber, ActorSystem system) - { - if (system == null) - { - return false; - } - - return _initiallySubscribedOrUnsubscriber.Match().With>>(v => + if (_unsubscriber.Value.IsNobody()) { - if (_initiallySubscribedOrUnsubscriber.CompareAndSet(v, Either.Right(unsubscriber))) + lock (this) { - if (_debug) - { - Publish(new Debug(SimpleName(this), GetType(), - string.Format("initialized unsubscriber to: {0} registering {1} initial subscribers with it", unsubscriber, v.Value.Count))); + // not started + var currentValue = _unsubscriber.Value; + var unsubscriber= system.SystemActorOf(Props.Create(this, system, _debug).WithDispatcher(Dispatchers.InternalDispatcherId), + $"EventStreamUnsubscriber-{UnsubscribersCounter.IncrementAndGet()}"); + if (_unsubscriber.CompareAndSet(currentValue, unsubscriber)) + { + // backfill all pending unsubscribers + foreach (var s in _pendingUnsubscribers) + { + unsubscriber.Tell(new EventStreamUnsubscriber.Register(s)); + } + _pendingUnsubscribers.Clear(); + } + else + { + // somehow, despite being locked, we managed to lose the compare and swap + if (_unsubscriber.Value.IsNobody()) + throw new IllegalActorStateException("EventStream is corrupted"); } - v.Value.ForEach(RegisterWithUnsubscriber); - - - } - else - { - InitUnsubscriber(unsubscriber, system); } + } + } - - }).With>(presentUnsubscriber => - { - if (_debug) - { - Publish(new Debug(SimpleName(this), GetType(), - string.Format("not using unsubscriber {0}, because already initialized with {1}", unsubscriber, presentUnsubscriber))); - - } - }).WasHandled; + [Obsolete("Should be removed in 1.5")] + public bool InitUnsubscriber(IActorRef unsubscriber, ActorSystem system) + { + StartUnsubscriber((ActorSystemImpl)system); + return true; } private void RegisterWithUnsubscriber(IActorRef subscriber) { - _initiallySubscribedOrUnsubscriber.Match().With>>(v => + if (_unsubscriber.Value.IsNobody()) { - if (_initiallySubscribedOrUnsubscriber.CompareAndSet(v, - Either.Left>(v.Value.Add(subscriber)))) - { - RegisterWithUnsubscriber(subscriber); - } - - }).With>(unsubscriber => + // pending + _pendingUnsubscribers.TryAdd(subscriber); + } + else { - unsubscriber.Value.Tell( new EventStreamUnsubscriber.Register(subscriber)); - }); + _unsubscriber.Value.Tell( new EventStreamUnsubscriber.Register(subscriber)); + } + } private void UnregisterIfNoMoreSubscribedChannels(IActorRef subscriber) { - _initiallySubscribedOrUnsubscriber.Match().With>>(v => + // not an important operation. If we fail to process this message due to a race condition, then the + // death watch subscription is a no-op anyway. + if (!_unsubscriber.Value.IsNobody()) { - if (_initiallySubscribedOrUnsubscriber.CompareAndSet(v, - Either.Left>(v.Value.Remove(subscriber)))) - { - UnregisterIfNoMoreSubscribedChannels(subscriber); - } - - }).With>(unsubscriber => - { - unsubscriber.Value.Tell(new EventStreamUnsubscriber.UnregisterIfNoMoreSubscribedChannels(subscriber)); - }); + _unsubscriber.Value.Tell(new EventStreamUnsubscriber.UnregisterIfNoMoreSubscribedChannels(subscriber)); + } } } }