Skip to content

Commit

Permalink
Akka.Persistence: automatically start snapshot store correctly (#2806)
Browse files Browse the repository at this point in the history
* close #2770 - automatically start snapshot store correctly

* added copyright header
  • Loading branch information
Aaronontheweb authored and heynickc committed Jun 27, 2017
1 parent 85168ba commit d02fd07
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
<Compile Include="LoadPluginSpec.cs" />
<Compile Include="MemoryEventAdapterSpec.cs" />
<Compile Include="PerformanceSpec.cs" />
<Compile Include="PersistenceConfigAutoStartSpec.cs" />
<Compile Include="PersistenceConfigSpec.cs" />
<Compile Include="PersistenceSpec.cs" />
<Compile Include="PersistentActorBoundedStashingSpec.cs" />
Expand Down
107 changes: 107 additions & 0 deletions src/core/Akka.Persistence.Tests/PersistenceConfigAutoStartSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
//-----------------------------------------------------------------------
// <copyright file="PersistenceConfigAutoStartSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.Journal;
using Akka.Persistence.Snapshot;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Tests
{
public class PersistenceConfigAutoStartSpec : AkkaSpec
{
#region internal classes

private sealed class TestRequest
{
public static readonly TestRequest Instance = new TestRequest();

private TestRequest()
{
}
}

public class TestJournal : MemoryJournal
{
private readonly string _testValue;

public TestJournal(Config config)
{
_testValue = config.GetString("test-value");
}

protected override bool AroundReceive(Receive receive, object message)
{
if (message is TestRequest)
{
Sender.Tell(_testValue);
return true;
}
else return base.AroundReceive(receive, message);
}
}

public class TestSnapshotStore : LocalSnapshotStore
{
private readonly string _testValue;

public TestSnapshotStore(Config config)
{
_testValue = config.GetString("test-value");
}

protected override bool AroundReceive(Receive receive, object message)
{
if (message is TestRequest)
{
Sender.Tell(_testValue);
return true;
}
else return base.AroundReceive(receive, message);
}
}

#endregion

private static readonly Config AutoStartConfig = ConfigurationFactory.ParseString(@"
akka.loglevel = INFO
akka.persistence.journal {
test {
class = ""Akka.Persistence.Tests.PersistenceConfigAutoStartSpec+TestJournal, Akka.Persistence.Tests""
plugin-dispatcher = ""akka.actor.default-dispatcher""
test-value = ""A""
}
auto-start-journals = [akka.persistence.journal.test]
}
akka.persistence.snapshot-store {
test {
class = ""Akka.Persistence.Tests.PersistenceConfigAutoStartSpec+TestSnapshotStore, Akka.Persistence.Tests""
plugin-dispatcher = ""akka.actor.default-dispatcher""
test-value = ""C""
}
auto-start-snapshot-stores = [akka.persistence.snapshot-store.test]
}
");

public PersistenceConfigAutoStartSpec(ITestOutputHelper output = null) : base(AutoStartConfig, output)
{
}

[Fact]
public void Persistence_should_auto_start_journal_and_snapshotstore_when_specified()
{
EventFilter.Info(message: "Auto-starting journal plugin `akka.persistence.journal.test`")
.And.Info(message: "Auto-starting snapshot store `akka.persistence.snapshot-store.test`").Expect(2, () =>
{
var persistence = Persistence.Instance.Apply(Sys);
});
}
}
}
15 changes: 9 additions & 6 deletions src/core/Akka.Persistence/Persistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public PluginHolder(IActorRef @ref, EventAdapters adapters, Config config)
}

/// <summary>
/// TBD
/// Launches the Akka.Persistence runtime
/// </summary>
public class PersistenceExtension : IExtension
{
Expand All @@ -55,12 +55,15 @@ public class PersistenceExtension : IExtension
private const string SnapshotStoreFallbackConfigPath = "akka.persistence.snapshot-store-plugin-fallback";

/// <summary>
/// TBD
/// Creates a new Akka.Persistence extension.
/// </summary>
/// <param name="system">TBD</param>
/// <exception cref="NullReferenceException">TBD
/// <param name="system">The ActorSystem that will be using Akka.Persistence</param>
/// <exception cref="NullReferenceException">
/// This exception is thrown when the default journal plugin, <c>journal.plugin</c> is not configured.
/// </exception>
/// <remarks>
/// DO NOT CALL DIRECTLY. Will be instantiated automatically be Akka.Persistence actors.
/// </remarks>
public PersistenceExtension(ExtendedActorSystem system)
{
_system = system;
Expand Down Expand Up @@ -106,7 +109,7 @@ public PersistenceExtension(ExtendedActorSystem system)
JournalFor(id);
});

_config.GetStringList("journal.auto-start-snapshot-stores").ForEach(id =>
_config.GetStringList("snapshot-store.auto-start-snapshot-stores").ForEach(id =>
{
if (_log.IsInfoEnabled)
_log.Info("Auto-starting snapshot store `{0}`", id);
Expand All @@ -120,7 +123,7 @@ public PersistenceExtension(ExtendedActorSystem system)
public IStashOverflowStrategy DefaultInternalStashOverflowStrategy => _defaultInternalStashOverflowStrategy.Value;

/// <summary>
/// TBD
/// The Akka.Persistence settings for the journal and snapshot store
/// </summary>
public PersistenceSettings Settings { get; }

Expand Down

0 comments on commit d02fd07

Please sign in to comment.