Skip to content

Commit

Permalink
Changed subscriber lists to immutable collections (#3823)
Browse files Browse the repository at this point in the history
  • Loading branch information
ptjhuang authored and Horusiath committed Jun 17, 2019
1 parent 8ed7bbc commit 9f1b7e4
Showing 1 changed file with 18 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ namespace Akka.Persistence.Sql.Common.Journal
/// </summary>
public abstract class SqlJournal : AsyncWriteJournal, IWithUnboundedStash
{
private readonly Dictionary<string, ISet<IActorRef>> _persistenceIdSubscribers = new Dictionary<string, ISet<IActorRef>>();
private readonly Dictionary<string, ISet<IActorRef>> _tagSubscribers = new Dictionary<string, ISet<IActorRef>>();
private ImmutableDictionary<string, IImmutableSet<IActorRef>> _persistenceIdSubscribers = ImmutableDictionary.Create<string, IImmutableSet<IActorRef>>();
private ImmutableDictionary<string, IImmutableSet<IActorRef>> _tagSubscribers = ImmutableDictionary.Create<string, IImmutableSet<IActorRef>>();
private readonly HashSet<IActorRef> _allPersistenceIdSubscribers = new HashSet<IActorRef>();
private readonly ReaderWriterLockSlim _allPersistenceIdsLock = new ReaderWriterLockSlim();
private HashSet<string> _allPersistenceIds = new HashSet<string>();
Expand Down Expand Up @@ -327,13 +327,13 @@ public DbConnection CreateDbConnection()
/// <param name="subscriber">TBD</param>
public void RemoveSubscriber(IActorRef subscriber)
{
var pidSubscriptions = _persistenceIdSubscribers.Values.Where(x => x.Contains(subscriber));
foreach (var subscription in pidSubscriptions)
subscription.Remove(subscriber);
_persistenceIdSubscribers = _persistenceIdSubscribers.SetItems(_persistenceIdSubscribers
.Where(kv => kv.Value.Contains(subscriber))
.Select(kv => new KeyValuePair<string, IImmutableSet<IActorRef>>(kv.Key, kv.Value.Remove(subscriber))));

var tagSubscriptions = _tagSubscribers.Values.Where(x => x.Contains(subscriber));
foreach (var subscription in tagSubscriptions)
subscription.Remove(subscriber);
_tagSubscribers = _tagSubscribers.SetItems(_tagSubscribers
.Where(kv => kv.Value.Contains(subscriber))
.Select(kv => new KeyValuePair<string, IImmutableSet<IActorRef>>(kv.Key, kv.Value.Remove(subscriber))));

_allPersistenceIdSubscribers.Remove(subscriber);
}
Expand All @@ -347,11 +347,12 @@ public void AddTagSubscriber(IActorRef subscriber, string tag)
{
if (!_tagSubscribers.TryGetValue(tag, out var subscriptions))
{
subscriptions = new HashSet<IActorRef>();
_tagSubscribers.Add(tag, subscriptions);
_tagSubscribers = _tagSubscribers.Add(tag, ImmutableHashSet.Create(subscriber));
}
else
{
_tagSubscribers = _tagSubscribers.SetItem(tag, subscriptions.Add(subscriber));
}

subscriptions.Add(subscriber);
}

/// <summary>
Expand All @@ -373,11 +374,12 @@ public void AddPersistenceIdSubscriber(IActorRef subscriber, string persistenceI
{
if (!_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscriptions))
{
subscriptions = new HashSet<IActorRef>();
_persistenceIdSubscribers.Add(persistenceId, subscriptions);
_persistenceIdSubscribers = _persistenceIdSubscribers.Add(persistenceId, ImmutableHashSet.Create(subscriber));
}
else
{
_persistenceIdSubscribers = _persistenceIdSubscribers.Add(persistenceId, subscriptions.Add(subscriber));
}

subscriptions.Add(subscriber);
}

private async Task<long> NextTagSequenceNr(string tag)
Expand Down

0 comments on commit 9f1b7e4

Please sign in to comment.