Skip to content

Commit

Permalink
close akkadotnet#5719 - cleaned up EventStream subscription management
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb committed Mar 14, 2022
1 parent 71ad9a0 commit 96a998b
Showing 1 changed file with 50 additions and 63 deletions.
113 changes: 50 additions & 63 deletions src/core/Akka/Event/EventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Linq;
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Dispatch;
using Akka.Util;
using Akka.Util.Internal;

Expand All @@ -29,8 +30,13 @@ public class EventStream : LoggingBus
{
private readonly bool _debug;

private readonly AtomicReference<Either<IImmutableSet<IActorRef>, IActorRef>> _initiallySubscribedOrUnsubscriber =
new AtomicReference<Either<IImmutableSet<IActorRef>, IActorRef>>();
// used to uniquely name unsubscribers instances, should there be more than one ActorSystem / EventStream
private static readonly AtomicCounter UnsubscribersCounter = new AtomicCounter(0);
private readonly AtomicReference<IActorRef> _unsubscriber = new AtomicReference<IActorRef>(ActorRefs.NoSender);

// in the event that an actor subscribers to the EventStream prior to ActorSystemImpl.Init is called
// we register them here and then move them all
private readonly ConcurrentSet<IActorRef> _pendingUnsubscribers = new ConcurrentSet<IActorRef>();

/// <summary>
/// Initializes a new instance of the <see cref="EventStream"/> class.
Expand Down Expand Up @@ -108,88 +114,69 @@ public override bool Unsubscribe(IActorRef subscriber)
}

/// <summary>
/// TBD
/// Used to start the Unsubscriber actor, responsible for garabage-collecting
/// all expired subscriptions when the subscribed actor terminates.
/// </summary>
/// <param name="system">TBD</param>
public void StartUnsubscriber(ActorSystemImpl system)
{
EventStreamUnsubscribersProvider.Instance.Start(system, this, _debug);
}

/// <summary>
/// TBD
/// </summary>
/// <param name="unsubscriber">TBD</param>
/// <param name="system">TBD</param>
/// <returns>TBD</returns>
public bool InitUnsubscriber(IActorRef unsubscriber, ActorSystem system)
{
if (system == null)
{
return false;
}

return _initiallySubscribedOrUnsubscriber.Match().With<Left<IImmutableSet<IActorRef>>>(v =>
if (_unsubscriber.Value.IsNobody())
{
if (_initiallySubscribedOrUnsubscriber.CompareAndSet(v, Either.Right(unsubscriber)))
lock (this)
{
if (_debug)
{
Publish(new Debug(SimpleName(this), GetType(),
string.Format("initialized unsubscriber to: {0} registering {1} initial subscribers with it", unsubscriber, v.Value.Count)));
// not started
var currentValue = _unsubscriber.Value;
var unsubscriber= system.SystemActorOf(Props.Create<EventStreamUnsubscriber>(this, system, _debug).WithDispatcher(Dispatchers.InternalDispatcherId),
$"EventStreamUnsubscriber-{UnsubscribersCounter.IncrementAndGet()}");

if (_unsubscriber.CompareAndSet(currentValue, unsubscriber))
{
// backfill all pending unsubscribers
foreach (var s in _pendingUnsubscribers)
{
unsubscriber.Tell(new EventStreamUnsubscriber.Register(s));
}
_pendingUnsubscribers.Clear();
}
else
{
// somehow, despite being locked, we managed to lose the compare and swap
if (_unsubscriber.Value.IsNobody())
throw new IllegalActorStateException("EventStream is corrupted");
}
v.Value.ForEach(RegisterWithUnsubscriber);
}
else
{
InitUnsubscriber(unsubscriber, system);
}
}
}

}).With<Right<IActorRef>>(presentUnsubscriber =>
{
if (_debug)
{
Publish(new Debug(SimpleName(this), GetType(),
string.Format("not using unsubscriber {0}, because already initialized with {1}", unsubscriber, presentUnsubscriber)));
}
}).WasHandled;
[Obsolete("Should be removed in 1.5")]
public bool InitUnsubscriber(IActorRef unsubscriber, ActorSystem system)
{
StartUnsubscriber((ActorSystemImpl)system);
return true;
}

private void RegisterWithUnsubscriber(IActorRef subscriber)
{
_initiallySubscribedOrUnsubscriber.Match().With<Left<IImmutableSet<IActorRef>>>(v =>
if (_unsubscriber.Value.IsNobody())
{
if (_initiallySubscribedOrUnsubscriber.CompareAndSet(v,
Either.Left<IImmutableSet<IActorRef>>(v.Value.Add(subscriber))))
{
RegisterWithUnsubscriber(subscriber);
}
}).With<Right<IActorRef>>(unsubscriber =>
// pending
_pendingUnsubscribers.TryAdd(subscriber);
}
else
{
unsubscriber.Value.Tell( new EventStreamUnsubscriber.Register(subscriber));
});
_unsubscriber.Value.Tell( new EventStreamUnsubscriber.Register(subscriber));
}

}

private void UnregisterIfNoMoreSubscribedChannels(IActorRef subscriber)
{
_initiallySubscribedOrUnsubscriber.Match().With<Left<IImmutableSet<IActorRef>>>(v =>
// not an important operation. If we fail to process this message due to a race condition, then the
// death watch subscription is a no-op anyway.
if (!_unsubscriber.Value.IsNobody())
{
if (_initiallySubscribedOrUnsubscriber.CompareAndSet(v,
Either.Left<IImmutableSet<IActorRef>>(v.Value.Remove(subscriber))))
{
UnregisterIfNoMoreSubscribedChannels(subscriber);
}
}).With<Right<IActorRef>>(unsubscriber =>
{
unsubscriber.Value.Tell(new EventStreamUnsubscriber.UnregisterIfNoMoreSubscribedChannels(subscriber));
});
_unsubscriber.Value.Tell(new EventStreamUnsubscriber.UnregisterIfNoMoreSubscribedChannels(subscriber));
}
}
}
}

0 comments on commit 96a998b

Please sign in to comment.