Skip to content

Commit

Permalink
Fix PersistenceQuery ReadJournalFor thread safety issue (#6859)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus authored Jul 31, 2023
1 parent 7ee2dbb commit 0c43702
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions src/core/Akka.Persistence.Query/PersistenceQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public sealed class PersistenceQuery : IExtension
private readonly ExtendedActorSystem _system;
private readonly ConcurrentDictionary<string, IReadJournal> _readJournalPluginExtensionIds = new();
private ILoggingAdapter _log;
private readonly object _lock = new ();

public static PersistenceQuery Get(ActorSystem system)
{
Expand All @@ -34,8 +35,18 @@ public PersistenceQuery(ExtendedActorSystem system)

public TJournal ReadJournalFor<TJournal>(string readJournalPluginId) where TJournal : IReadJournal
{
var plugin = _readJournalPluginExtensionIds.GetOrAdd(readJournalPluginId, path => CreatePlugin(path, GetDefaultConfig<TJournal>()).GetReadJournal());
return (TJournal)plugin;
if(_readJournalPluginExtensionIds.TryGetValue(readJournalPluginId, out var plugin))
return (TJournal)plugin;

lock (_lock)
{
if (_readJournalPluginExtensionIds.TryGetValue(readJournalPluginId, out plugin))
return (TJournal)plugin;

plugin = CreatePlugin(readJournalPluginId, GetDefaultConfig<TJournal>()).GetReadJournal();
_readJournalPluginExtensionIds[readJournalPluginId] = plugin;
return (TJournal)plugin;
}
}

private IReadJournalProvider CreatePlugin(string configPath, Config config)
Expand Down

0 comments on commit 0c43702

Please sign in to comment.