Skip to content

Commit

Permalink
Fix atomic updates of AddressTerminatedTopic._subscribers. Also, opti…
Browse files Browse the repository at this point in the history
…mize the implementation to not take copies on Subscribe() or Unsubscribe(). (akkadotnet#4307)
  • Loading branch information
petrikero authored Mar 9, 2020
1 parent 9f2948a commit da53e08
Showing 1 changed file with 12 additions and 18 deletions.
30 changes: 12 additions & 18 deletions src/core/Akka/Event/AddressTerminatedTopic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;
using Akka.Util;

Expand Down Expand Up @@ -35,7 +37,7 @@ public override AddressTerminatedTopic CreateExtension(ExtendedActorSystem syste
/// </summary>
internal sealed class AddressTerminatedTopic : IExtension
{
private readonly AtomicReference<HashSet<IActorRef>> _subscribers = new AtomicReference<HashSet<IActorRef>>(new HashSet<IActorRef>());
private readonly HashSet<IActorRef> _subscribers = new HashSet<IActorRef>();

/// <summary>
/// Retrieves the extension from the specified actor system.
Expand All @@ -53,13 +55,8 @@ public static AddressTerminatedTopic Get(ActorSystem system)
/// <param name="subscriber">The actor that is registering for notifications.</param>
public void Subscribe(IActorRef subscriber)
{
while (true)
{
var current = _subscribers;
if (!_subscribers.CompareAndSet(current, new HashSet<IActorRef>(current.Value) {subscriber}))
continue;
break;
}
lock (_subscribers)
_subscribers.Add(subscriber);
}

/// <summary>
Expand All @@ -68,15 +65,8 @@ public void Subscribe(IActorRef subscriber)
/// <param name="subscriber">The actor that is unregistering for notifications.</param>
public void Unsubscribe(IActorRef subscriber)
{
while (true)
{
var current = _subscribers;
var newSet = new HashSet<IActorRef>(_subscribers.Value);
newSet.Remove(subscriber);
if (!_subscribers.CompareAndSet(current, newSet))
continue;
break;
}
lock (_subscribers)
_subscribers.Remove(subscriber);
}

/// <summary>
Expand All @@ -85,7 +75,11 @@ public void Unsubscribe(IActorRef subscriber)
/// <param name="msg">The message that is sent to all subscribers.</param>
public void Publish(AddressTerminated msg)
{
foreach (var subscriber in _subscribers.Value)
List<IActorRef> subscribers;
lock(_subscribers)
subscribers = _subscribers.ToList();

foreach (var subscriber in subscribers)
{
subscriber.Tell(msg, ActorRefs.NoSender);
}
Expand Down

0 comments on commit da53e08

Please sign in to comment.