From 9f1b7e47bc9a4dff55487711009a48d5706179ce Mon Sep 17 00:00:00 2001 From: Peter Huang Date: Mon, 17 Jun 2019 22:24:01 +1000 Subject: [PATCH] Changed subscriber lists to immutable collections (#3823) --- .../Journal/SqlJournal.cs | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs index defc29c92cc..e99558a655a 100644 --- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs @@ -24,8 +24,8 @@ namespace Akka.Persistence.Sql.Common.Journal /// public abstract class SqlJournal : AsyncWriteJournal, IWithUnboundedStash { - private readonly Dictionary> _persistenceIdSubscribers = new Dictionary>(); - private readonly Dictionary> _tagSubscribers = new Dictionary>(); + private ImmutableDictionary> _persistenceIdSubscribers = ImmutableDictionary.Create>(); + private ImmutableDictionary> _tagSubscribers = ImmutableDictionary.Create>(); private readonly HashSet _allPersistenceIdSubscribers = new HashSet(); private readonly ReaderWriterLockSlim _allPersistenceIdsLock = new ReaderWriterLockSlim(); private HashSet _allPersistenceIds = new HashSet(); @@ -327,13 +327,13 @@ public DbConnection CreateDbConnection() /// TBD public void RemoveSubscriber(IActorRef subscriber) { - var pidSubscriptions = _persistenceIdSubscribers.Values.Where(x => x.Contains(subscriber)); - foreach (var subscription in pidSubscriptions) - subscription.Remove(subscriber); + _persistenceIdSubscribers = _persistenceIdSubscribers.SetItems(_persistenceIdSubscribers + .Where(kv => kv.Value.Contains(subscriber)) + .Select(kv => new KeyValuePair>(kv.Key, kv.Value.Remove(subscriber)))); - var tagSubscriptions = _tagSubscribers.Values.Where(x => x.Contains(subscriber)); - foreach (var subscription in tagSubscriptions) - subscription.Remove(subscriber); + _tagSubscribers = _tagSubscribers.SetItems(_tagSubscribers + .Where(kv => kv.Value.Contains(subscriber)) + .Select(kv => new KeyValuePair>(kv.Key, kv.Value.Remove(subscriber)))); _allPersistenceIdSubscribers.Remove(subscriber); } @@ -347,11 +347,12 @@ public void AddTagSubscriber(IActorRef subscriber, string tag) { if (!_tagSubscribers.TryGetValue(tag, out var subscriptions)) { - subscriptions = new HashSet(); - _tagSubscribers.Add(tag, subscriptions); + _tagSubscribers = _tagSubscribers.Add(tag, ImmutableHashSet.Create(subscriber)); + } + else + { + _tagSubscribers = _tagSubscribers.SetItem(tag, subscriptions.Add(subscriber)); } - - subscriptions.Add(subscriber); } /// @@ -373,11 +374,12 @@ public void AddPersistenceIdSubscriber(IActorRef subscriber, string persistenceI { if (!_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscriptions)) { - subscriptions = new HashSet(); - _persistenceIdSubscribers.Add(persistenceId, subscriptions); + _persistenceIdSubscribers = _persistenceIdSubscribers.Add(persistenceId, ImmutableHashSet.Create(subscriber)); + } + else + { + _persistenceIdSubscribers = _persistenceIdSubscribers.Add(persistenceId, subscriptions.Add(subscriber)); } - - subscriptions.Add(subscriber); } private async Task NextTagSequenceNr(string tag)