diff --git a/src/Akka.sln b/src/Akka.sln
index a825241bba2..ecce82ad78e 100644
--- a/src/Akka.sln
+++ b/src/Akka.sln
@@ -1,7 +1,7 @@
Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio 15
-VisualStudioVersion = 15.0.27004.2010
+# Visual Studio Version 16
+VisualStudioVersion = 16.0.29201.188
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Benchmark", "Benchmark", "{73108242-625A-4D7B-AA09-63375DBAE464}"
EndProject
@@ -197,6 +197,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SpawnBenchmark", "benchmark
EndProject
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Akka.Persistence.FSharp", "core\Akka.Persistence.FSharp\Akka.Persistence.FSharp.fsproj", "{539C3EB6-FCC8-41FA-9373-364605877EE1}"
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Persistence.TestKit", "core\Akka.Persistence.TestKit\Akka.Persistence.TestKit.csproj", "{212A2D35-E8D1-46A7-A1D1-418CF9509D77}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Persistence.TestKit.Tests", "core\Akka.Persistence.TestKit.Tests\Akka.Persistence.TestKit.Tests.csproj", "{22F6EA86-0079-41A0-9BD3-82D2D6C34638}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Persistence.TestKit.Xunit2", "core\Akka.Persistence.TestKit.Xunit2\Akka.Persistence.TestKit.Xunit2.csproj", "{6F8FECD6-6E39-473E-9B9A-9EE22CBF479F}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -844,6 +850,42 @@ Global
{539C3EB6-FCC8-41FA-9373-364605877EE1}.Release|x64.Build.0 = Release|Any CPU
{539C3EB6-FCC8-41FA-9373-364605877EE1}.Release|x86.ActiveCfg = Release|Any CPU
{539C3EB6-FCC8-41FA-9373-364605877EE1}.Release|x86.Build.0 = Release|Any CPU
+ {212A2D35-E8D1-46A7-A1D1-418CF9509D77}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {212A2D35-E8D1-46A7-A1D1-418CF9509D77}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {212A2D35-E8D1-46A7-A1D1-418CF9509D77}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {212A2D35-E8D1-46A7-A1D1-418CF9509D77}.Debug|x64.Build.0 = Debug|Any CPU
+ {212A2D35-E8D1-46A7-A1D1-418CF9509D77}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {212A2D35-E8D1-46A7-A1D1-418CF9509D77}.Debug|x86.Build.0 = Debug|Any CPU
+ {212A2D35-E8D1-46A7-A1D1-418CF9509D77}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {212A2D35-E8D1-46A7-A1D1-418CF9509D77}.Release|Any CPU.Build.0 = Release|Any CPU
+ {212A2D35-E8D1-46A7-A1D1-418CF9509D77}.Release|x64.ActiveCfg = Release|Any CPU
+ {212A2D35-E8D1-46A7-A1D1-418CF9509D77}.Release|x64.Build.0 = Release|Any CPU
+ {212A2D35-E8D1-46A7-A1D1-418CF9509D77}.Release|x86.ActiveCfg = Release|Any CPU
+ {212A2D35-E8D1-46A7-A1D1-418CF9509D77}.Release|x86.Build.0 = Release|Any CPU
+ {22F6EA86-0079-41A0-9BD3-82D2D6C34638}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {22F6EA86-0079-41A0-9BD3-82D2D6C34638}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {22F6EA86-0079-41A0-9BD3-82D2D6C34638}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {22F6EA86-0079-41A0-9BD3-82D2D6C34638}.Debug|x64.Build.0 = Debug|Any CPU
+ {22F6EA86-0079-41A0-9BD3-82D2D6C34638}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {22F6EA86-0079-41A0-9BD3-82D2D6C34638}.Debug|x86.Build.0 = Debug|Any CPU
+ {22F6EA86-0079-41A0-9BD3-82D2D6C34638}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {22F6EA86-0079-41A0-9BD3-82D2D6C34638}.Release|Any CPU.Build.0 = Release|Any CPU
+ {22F6EA86-0079-41A0-9BD3-82D2D6C34638}.Release|x64.ActiveCfg = Release|Any CPU
+ {22F6EA86-0079-41A0-9BD3-82D2D6C34638}.Release|x64.Build.0 = Release|Any CPU
+ {22F6EA86-0079-41A0-9BD3-82D2D6C34638}.Release|x86.ActiveCfg = Release|Any CPU
+ {22F6EA86-0079-41A0-9BD3-82D2D6C34638}.Release|x86.Build.0 = Release|Any CPU
+ {6F8FECD6-6E39-473E-9B9A-9EE22CBF479F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {6F8FECD6-6E39-473E-9B9A-9EE22CBF479F}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {6F8FECD6-6E39-473E-9B9A-9EE22CBF479F}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {6F8FECD6-6E39-473E-9B9A-9EE22CBF479F}.Debug|x64.Build.0 = Debug|Any CPU
+ {6F8FECD6-6E39-473E-9B9A-9EE22CBF479F}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {6F8FECD6-6E39-473E-9B9A-9EE22CBF479F}.Debug|x86.Build.0 = Debug|Any CPU
+ {6F8FECD6-6E39-473E-9B9A-9EE22CBF479F}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {6F8FECD6-6E39-473E-9B9A-9EE22CBF479F}.Release|Any CPU.Build.0 = Release|Any CPU
+ {6F8FECD6-6E39-473E-9B9A-9EE22CBF479F}.Release|x64.ActiveCfg = Release|Any CPU
+ {6F8FECD6-6E39-473E-9B9A-9EE22CBF479F}.Release|x64.Build.0 = Release|Any CPU
+ {6F8FECD6-6E39-473E-9B9A-9EE22CBF479F}.Release|x86.ActiveCfg = Release|Any CPU
+ {6F8FECD6-6E39-473E-9B9A-9EE22CBF479F}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -929,6 +971,9 @@ Global
{A1D57384-A933-480A-9DF4-FA5E60AB1A67} = {73108242-625A-4D7B-AA09-63375DBAE464}
{9BEAF609-B406-4CCB-9708-6E8DFF764232} = {73108242-625A-4D7B-AA09-63375DBAE464}
{539C3EB6-FCC8-41FA-9373-364605877EE1} = {01167D3C-49C4-4CDE-9787-C176D139ACDD}
+ {212A2D35-E8D1-46A7-A1D1-418CF9509D77} = {01167D3C-49C4-4CDE-9787-C176D139ACDD}
+ {22F6EA86-0079-41A0-9BD3-82D2D6C34638} = {01167D3C-49C4-4CDE-9787-C176D139ACDD}
+ {6F8FECD6-6E39-473E-9B9A-9EE22CBF479F} = {01167D3C-49C4-4CDE-9787-C176D139ACDD}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {03AD8E21-7507-4E68-A4E9-F4A7E7273164}
diff --git a/src/core/Akka.Persistence.TestKit.Tests/Actors/PersistActor.cs b/src/core/Akka.Persistence.TestKit.Tests/Actors/PersistActor.cs
new file mode 100644
index 00000000000..5aedebb13bc
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit.Tests/Actors/PersistActor.cs
@@ -0,0 +1,59 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit.Tests
+{
+ using System;
+ using Actor;
+
+ public class PersistActor : UntypedPersistentActor
+ {
+ public PersistActor(IActorRef probe)
+ {
+ _probe = probe;
+ }
+
+ private readonly IActorRef _probe;
+
+ public override string PersistenceId => "foo";
+
+ protected override void OnCommand(object message)
+ {
+ switch (message as string)
+ {
+ case "write":
+ Persist(message, _ =>
+ {
+ _probe.Tell("ack");
+ });
+
+ break;
+
+ default:
+ return;
+ }
+ }
+
+ protected override void OnRecover(object message)
+ {
+ }
+
+ protected override void OnPersistFailure(Exception cause, object @event, long sequenceNr)
+ {
+ _probe.Tell("failure");
+
+ base.OnPersistFailure(cause, @event, sequenceNr);
+ }
+
+ protected override void OnPersistRejected(Exception cause, object @event, long sequenceNr)
+ {
+ _probe.Tell("rejected");
+
+ base.OnPersistRejected(cause, @event, sequenceNr);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit.Tests/Actors/SnapshotActor.cs b/src/core/Akka.Persistence.TestKit.Tests/Actors/SnapshotActor.cs
new file mode 100644
index 00000000000..75104f4b39b
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit.Tests/Actors/SnapshotActor.cs
@@ -0,0 +1,100 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit.Tests
+{
+ using System;
+ using Actor;
+
+ public class SnapshotActor : UntypedPersistentActor
+ {
+ public SnapshotActor(IActorRef probe)
+ {
+ _probe = probe;
+ }
+
+ private readonly IActorRef _probe;
+
+ public override string PersistenceId => "bar";
+
+ protected override void OnCommand(object message)
+ {
+ switch (message)
+ {
+ case "save":
+ SaveSnapshot(message);
+ return;
+
+ case DeleteOne del:
+ DeleteSnapshot(del.SequenceNr);
+ return;
+
+ case DeleteMany del:
+ DeleteSnapshots(del.Criteria);
+ return;
+
+ case SaveSnapshotSuccess _:
+ case SaveSnapshotFailure _:
+ case DeleteSnapshotSuccess _:
+ case DeleteSnapshotFailure _:
+ case DeleteSnapshotsSuccess _:
+ case DeleteSnapshotsFailure _:
+ _probe.Tell(message);
+ return;
+
+ default:
+ return;
+ }
+ }
+
+ protected override void OnRecover(object message)
+ {
+ if (message is SnapshotOffer snapshot)
+ {
+ _probe.Tell(message);
+ }
+ }
+
+ protected override void OnRecoveryFailure(Exception reason, object message = null)
+ {
+ _probe.Tell(new RecoveryFailure(reason, message));
+ base.OnRecoveryFailure(reason, message);
+ }
+
+ public class DeleteOne
+ {
+ public DeleteOne(long sequenceNr)
+ {
+ SequenceNr = sequenceNr;
+ }
+
+ public long SequenceNr { get; }
+ }
+
+ public class DeleteMany
+ {
+ public DeleteMany(SnapshotSelectionCriteria criteria)
+ {
+ Criteria = criteria;
+ }
+
+ public SnapshotSelectionCriteria Criteria { get; }
+ }
+
+ public class RecoveryFailure
+ {
+ public RecoveryFailure(Exception reason, object message)
+ {
+ Reason = reason;
+ Message = message;
+ }
+
+ public Exception Reason { get; }
+ public object Message { get; }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit.Tests/Akka.Persistence.TestKit.Tests.csproj b/src/core/Akka.Persistence.TestKit.Tests/Akka.Persistence.TestKit.Tests.csproj
new file mode 100644
index 00000000000..1699eb1e18a
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit.Tests/Akka.Persistence.TestKit.Tests.csproj
@@ -0,0 +1,32 @@
+
+
+
+
+ Akka.Persistence.TestKit.Tests
+ $(NetFrameworkTestVersion);$(NetCoreTestVersion)
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ $(DefineConstants);CORECLR
+
+
+
+ $(DefineConstants);RELEASE
+
+
diff --git a/src/core/Akka.Persistence.TestKit.Tests/JournalInterceptorsSpecs.cs b/src/core/Akka.Persistence.TestKit.Tests/JournalInterceptorsSpecs.cs
new file mode 100644
index 00000000000..97ea1a80056
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit.Tests/JournalInterceptorsSpecs.cs
@@ -0,0 +1,184 @@
+namespace Akka.Persistence.TestKit.Tests
+{
+ using System;
+ using System.Threading.Tasks;
+ using Akka.Persistence.TestKit;
+ using FluentAssertions;
+ using Xunit;
+
+ public class JournalInterceptorsSpecs
+ {
+ [Fact]
+ public void noop_immediately_returns_without_exception()
+ {
+ JournalInterceptors.Noop.Instance
+ .Awaiting(x => x.InterceptAsync(null))
+ .ShouldNotThrow();
+ }
+
+ [Fact]
+ public void failure_must_throw_specific_exception()
+ {
+ JournalInterceptors.Failure.Instance
+ .Awaiting(x => x.InterceptAsync(null))
+ .ShouldThrowExactly();
+ }
+
+ [Fact]
+ public void rejection_must_throw_specific_exception()
+ {
+ JournalInterceptors.Rejection.Instance
+ .Awaiting(x => x.InterceptAsync(null))
+ .ShouldThrowExactly();
+ }
+
+ [Fact]
+ public async Task delay_must_call_next_interceptor_after_specified_delay()
+ {
+ var duration = TimeSpan.FromMilliseconds(100);
+ var probe = new InterceptorProbe();
+ var delay = new JournalInterceptors.Delay(duration, probe);
+
+ var startedAt = DateTime.Now;
+ await delay.InterceptAsync(null);
+
+ probe.WasCalled.Should().BeTrue();
+ probe.CalledAt.Should().BeOnOrAfter(startedAt + duration);
+ }
+
+ [Fact]
+ public async Task on_type_must_call_next_interceptor_when_message_is_exactly_awaited_type()
+ {
+ var probe = new InterceptorProbe();
+ var onType = new JournalInterceptors.OnType(typeof(SpecificMessage), probe);
+ var message = new Persistent(new SpecificMessage());
+
+ await onType.InterceptAsync(message);
+
+ probe.WasCalled.Should().BeTrue();
+ probe.Message.Should().BeSameAs(message);
+ }
+
+ [Fact]
+ public async Task on_type_must_call_next_interceptor_when_message_is_subclass_of_awaited_type()
+ {
+ var probe = new InterceptorProbe();
+ var onType = new JournalInterceptors.OnType(typeof(SpecificMessage), probe);
+ var message = new Persistent(new SubclassMessage());
+
+ await onType.InterceptAsync(message);
+
+ probe.WasCalled.Should().BeTrue();
+ probe.Message.Should().BeSameAs(message);
+ }
+
+ [Fact]
+ public async Task on_type_must_call_next_interceptor_when_message_is_implements_awaited_interface_type()
+ {
+ var probe = new InterceptorProbe();
+ var onType = new JournalInterceptors.OnType(typeof(IMessageWithInterface), probe);
+ var message = new Persistent(new MessageWithInterface());
+
+ await onType.InterceptAsync(message);
+
+ probe.WasCalled.Should().BeTrue();
+ probe.Message.Should().BeSameAs(message);
+ }
+
+ [Fact]
+ public async Task on_type_must_not_call_next_interceptor_when_message_does_not_correspond_to_described_rules()
+ {
+ var probe = new InterceptorProbe();
+ var onType = new JournalInterceptors.OnType(typeof(SubclassMessage), probe);
+ var message = new Persistent(new SpecificMessage());
+
+ await onType.InterceptAsync(message);
+
+ probe.WasCalled.Should().BeFalse();
+ }
+
+ [Fact]
+ public async Task on_condition_must_accept_sync_lambda()
+ {
+ var probe = new InterceptorProbe();
+ var onCondition = new JournalInterceptors.OnCondition(_ => true, probe);
+
+ await onCondition.InterceptAsync(null);
+
+ probe.WasCalled.Should().BeTrue();
+ }
+
+ [Fact]
+ public async Task on_condition_must_accept_async_lambda()
+ {
+ var probe = new InterceptorProbe();
+ var onCondition = new JournalInterceptors.OnCondition(_ => Task.FromResult(true), probe);
+
+ await onCondition.InterceptAsync(null);
+
+ probe.WasCalled.Should().BeTrue();
+ }
+
+ [Fact]
+ public async Task on_condition_must_call_next_interceptor_unless_predicate_returns_false()
+ {
+ var probe = new InterceptorProbe();
+ var onCondition = new JournalInterceptors.OnCondition(_ => false, probe);
+
+ await onCondition.InterceptAsync(null);
+
+ probe.WasCalled.Should().BeFalse();
+ }
+
+ [Fact]
+ public async Task on_condition_with_negation_must_call_next_interceptor_unless_predicate_returns_true()
+ {
+ var probe = new InterceptorProbe();
+ var onCondition = new JournalInterceptors.OnCondition(_ => false, probe, negate: true);
+
+ await onCondition.InterceptAsync(null);
+
+ probe.WasCalled.Should().BeTrue();
+ }
+
+ [Fact]
+ public async Task on_condition_must_pass_the_same_message_to_predicate()
+ {
+ var probe = new InterceptorProbe();
+ var expectedMessage = new Persistent("test");
+
+ var onCondition = new JournalInterceptors.OnCondition(message =>
+ {
+ message.Should().BeSameAs(expectedMessage);
+ return false;
+ }, probe);
+
+ await onCondition.InterceptAsync(expectedMessage);
+ }
+
+
+ private class SpecificMessage { }
+
+ private class SubclassMessage : SpecificMessage { }
+
+ private interface IMessageWithInterface { }
+
+ private class MessageWithInterface : IMessageWithInterface { }
+
+ private class InterceptorProbe : IJournalInterceptor
+ {
+ public bool WasCalled { get; private set; }
+ public DateTime CalledAt { get; private set; }
+ public IPersistentRepresentation Message { get; private set; }
+
+ public Task InterceptAsync(IPersistentRepresentation message)
+ {
+ CalledAt = DateTime.Now;
+ WasCalled = true;
+ Message = message;
+
+ return Task.CompletedTask;
+ }
+ }
+ }
+}
diff --git a/src/core/Akka.Persistence.TestKit.Tests/SnapshotStoreInterceptorsSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/SnapshotStoreInterceptorsSpec.cs
new file mode 100644
index 00000000000..cefa0377272
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit.Tests/SnapshotStoreInterceptorsSpec.cs
@@ -0,0 +1,98 @@
+namespace Akka.Persistence.TestKit.Tests
+{
+ using System;
+ using System.Threading.Tasks;
+ using FluentAssertions;
+ using Xunit;
+
+ public class SnapshotStoreInterceptorsSpec
+ {
+ [Fact]
+ public void noop_must_do_nothing()
+ => SnapshotStoreInterceptors.Noop.Instance
+ .Awaiting(x => x.InterceptAsync(null, null))
+ .ShouldNotThrow();
+
+ [Fact]
+ public void failure_must_always_throw_exception()
+ => SnapshotStoreInterceptors.Failure.Instance
+ .Awaiting(x => x.InterceptAsync(null, null))
+ .ShouldThrowExactly();
+
+ [Fact]
+ public async Task delay_must_call_next_interceptor_after_specified_delay()
+ {
+ var duration = TimeSpan.FromMilliseconds(100);
+ var probe = new InterceptorProbe();
+ var delay = new SnapshotStoreInterceptors.Delay(duration, probe);
+
+ var startedAt = DateTime.Now;
+ await delay.InterceptAsync(null, null);
+
+ probe.WasCalled.Should().BeTrue();
+ probe.CalledAt.Should().BeOnOrAfter(startedAt + duration);
+ }
+
+ [Fact]
+ public async Task on_condition_must_accept_sync_lambda()
+ {
+ var probe = new InterceptorProbe();
+ var onCondition = new SnapshotStoreInterceptors.OnCondition((_, __) => true, probe);
+
+ await onCondition.InterceptAsync(null, null);
+
+ probe.WasCalled.Should().BeTrue();
+ }
+
+ [Fact]
+ public async Task on_condition_must_accept_async_lambda()
+ {
+ var probe = new InterceptorProbe();
+ var onCondition = new SnapshotStoreInterceptors.OnCondition((_, __) => Task.FromResult(true), probe);
+
+ await onCondition.InterceptAsync(null, null);
+
+ probe.WasCalled.Should().BeTrue();
+ }
+
+ [Fact]
+ public async Task on_condition_must_call_next_interceptor_unless_predicate_returns_false()
+ {
+ var probe = new InterceptorProbe();
+ var onCondition = new SnapshotStoreInterceptors.OnCondition((_, __) => false, probe);
+
+ await onCondition.InterceptAsync(null, null);
+
+ probe.WasCalled.Should().BeFalse();
+ }
+
+ [Fact]
+ public async Task on_condition_with_negation_must_call_next_interceptor_unless_predicate_returns_true()
+ {
+ var probe = new InterceptorProbe();
+ var onCondition = new SnapshotStoreInterceptors.OnCondition((_, __) => false, probe, negate: true);
+
+ await onCondition.InterceptAsync(null, null);
+
+ probe.WasCalled.Should().BeTrue();
+ }
+
+ public class InterceptorProbe : ISnapshotStoreInterceptor
+ {
+ public bool WasCalled { get; private set; }
+ public DateTime CalledAt { get; private set; }
+ public string PersistenceId { get; private set; }
+ public SnapshotSelectionCriteria Criteria { get; private set; }
+
+ public Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria)
+ {
+ CalledAt = DateTime.Now;
+ WasCalled = true;
+ PersistenceId = persistenceId;
+ Criteria = criteria;
+
+ return Task.CompletedTask;
+ }
+ }
+ }
+}
diff --git a/src/core/Akka.Persistence.TestKit.Tests/TestJournalSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/TestJournalSpec.cs
new file mode 100644
index 00000000000..4cf30b15692
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit.Tests/TestJournalSpec.cs
@@ -0,0 +1,83 @@
+namespace Akka.Persistence.TestKit.Tests
+{
+ using System;
+ using System.Threading.Tasks;
+ using Actor;
+ using Akka.Persistence.TestKit;
+ using Akka.TestKit;
+ using Xunit;
+
+ public class TestJournalSpec : PersistenceTestKit
+ {
+ public TestJournalSpec()
+ {
+ _probe = CreateTestProbe();
+ }
+
+ private readonly TestProbe _probe;
+
+ [Fact]
+ public void must_return_ack_after_new_write_interceptor_is_set()
+ {
+ JournalActorRef.Tell(new TestJournal.UseWriteInterceptor(null), TestActor);
+
+ ExpectMsg(TimeSpan.FromSeconds(3));
+ }
+
+ [Fact]
+ public async Task works_as_memory_journal_by_default()
+ {
+ var actor = ActorOf(() => new PersistActor(_probe));
+
+ await Journal.OnWrite.Pass();
+ actor.Tell("write", TestActor);
+
+ _probe.ExpectMsg("ack");
+ }
+
+ [Fact]
+ public async Task when_fail_on_write_is_set_all_writes_to_journal_will_fail()
+ {
+ var actor = ActorOf(() => new PersistActor(_probe));
+ Watch(actor);
+
+ await Journal.OnWrite.Fail();
+ actor.Tell("write", TestActor);
+
+ _probe.ExpectMsg("failure");
+ ExpectTerminated(actor);
+ }
+
+ [Fact]
+ public async Task when_reject_on_write_is_set_all_writes_to_journal_will_be_rejected()
+ {
+ var actor = ActorOf(() => new PersistActor(_probe));
+ Watch(actor);
+
+ await Journal.OnWrite.Reject();
+ actor.Tell("write", TestActor);
+
+ _probe.ExpectMsg("rejected");
+ }
+
+ [Fact]
+ public async Task journal_must_reset_state_to_pass()
+ {
+ await WithJournalWrite(write => write.Fail(), () =>
+ {
+ var actor = ActorOf(() => new PersistActor(_probe));
+ Watch(actor);
+
+ actor.Tell("write", TestActor);
+ _probe.ExpectMsg("failure");
+ ExpectTerminated(actor);
+ });
+
+ var actor2 = ActorOf(() => new PersistActor(_probe));
+ Watch(actor2);
+
+ actor2.Tell("write", TestActor);
+ _probe.ExpectMsg("ack");
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit.Tests/TestSnapshotStoreSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/TestSnapshotStoreSpec.cs
new file mode 100644
index 00000000000..bc146165cc6
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit.Tests/TestSnapshotStoreSpec.cs
@@ -0,0 +1,94 @@
+namespace Akka.Persistence.TestKit.Tests
+{
+ using System;
+ using System.Threading.Tasks;
+ using Actor;
+ using Akka.TestKit;
+ using Xunit;
+
+ public sealed class TestSnapshotStoreSpec : PersistenceTestKit
+ {
+ public TestSnapshotStoreSpec()
+ {
+ _probe = CreateTestProbe();
+ }
+
+ private readonly TestProbe _probe;
+
+ [Fact]
+ public void send_ack_after_load_interceptor_is_set()
+ {
+ SnapshotsActorRef.Tell(new TestSnapshotStore.UseLoadInterceptor(null), TestActor);
+ ExpectMsg();
+ }
+
+ [Fact]
+ public void send_ack_after_save_interceptor_is_set()
+ {
+ SnapshotsActorRef.Tell(new TestSnapshotStore.UseSaveInterceptor(null), TestActor);
+ ExpectMsg();
+ }
+
+ [Fact]
+ public void send_ack_after_delete_interceptor_is_set()
+ {
+ SnapshotsActorRef.Tell(new TestSnapshotStore.UseDeleteInterceptor(null), TestActor);
+ ExpectMsg();
+ }
+
+ [Fact]
+ public async Task after_load_behavior_was_executed_store_is_back_to_pass_mode()
+ {
+ // create snapshot
+ var actor = ActorOf(() => new SnapshotActor(_probe));
+ actor.Tell("save");
+ _probe.ExpectMsg();
+ await actor.GracefulStop(TimeSpan.FromSeconds(3));
+
+ await WithSnapshotLoad(load => load.Fail(), () =>
+ {
+ ActorOf(() => new SnapshotActor(_probe));
+ _probe.ExpectMsg();
+ });
+
+ ActorOf(() => new SnapshotActor(_probe));
+ _probe.ExpectMsg();
+ }
+
+ [Fact]
+ public async Task after_save_behavior_was_executed_store_is_back_to_pass_mode()
+ {
+ // create snapshot
+ var actor = ActorOf(() => new SnapshotActor(_probe));
+
+ await WithSnapshotSave(save => save.Fail(), () =>
+ {
+ actor.Tell("save");
+ _probe.ExpectMsg();
+ });
+
+ actor.Tell("save");
+ _probe.ExpectMsg();
+ }
+
+ [Fact]
+ public async Task after_delete_behavior_was_executed_store_is_back_to_pass_mode()
+ {
+ // create snapshot
+ var actor = ActorOf(() => new SnapshotActor(_probe));
+ actor.Tell("save");
+
+ var success = _probe.ExpectMsg();
+ var nr = success.Metadata.SequenceNr;
+
+ await WithSnapshotDelete(del => del.Fail(), () =>
+ {
+ actor.Tell(new SnapshotActor.DeleteOne(nr), TestActor);
+ _probe.ExpectMsg();
+ });
+
+ actor.Tell(new SnapshotActor.DeleteOne(nr), TestActor);
+ _probe.ExpectMsg();
+ }
+ }
+}
diff --git a/src/core/Akka.Persistence.TestKit.Xunit2/Akka.Persistence.TestKit.Xunit2.csproj b/src/core/Akka.Persistence.TestKit.Xunit2/Akka.Persistence.TestKit.Xunit2.csproj
new file mode 100644
index 00000000000..c4567057483
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit.Xunit2/Akka.Persistence.TestKit.Xunit2.csproj
@@ -0,0 +1,26 @@
+
+
+
+
+ Akka.Persistence.TestKit.Xunit2
+ TestKit for writing tests for Akka.NET Persistance module using xUnit
+ $(NetFrameworkLibVersion);$(NetStandardLibVersion)
+ $(AkkaPackageTags);testkit;persistance;xunit
+ true
+
+
+
+
+
+
+
+
+
+ $(DefineConstants);RELEASE
+
+
+
+
+ true
+
+
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs
new file mode 100644
index 00000000000..9d5bb92eb84
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs
@@ -0,0 +1,284 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ using System;
+ using System.Threading.Tasks;
+ using Actor;
+ using Akka.TestKit.Xunit2;
+ using Configuration;
+ using Xunit.Abstractions;
+
+ ///
+ /// This class represents an Akka.NET Persistence TestKit that uses xUnit
+ /// as its testing framework.
+ ///
+ public abstract class PersistenceTestKit : TestKit
+ {
+ ///
+ /// Create a new instance of the class.
+ /// A new system with the specified configuration will be created.
+ ///
+ /// Optional: The name of the actor system
+ protected PersistenceTestKit(string actorSystemName = null, ITestOutputHelper output = null)
+ : base(GetConfig(), actorSystemName, output)
+ {
+ var persistenceExtension = Persistence.Instance.Apply(Sys);
+
+ JournalActorRef = persistenceExtension.JournalFor(null);
+ Journal = TestJournal.FromRef(JournalActorRef);
+
+ SnapshotsActorRef = persistenceExtension.SnapshotStoreFor(null);
+ Snapshots = TestSnapshotStore.FromRef(SnapshotsActorRef);
+ }
+
+ ///
+ /// Actor reference to persistence Journal used by current actor system.
+ ///
+ public IActorRef JournalActorRef { get; }
+
+ ///
+ /// Actor reference to persistence Snapshot Store used by current actor system.
+ ///
+ public IActorRef SnapshotsActorRef { get; }
+
+ ///
+ ///
+ ///
+ public ITestJournal Journal { get; }
+
+ ///
+ ///
+ ///
+ public ITestSnapshotStore Snapshots { get; }
+
+ ///
+ /// Execute delegate with Journal Behavior applied to Recovery operation.
+ ///
+ ///
+ /// After will be executed, Recovery behavior will be reverted back to normal.
+ ///
+ /// Delegate which will select Journal behavior.
+ /// Async delegate which will be executed with applied Journal behavior.
+ /// which must be awaited.
+ public async Task WithJournalRecovery(Func behaviorSelector, Func execution)
+ {
+ if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector));
+ if (execution == null) throw new ArgumentNullException(nameof(execution));
+
+ try
+ {
+ await behaviorSelector(Journal.OnRecovery);
+ await execution();
+ }
+ finally
+ {
+ await Journal.OnRecovery.Pass();
+ }
+ }
+
+ ///
+ /// Execute delegate with Journal Behavior applied to Write operation.
+ ///
+ ///
+ /// After will be executed, Write behavior will be reverted back to normal.
+ ///
+ /// Delegate which will select Journal behavior.
+ /// Async delegate which will be executed with applied Journal behavior.
+ /// which must be awaited.
+ public async Task WithJournalWrite(Func behaviorSelector, Func execution)
+ {
+ if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector));
+ if (execution == null) throw new ArgumentNullException(nameof(execution));
+
+ try
+ {
+ await behaviorSelector(Journal.OnWrite);
+ await execution();
+ }
+ finally
+ {
+ await Journal.OnWrite.Pass();
+ }
+ }
+
+ ///
+ /// Execute delegate with Journal Behavior applied to Recovery operation.
+ ///
+ ///
+ /// After will be executed, Recovery behavior will be reverted back to normal.
+ ///
+ /// Delegate which will select Journal behavior.
+ /// Delegate which will be executed with applied Journal behavior.
+ /// which must be awaited.
+ public Task WithJournalRecovery(Func behaviorSelector, Action execution)
+ => WithJournalRecovery(behaviorSelector, () =>
+ {
+ if (execution == null) throw new ArgumentNullException(nameof(execution));
+
+ execution();
+ return Task.FromResult(new object());
+ });
+
+ ///
+ /// Execute delegate with Journal Behavior applied to Write operation.
+ ///
+ ///
+ /// After will be executed, Write behavior will be reverted back to normal.
+ ///
+ /// Delegate which will select Journal behavior.
+ /// Delegate which will be executed with applied Journal behavior.
+ /// which must be awaited.
+ public Task WithJournalWrite(Func behaviorSelector, Action execution)
+ => WithJournalWrite(behaviorSelector, () =>
+ {
+ if (execution == null) throw new ArgumentNullException(nameof(execution));
+
+ execution();
+ return Task.FromResult(new object());
+ });
+
+ ///
+ /// Execute delegate with Snapshot Store Behavior applied to Save operation.
+ ///
+ ///
+ /// After will be executed, Save behavior will be reverted back to normal.
+ ///
+ /// Delegate which will select Snapshot Store behavior.
+ /// Async delegate which will be executed with applied Journal behavior.
+ /// which must be awaited.
+ public async Task WithSnapshotSave(Func behaviorSelector, Func execution)
+ {
+ if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector));
+ if (execution == null) throw new ArgumentNullException(nameof(execution));
+
+ try
+ {
+ await behaviorSelector(Snapshots.OnSave);
+ await execution();
+ }
+ finally
+ {
+ await Snapshots.OnSave.Pass();
+ }
+ }
+
+ ///
+ /// Execute delegate with Snapshot Store Behavior applied to Load operation.
+ ///
+ ///
+ /// After will be executed, Load behavior will be reverted back to normal.
+ ///
+ /// Delegate which will select Snapshot Store behavior.
+ /// Async delegate which will be executed with applied Journal behavior.
+ /// which must be awaited.
+ public async Task WithSnapshotLoad(Func behaviorSelector, Func execution)
+ {
+ if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector));
+ if (execution == null) throw new ArgumentNullException(nameof(execution));
+
+ try
+ {
+ await behaviorSelector(Snapshots.OnLoad);
+ await execution();
+ }
+ finally
+ {
+ await Snapshots.OnLoad.Pass();
+ }
+ }
+
+ ///
+ /// Execute delegate with Snapshot Store Behavior applied to Delete operation.
+ ///
+ ///
+ /// After will be executed, Delete behavior will be reverted back to normal.
+ ///
+ /// Delegate which will select Snapshot Store behavior.
+ /// Async delegate which will be executed with applied Journal behavior.
+ /// which must be awaited.
+ public async Task WithSnapshotDelete(Func behaviorSelector, Func execution)
+ {
+ if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector));
+ if (execution == null) throw new ArgumentNullException(nameof(execution));
+
+ try
+ {
+ await behaviorSelector(Snapshots.OnDelete);
+ await execution();
+ }
+ finally
+ {
+ await Snapshots.OnDelete.Pass();
+ }
+ }
+
+ ///
+ /// Execute delegate with Snapshot Store Behavior applied to Save operation.
+ ///
+ ///
+ /// After will be executed, Save behavior will be reverted back to normal.
+ ///
+ /// Delegate which will select Snapshot Store behavior.
+ /// Delegate which will be executed with applied Journal behavior.
+ /// which must be awaited.
+ public Task WithSnapshotSave(Func behaviorSelector, Action execution)
+ => WithSnapshotSave(behaviorSelector, () =>
+ {
+ if (execution == null) throw new ArgumentNullException(nameof(execution));
+
+ execution();
+ return Task.FromResult(true);
+ });
+
+ ///
+ /// Execute delegate with Snapshot Store Behavior applied to Load operation.
+ ///
+ ///
+ /// After will be executed, Load behavior will be reverted back to normal.
+ ///
+ /// Delegate which will select Snapshot Store behavior.
+ /// Async delegate which will be executed with applied Journal behavior.
+ /// which must be awaited.
+ public Task WithSnapshotLoad(Func behaviorSelector, Action execution)
+ => WithSnapshotLoad(behaviorSelector, () =>
+ {
+ if (execution == null) throw new ArgumentNullException(nameof(execution));
+
+ execution();
+ return Task.FromResult(true);
+ });
+
+ ///
+ /// Execute delegate with Snapshot Store Behavior applied to Delete operation.
+ ///
+ ///
+ /// After will be executed, Delete behavior will be reverted back to normal.
+ ///
+ /// Delegate which will select Snapshot Store behavior.
+ /// Async delegate which will be executed with applied Journal behavior.
+ /// which must be awaited.
+ public Task WithSnapshotDelete(Func behaviorSelector, Action execution)
+ => WithSnapshotDelete(behaviorSelector, () =>
+ {
+ if (execution == null) throw new ArgumentNullException(nameof(execution));
+
+ execution();
+ return Task.FromResult(true);
+ });
+
+ ///
+ /// Loads from embedded resources actor system persistence configuration with and
+ /// configured as default persistence plugins.
+ ///
+ /// Actor system configuration object.
+ ///
+ static Config GetConfig()
+ => ConfigurationFactory.FromResource("Akka.Persistence.TestKit.config.conf");
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/Akka.Persistence.TestKit.csproj b/src/core/Akka.Persistence.TestKit/Akka.Persistence.TestKit.csproj
new file mode 100644
index 00000000000..18a087e7133
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/Akka.Persistence.TestKit.csproj
@@ -0,0 +1,30 @@
+
+
+
+
+ Akka.Persistence.TestKit
+ TestKit for writing tests for Akka.NET Persistance module.
+ $(NetFrameworkLibVersion);$(NetStandardLibVersion)
+ $(AkkaPackageTags);testkit;persistance
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+ $(DefineConstants);RELEASE
+
+
+
+
+ true
+
+
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/Journal/IJournalBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/Journal/IJournalBehaviorSetter.cs
new file mode 100644
index 00000000000..49cd75668bb
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/Journal/IJournalBehaviorSetter.cs
@@ -0,0 +1,16 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ using System.Threading.Tasks;
+
+ public interface IJournalBehaviorSetter
+ {
+ Task SetInterceptorAsync(IJournalInterceptor interceptor);
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/Journal/IJournalInterceptor.cs b/src/core/Akka.Persistence.TestKit/Journal/IJournalInterceptor.cs
new file mode 100644
index 00000000000..14a5a8ea3cd
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/Journal/IJournalInterceptor.cs
@@ -0,0 +1,16 @@
+namespace Akka.Persistence.TestKit
+{
+ using System.Threading.Tasks;
+
+ ///
+ /// Interface to object which will intercept written and recovered messages in .
+ ///
+ public interface IJournalInterceptor
+ {
+ ///
+ /// Method will be called for each individual message before it is written or recovered.
+ ///
+ /// Written or recovered message.
+ Task InterceptAsync(IPersistentRepresentation message);
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/Journal/ITestJournal.cs b/src/core/Akka.Persistence.TestKit/Journal/ITestJournal.cs
new file mode 100644
index 00000000000..c06b37b9c35
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/Journal/ITestJournal.cs
@@ -0,0 +1,25 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ ///
+ /// proxy object interface. Used to simplify communication with actor instance.
+ ///
+ public interface ITestJournal
+ {
+ ///
+ /// List of interceptors to alter write behavior of proxied journal.
+ ///
+ JournalWriteBehavior OnWrite { get; }
+
+ ///
+ /// List of interceptors to alter recovery behavior of proxied journal.
+ ///
+ JournalRecoveryBehavior OnRecovery { get; }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs b/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs
new file mode 100644
index 00000000000..ebcc279335c
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs
@@ -0,0 +1,99 @@
+namespace Akka.Persistence.TestKit
+{
+ using System;
+ using System.Threading.Tasks;
+
+ internal static class JournalInterceptors
+ {
+ internal class Noop : IJournalInterceptor
+ {
+ public static readonly IJournalInterceptor Instance = new Noop();
+
+ public Task InterceptAsync(IPersistentRepresentation message) => Task.FromResult(true);
+ }
+
+ internal class Failure : IJournalInterceptor
+ {
+ public static readonly IJournalInterceptor Instance = new Failure();
+
+ public Task InterceptAsync(IPersistentRepresentation message) => throw new TestJournalFailureException();
+ }
+
+ internal class Rejection : IJournalInterceptor
+ {
+ public static readonly IJournalInterceptor Instance = new Rejection();
+
+ public Task InterceptAsync(IPersistentRepresentation message) => throw new TestJournalRejectionException();
+ }
+
+ internal class Delay : IJournalInterceptor
+ {
+ public Delay(TimeSpan delay, IJournalInterceptor next)
+ {
+ _delay = delay;
+ _next = next;
+ }
+
+ private readonly TimeSpan _delay;
+ private readonly IJournalInterceptor _next;
+
+ public async Task InterceptAsync(IPersistentRepresentation message)
+ {
+ await Task.Delay(_delay);
+ await _next.InterceptAsync(message);
+ }
+ }
+
+ internal sealed class OnCondition : IJournalInterceptor
+ {
+ public OnCondition(Func> predicate, IJournalInterceptor next, bool negate = false)
+ {
+ _predicate = predicate;
+ _next = next;
+ _negate = negate;
+ }
+
+ public OnCondition(Func predicate, IJournalInterceptor next, bool negate = false)
+ {
+ _predicate = message => Task.FromResult(predicate(message));
+ _next = next;
+ _negate = negate;
+ }
+
+ private readonly Func> _predicate;
+ private readonly IJournalInterceptor _next;
+ private readonly bool _negate;
+
+ public async Task InterceptAsync(IPersistentRepresentation message)
+ {
+ var result = await _predicate(message);
+ if ((_negate && !result) || (!_negate && result))
+ {
+ await _next.InterceptAsync(message);
+ }
+ }
+ }
+
+ internal class OnType : IJournalInterceptor
+ {
+ public OnType(Type messageType, IJournalInterceptor next)
+ {
+ _messageType = messageType;
+ _next = next;
+ }
+
+ private readonly Type _messageType;
+ private readonly IJournalInterceptor _next;
+
+ public async Task InterceptAsync(IPersistentRepresentation message)
+ {
+ var type = message.Payload.GetType();
+
+ if (_messageType.IsAssignableFrom(type))
+ {
+ await _next.InterceptAsync(message);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehavior.cs b/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehavior.cs
new file mode 100644
index 00000000000..701bdad5d3b
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehavior.cs
@@ -0,0 +1,419 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ using System;
+ using System.Threading.Tasks;
+
+ ///
+ /// Built-in Journal interceptors who will alter messages Recovery and/or Write of .
+ ///
+ public class JournalRecoveryBehavior
+ {
+ internal JournalRecoveryBehavior(IJournalBehaviorSetter setter)
+ {
+ this.Setter = setter;
+ }
+
+ private IJournalBehaviorSetter Setter { get; }
+
+ ///
+ /// Use custom, user defined interceptor.
+ ///
+ /// User defined interceptor which implements interface.
+ /// When is null.
+ public Task SetInterceptorAsync(IJournalInterceptor interceptor)
+ {
+ if (interceptor == null) throw new ArgumentNullException(nameof(interceptor));
+
+ return Setter.SetInterceptorAsync(interceptor);
+ }
+
+ ///
+ /// Pass all messages to journal without interfering.
+ ///
+ ///
+ /// By using this interceptor all journal operations will work like
+ /// in standard .
+ ///
+ public Task Pass() => SetInterceptorAsync(JournalInterceptors.Noop.Instance);
+
+ ///
+ /// Delay passing all messages to journal by .
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ /// Time by which recovery operation will be delayed.
+ /// When is less or equal to .
+ public Task PassWithDelay(TimeSpan delay)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+
+ return SetInterceptorAsync(new JournalInterceptors.Delay(delay, JournalInterceptors.Noop.Instance));
+ }
+
+ ///
+ /// Always fail all messages.
+ ///
+ ///
+ ///
+ /// Journal will crash and will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ public Task Fail() => SetInterceptorAsync(JournalInterceptors.Failure.Instance);
+
+ ///
+ /// Fail message during processing message of type .
+ ///
+ ///
+ ///
+ /// Journal will crash and will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// - If is interface, journal will fail when message implements that interface.
+ /// - If is class, journal will fail when message can be assigned to .
+ ///
+ ///
+ ///
+ ///
+ public Task FailOnType() => FailOnType(typeof(TMessage));
+
+ ///
+ /// Fail message during processing message of type .
+ ///
+ ///
+ ///
+ /// Journal will crash and will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// - If is interface, journal will fail when message implements that interface.
+ /// - If is class, journal will fail when message can be assigned to .
+ ///
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task FailOnType(Type messageType)
+ {
+ if (messageType == null) throw new ArgumentNullException(nameof(messageType));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnType(messageType, JournalInterceptors.Failure.Instance));
+ }
+
+ ///
+ /// Fail message if predicate will return true.
+ ///
+ ///
+ ///
+ /// Journal will crash and will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task FailIf(Func predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance));
+ }
+
+ ///
+ /// Fail message if async predicate will return true.
+ ///
+ ///
+ ///
+ /// Journal will crash and will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task FailIf(Func> predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance));
+ }
+
+ ///
+ /// Fail message unless predicate will return true.
+ ///
+ ///
+ ///
+ /// Journal will crash and will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task FailUnless(Func predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance, negate: true));
+ }
+
+ ///
+ /// Fail message unless async predicate will return true.
+ ///
+ ///
+ ///
+ /// Journal will crash and will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task FailUnless(Func> predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance, negate: true));
+ }
+
+ ///
+ /// Fail message after specified delay.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will crash and will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ public Task FailWithDelay(TimeSpan delay)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+
+ return SetInterceptorAsync(new JournalInterceptors.Delay(delay, JournalInterceptors.Failure.Instance));
+ }
+
+ ///
+ /// Fail message after specified delay if async predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will crash and will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task FailIfWithDelay(TimeSpan delay, Func> predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnCondition(
+ predicate,
+ new JournalInterceptors.Delay(delay, JournalInterceptors.Failure.Instance)
+ ));
+ }
+
+ ///
+ /// Fail message after specified delay if predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will crash and will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task FailIfWithDelay(TimeSpan delay, Func predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnCondition(
+ predicate,
+ new JournalInterceptors.Delay(delay, JournalInterceptors.Failure.Instance)
+ ));
+ }
+
+ ///
+ /// Fail message after specified delay unless predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will crash and will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task FailUnlessWithDelay(TimeSpan delay, Func predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnCondition(
+ predicate,
+ new JournalInterceptors.Delay(delay, JournalInterceptors.Failure.Instance),
+ negate: true
+ ));
+ }
+
+ ///
+ /// Fail message after specified delay unless async predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will crash and will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task FailUnlessWithDelay(TimeSpan delay, Func> predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnCondition(
+ predicate,
+ new JournalInterceptors.Delay(delay, JournalInterceptors.Failure.Instance),
+ negate: true
+ ));
+ }
+
+ ///
+ /// Fail message after specified delay if recovering message of type .
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will crash and will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// - If is interface, journal will fail when message implements that interface.
+ /// - If is class, journal will fail when message can be assigned to .
+ ///
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ public Task FailOnTypeWithDelay(TimeSpan delay) => FailOnTypeWithDelay(delay, typeof(TMessage));
+
+ ///
+ /// Fail message after specified delay if recovering message of type .
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will crash and will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// - If is interface, journal will fail when message implements that interface.
+ /// - If is class, journal will fail when message can be assigned to .
+ ///
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task FailOnTypeWithDelay(TimeSpan delay, Type messageType)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (messageType == null) throw new ArgumentNullException(nameof(messageType));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnType(
+ messageType,
+ new JournalInterceptors.Delay(delay, JournalInterceptors.Failure.Instance)
+ ));
+ }
+ }
+}
diff --git a/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehaviorSetter.cs
new file mode 100644
index 00000000000..4822b995148
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/Journal/JournalRecoveryBehaviorSetter.cs
@@ -0,0 +1,32 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ using System;
+ using System.Threading.Tasks;
+ using Actor;
+
+ ///
+ /// Setter strategy for TestJournal which will set recovery interceptor.
+ ///
+ internal class JournalRecoveryBehaviorSetter : IJournalBehaviorSetter
+ {
+ internal JournalRecoveryBehaviorSetter(IActorRef journal)
+ {
+ this._journal = journal;
+ }
+
+ private readonly IActorRef _journal;
+
+ public Task SetInterceptorAsync(IJournalInterceptor interceptor)
+ => _journal.Ask(
+ new TestJournal.UseRecoveryInterceptor(interceptor),
+ TimeSpan.FromSeconds(3)
+ );
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehavior.cs b/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehavior.cs
new file mode 100644
index 00000000000..53e367f7195
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehavior.cs
@@ -0,0 +1,392 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ using System;
+ using System.Threading.Tasks;
+
+ ///
+ /// Built-in Journal interceptors who will alter message Write of .
+ ///
+ public class JournalWriteBehavior : JournalRecoveryBehavior
+ {
+ internal JournalWriteBehavior(IJournalBehaviorSetter setter) : base(setter) { }
+
+ ///
+ /// Always reject all messages.
+ ///
+ ///
+ ///
+ /// Journal will **NOT** crash, but will be called
+ /// on each rejected message.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle serialization,
+ /// type conversion and other local problems withing journal.
+ ///
+ ///
+ public Task Reject() => SetInterceptorAsync(JournalInterceptors.Rejection.Instance);
+
+ ///
+ /// Reject message during processing message of type .
+ ///
+ ///
+ ///
+ /// Journal will **NOT** crash, but will be called
+ /// on each rejected message.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle serialization,
+ /// type conversion and other local problems withing journal.
+ ///
+ ///
+ ///
+ /// - If is interface, journal will reject when message implements that interface.
+ /// - If is class, journal will reject when message can be assigned to .
+ ///
+ ///
+ ///
+ ///
+ public Task RejectOnType() => FailOnType(typeof(TMessage));
+
+ ///
+ /// Reject message during processing message of type .
+ ///
+ ///
+ ///
+ /// Journal will **NOT** crash, but will be called
+ /// on each rejected message.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle serialization,
+ /// type conversion and other local problems withing journal.
+ ///
+ ///
+ ///
+ /// - If is interface, journal will reject when message implements that interface.
+ /// - If is class, journal will reject when message can be assigned to .
+ ///
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task RejectOnType(Type messageType)
+ {
+ if (messageType == null) throw new ArgumentNullException(nameof(messageType));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnType(messageType, JournalInterceptors.Rejection.Instance));
+ }
+
+ ///
+ /// Reject message if predicate will return true.
+ ///
+ ///
+ ///
+ /// Journal will **NOT** crash, but will be called
+ /// on each rejected message.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle serialization,
+ /// type conversion and other local problems withing journal.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task RejectIf(Func predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Rejection.Instance));
+ }
+
+ ///
+ /// Reject message if async predicate will return true.
+ ///
+ ///
+ ///
+ /// Journal will **NOT** crash, but will be called
+ /// on each rejected message.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle serialization,
+ /// type conversion and other local problems withing journal.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task RejectIf(Func> predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Rejection.Instance));
+ }
+
+ ///
+ /// Reject message unless predicate will return true.
+ ///
+ ///
+ ///
+ /// Journal will **NOT** crash, but will be called
+ /// on each rejected message.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle serialization,
+ /// type conversion and other local problems withing journal.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task RejectUnless(Func predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Rejection.Instance, negate: true));
+ }
+
+ ///
+ /// Reject message unless async predicate will return true.
+ ///
+ ///
+ ///
+ /// Journal will **NOT** crash, but will be called
+ /// on each rejected message.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle serialization,
+ /// type conversion and other local problems withing journal.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task RejectUnless(Func> predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Rejection.Instance, negate: true));
+ }
+
+ ///
+ /// Reject message after specified delay.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will **NOT** crash, but will be called
+ /// on each rejected message.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle serialization,
+ /// type conversion and other local problems withing journal.
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ public Task RejectWithDelay(TimeSpan delay)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+
+ return SetInterceptorAsync(new JournalInterceptors.Delay(delay, JournalInterceptors.Rejection.Instance));
+ }
+
+ ///
+ /// Reject message after specified delay if async predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will **NOT** crash, but will be called
+ /// on each rejected message.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle serialization,
+ /// type conversion and other local problems withing journal.
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task RejectIfWithDelay(TimeSpan delay, Func> predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnCondition(
+ predicate,
+ new JournalInterceptors.Delay(delay, JournalInterceptors.Rejection.Instance)
+ ));
+ }
+
+ ///
+ /// Reject message after specified delay if predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will **NOT** crash, but will be called
+ /// on each rejected message.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle serialization,
+ /// type conversion and other local problems withing journal.
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task RejectIfWithDelay(TimeSpan delay, Func predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnCondition(
+ predicate,
+ new JournalInterceptors.Delay(delay, JournalInterceptors.Rejection.Instance)
+ ));
+ }
+
+ ///
+ /// Reject message after specified delay unless predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will **NOT** crash, but will be called
+ /// on each rejected message.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle serialization,
+ /// type conversion and other local problems withing journal.
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task RejectUnlessWithDelay(TimeSpan delay, Func predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnCondition(
+ predicate,
+ new JournalInterceptors.Delay(delay, JournalInterceptors.Rejection.Instance),
+ negate: true
+ ));
+ }
+
+ ///
+ /// Reject message after specified delay unless async predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will **NOT** crash, but will be called
+ /// on each rejected message.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle serialization,
+ /// type conversion and other local problems withing journal.
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task RejectUnlessWithDelay(TimeSpan delay, Func> predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnCondition(
+ predicate,
+ new JournalInterceptors.Delay(delay, JournalInterceptors.Rejection.Instance),
+ negate: true
+ ));
+ }
+
+ ///
+ /// Reject message after specified delay if recovering message of type .
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will **NOT** crash, but will be called
+ /// on each rejected message.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle serialization,
+ /// type conversion and other local problems withing journal.
+ ///
+ ///
+ ///
+ /// - If is interface, journal will reject when message implements that interface.
+ /// - If is class, journal will reject when message can be assigned to .
+ ///
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ public Task RejectOnTypeWithDelay(TimeSpan delay) => FailOnTypeWithDelay(delay, typeof(TMessage));
+
+ ///
+ /// Reject message after specified delay if recovering message of type .
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will **NOT** crash, but will be called
+ /// on each rejected message.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle serialization,
+ /// type conversion and other local problems withing journal.
+ ///
+ ///
+ ///
+ /// - If is interface, journal will reject when message implements that interface.
+ /// - If is class, journal will reject when message can be assigned to .
+ ///
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task RejectOnTypeWithDelay(TimeSpan delay, Type messageType)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (messageType == null) throw new ArgumentNullException(nameof(messageType));
+
+ return SetInterceptorAsync(new JournalInterceptors.OnType(
+ messageType,
+ new JournalInterceptors.Delay(delay, JournalInterceptors.Rejection.Instance)
+ ));
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehaviorSetter.cs
new file mode 100644
index 00000000000..98b25a4cebe
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/Journal/JournalWriteBehaviorSetter.cs
@@ -0,0 +1,32 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ using System;
+ using System.Threading.Tasks;
+ using Actor;
+
+ ///
+ /// Setter strategy for which will set write interceptor.
+ ///
+ internal class JournalWriteBehaviorSetter : IJournalBehaviorSetter
+ {
+ internal JournalWriteBehaviorSetter(IActorRef journal)
+ {
+ this._journal = journal;
+ }
+
+ private readonly IActorRef _journal;
+
+ public Task SetInterceptorAsync(IJournalInterceptor interceptor)
+ => _journal.Ask(
+ new TestJournal.UseWriteInterceptor(interceptor),
+ TimeSpan.FromSeconds(3)
+ );
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs
new file mode 100644
index 00000000000..8396d429a84
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs
@@ -0,0 +1,149 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ using Akka.Actor;
+ using Akka.Persistence;
+ using Akka.Persistence.Journal;
+ using System;
+ using System.Collections.Generic;
+ using System.Collections.Immutable;
+ using System.Threading.Tasks;
+
+ ///
+ /// In-memory persistence journal implementation which behavior could be controlled by interceptors.
+ ///
+ public sealed class TestJournal : MemoryJournal
+ {
+ private IJournalInterceptor _writeInterceptor = JournalInterceptors.Noop.Instance;
+ private IJournalInterceptor _recoveryInterceptor = JournalInterceptors.Noop.Instance;
+
+ protected override bool ReceivePluginInternal(object message)
+ {
+ switch (message)
+ {
+ case UseWriteInterceptor use:
+ _writeInterceptor = use.Interceptor;
+ Sender.Tell(Ack.Instance);
+ return true;
+
+ case UseRecoveryInterceptor use:
+ _recoveryInterceptor = use.Interceptor;
+ Sender.Tell(Ack.Instance);
+ return true;
+
+ default:
+ return base.ReceivePluginInternal(message);
+ }
+ }
+
+ protected override async Task> WriteMessagesAsync(IEnumerable messages)
+ {
+ var exceptions = new List();
+ foreach (var w in messages)
+ {
+ foreach (var p in (IEnumerable) w.Payload)
+ {
+ try
+ {
+ await _writeInterceptor.InterceptAsync(p);
+ Add(p);
+ exceptions.Add(null);
+ }
+ catch (TestJournalRejectionException rejected)
+ {
+ // i.e. problems with data: corrupted data-set, problems in serialization, constraints, etc.
+ exceptions.Add(rejected);
+ }
+ catch (TestJournalFailureException)
+ {
+ // i.e. data-store problems: network, invalid credentials, etc.
+ throw;
+ }
+ }
+ }
+
+ return exceptions.ToImmutableList();
+ }
+
+ public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action recoveryCallback)
+ {
+ var highest = HighestSequenceNr(persistenceId);
+ if (highest != 0L && max != 0L)
+ {
+ var messages = Read(persistenceId, fromSequenceNr, Math.Min(toSequenceNr, highest), max);
+ foreach (var p in messages)
+ {
+ try
+ {
+ await _recoveryInterceptor.InterceptAsync(p);
+ recoveryCallback(p);
+ }
+ catch (TestJournalFailureException)
+ {
+ // i.e. problems with data: corrupted data-set, problems in serialization
+ // i.e. data-store problems: network, invalid credentials, etc.
+ throw;
+ }
+ }
+ }
+ }
+
+ ///
+ /// Create proxy object from journal actor reference which can alter behavior of journal.
+ ///
+ ///
+ /// Journal actor must be of type.
+ ///
+ /// Journal actor reference.
+ /// Proxy object to control .
+ public static ITestJournal FromRef(IActorRef actor)
+ {
+ return new TestJournalWrapper(actor);
+ }
+
+ public sealed class UseWriteInterceptor
+ {
+ public UseWriteInterceptor(IJournalInterceptor interceptor)
+ {
+ Interceptor = interceptor;
+ }
+
+ public IJournalInterceptor Interceptor { get; }
+ }
+
+ public sealed class UseRecoveryInterceptor
+ {
+ public UseRecoveryInterceptor(IJournalInterceptor interceptor)
+ {
+ Interceptor = interceptor;
+ }
+
+ public IJournalInterceptor Interceptor { get; }
+ }
+
+ public sealed class Ack
+ {
+ public static readonly Ack Instance = new Ack();
+ }
+
+ internal class TestJournalWrapper : ITestJournal
+ {
+ public TestJournalWrapper(IActorRef actor)
+ {
+ _actor = actor;
+ }
+
+ private readonly IActorRef _actor;
+
+ public JournalWriteBehavior OnWrite => new JournalWriteBehavior(new JournalWriteBehaviorSetter(_actor));
+
+ public JournalRecoveryBehavior OnRecovery => new JournalRecoveryBehavior(new JournalRecoveryBehaviorSetter(_actor));
+ }
+ }
+}
diff --git a/src/core/Akka.Persistence.TestKit/Journal/TestJournalFailureException.cs b/src/core/Akka.Persistence.TestKit/Journal/TestJournalFailureException.cs
new file mode 100644
index 00000000000..b95005060b8
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/Journal/TestJournalFailureException.cs
@@ -0,0 +1,21 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ using System;
+ using System.Runtime.Serialization;
+
+ [Serializable]
+ public class TestJournalFailureException : Exception
+ {
+ public TestJournalFailureException() { }
+ public TestJournalFailureException(string message) : base(message) { }
+ public TestJournalFailureException(string message, Exception inner) : base(message, inner) { }
+ protected TestJournalFailureException(SerializationInfo info, StreamingContext context) : base(info, context) { }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/Journal/TestJournalRejectionException.cs b/src/core/Akka.Persistence.TestKit/Journal/TestJournalRejectionException.cs
new file mode 100644
index 00000000000..3e1c8d01290
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/Journal/TestJournalRejectionException.cs
@@ -0,0 +1,21 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ using System;
+ using System.Runtime.Serialization;
+
+ [Serializable]
+ public class TestJournalRejectionException : Exception
+ {
+ public TestJournalRejectionException() { }
+ public TestJournalRejectionException(string message) : base(message) { }
+ public TestJournalRejectionException(string message, Exception inner) : base(message, inner) { }
+ protected TestJournalRejectionException(SerializationInfo info, StreamingContext context) : base(info, context) { }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/Properties/AssemblyInfo.cs b/src/core/Akka.Persistence.TestKit/Properties/AssemblyInfo.cs
new file mode 100644
index 00000000000..aed9a510eed
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/Properties/AssemblyInfo.cs
@@ -0,0 +1,24 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2018 Lightbend Inc.
+// Copyright (C) 2013-2018 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("66023c4f-f246-446d-b212-2b8f20755671")]
+
+[assembly: InternalsVisibleTo("Akka.Persistence.TestKit.Tests")]
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/ISnapshotStoreBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/ISnapshotStoreBehaviorSetter.cs
new file mode 100644
index 00000000000..e0d475217fb
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/ISnapshotStoreBehaviorSetter.cs
@@ -0,0 +1,16 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ using System.Threading.Tasks;
+
+ public interface ISnapshotStoreBehaviorSetter
+ {
+ Task SetInterceptorAsync(ISnapshotStoreInterceptor interceptor);
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/ISnapshotStoreInterceptor.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/ISnapshotStoreInterceptor.cs
new file mode 100644
index 00000000000..4a2054c008c
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/ISnapshotStoreInterceptor.cs
@@ -0,0 +1,22 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ using System.Threading.Tasks;
+
+ ///
+ /// Interface to object which will intercept all action in .
+ ///
+ public interface ISnapshotStoreInterceptor
+ {
+ ///
+ /// Method will be called for each load, save or delete attempt in .
+ ///
+ Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria);
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/ITestSnapshotStore.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/ITestSnapshotStore.cs
new file mode 100644
index 00000000000..13b5d67e267
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/ITestSnapshotStore.cs
@@ -0,0 +1,16 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ public interface ITestSnapshotStore
+ {
+ SnapshotStoreSaveBehavior OnSave { get; }
+ SnapshotStoreLoadBehavior OnLoad { get; }
+ SnapshotStoreDeleteBehavior OnDelete { get; }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreDeleteBehavior.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreDeleteBehavior.cs
new file mode 100644
index 00000000000..17c72909102
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreDeleteBehavior.cs
@@ -0,0 +1,17 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ public class SnapshotStoreDeleteBehavior : SnapshotStoreSaveBehavior
+ {
+ public SnapshotStoreDeleteBehavior(ISnapshotStoreBehaviorSetter setter) : base(setter)
+ {
+
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreDeleteBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreDeleteBehaviorSetter.cs
new file mode 100644
index 00000000000..9764f121a12
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreDeleteBehaviorSetter.cs
@@ -0,0 +1,32 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ using System;
+ using System.Threading.Tasks;
+ using Actor;
+
+ ///
+ /// Setter strategy for which will set delete interceptor.
+ ///
+ internal class SnapshotStoreDeleteBehaviorSetter : ISnapshotStoreBehaviorSetter
+ {
+ internal SnapshotStoreDeleteBehaviorSetter(IActorRef snapshots)
+ {
+ this._snapshots = snapshots;
+ }
+
+ private readonly IActorRef _snapshots;
+
+ public Task SetInterceptorAsync(ISnapshotStoreInterceptor interceptor)
+ => _snapshots.Ask(
+ new TestSnapshotStore.UseDeleteInterceptor(interceptor),
+ TimeSpan.FromSeconds(3)
+ );
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreInterceptors.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreInterceptors.cs
new file mode 100644
index 00000000000..f9fcfafe71b
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreInterceptors.cs
@@ -0,0 +1,77 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ using System;
+ using System.Threading.Tasks;
+
+ internal static class SnapshotStoreInterceptors
+ {
+ internal class Noop : ISnapshotStoreInterceptor
+ {
+ public static readonly ISnapshotStoreInterceptor Instance = new Noop();
+
+ public Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria) => Task.FromResult(true);
+ }
+
+ internal class Failure : ISnapshotStoreInterceptor
+ {
+ public static readonly ISnapshotStoreInterceptor Instance = new Failure();
+
+ public Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria) => throw new TestSnapshotStoreFailureException();
+ }
+
+ internal class Delay : ISnapshotStoreInterceptor
+ {
+ public Delay(TimeSpan delay, ISnapshotStoreInterceptor next)
+ {
+ _delay = delay;
+ _next = next;
+ }
+
+ private readonly TimeSpan _delay;
+ private readonly ISnapshotStoreInterceptor _next;
+
+ public async Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria)
+ {
+ await Task.Delay(_delay);
+ await _next.InterceptAsync(persistenceId, criteria);
+ }
+ }
+
+ internal sealed class OnCondition : ISnapshotStoreInterceptor
+ {
+ public OnCondition(Func> predicate, ISnapshotStoreInterceptor next, bool negate = false)
+ {
+ _predicate = predicate;
+ _next = next;
+ _negate = negate;
+ }
+
+ public OnCondition(Func predicate, ISnapshotStoreInterceptor next, bool negate = false)
+ {
+ _predicate = (persistenceId, criteria) => Task.FromResult(predicate(persistenceId, criteria));
+ _next = next;
+ _negate = negate;
+ }
+
+ private readonly Func> _predicate;
+ private readonly ISnapshotStoreInterceptor _next;
+ private readonly bool _negate;
+
+ public async Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria)
+ {
+ var result = await _predicate(persistenceId, criteria);
+ if ((_negate && !result) || (!_negate && result))
+ {
+ await _next.InterceptAsync(persistenceId, criteria);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreLoadBehavior.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreLoadBehavior.cs
new file mode 100644
index 00000000000..6c363ba7dd1
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreLoadBehavior.cs
@@ -0,0 +1,17 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ public class SnapshotStoreLoadBehavior : SnapshotStoreSaveBehavior
+ {
+ public SnapshotStoreLoadBehavior(ISnapshotStoreBehaviorSetter setter) : base(setter)
+ {
+
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreLoadBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreLoadBehaviorSetter.cs
new file mode 100644
index 00000000000..b0e34e3edf7
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreLoadBehaviorSetter.cs
@@ -0,0 +1,32 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ using System;
+ using System.Threading.Tasks;
+ using Actor;
+
+ ///
+ /// Setter strategy for which will set load interceptor.
+ ///
+ internal class SnapshotStoreLoadBehaviorSetter : ISnapshotStoreBehaviorSetter
+ {
+ internal SnapshotStoreLoadBehaviorSetter(IActorRef snapshots)
+ {
+ this._snapshots = snapshots;
+ }
+
+ private readonly IActorRef _snapshots;
+
+ public Task SetInterceptorAsync(ISnapshotStoreInterceptor interceptor)
+ => _snapshots.Ask(
+ new TestSnapshotStore.UseLoadInterceptor(interceptor),
+ TimeSpan.FromSeconds(3)
+ );
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehavior.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehavior.cs
new file mode 100644
index 00000000000..da25961a89d
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehavior.cs
@@ -0,0 +1,302 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ using System;
+ using System.Threading.Tasks;
+
+ using InterceptorPredicate = System.Func;
+ using AsyncInterceptorPredicate = System.Func>;
+
+ ///
+ /// Built-in snapshot store interceptors who will alter message Write of .
+ ///
+ public class SnapshotStoreSaveBehavior
+ {
+ public SnapshotStoreSaveBehavior(ISnapshotStoreBehaviorSetter setter)
+ {
+ Setter = setter;
+ }
+
+ protected readonly ISnapshotStoreBehaviorSetter Setter;
+
+ ///
+ /// Use custom, user defined interceptor.
+ ///
+ /// User defined interceptor which implements interface.
+ /// When is null.
+ public Task SetInterceptorAsync(ISnapshotStoreInterceptor interceptor)
+ {
+ if (interceptor == null) throw new ArgumentNullException(nameof(interceptor));
+
+ return Setter.SetInterceptorAsync(interceptor);
+ }
+
+ ///
+ /// Pass all messages to snapshot store without interfering.
+ ///
+ ///
+ /// By using this interceptor all snapshot store operations will work like
+ /// in standard .
+ ///
+ public Task Pass() => SetInterceptorAsync(SnapshotStoreInterceptors.Noop.Instance);
+
+ ///
+ /// Delay passing all messages to snapshot store by .
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ /// Time by which recovery operation will be delayed.
+ /// When is less or equal to .
+ public Task PassWithDelay(TimeSpan delay)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+
+ return SetInterceptorAsync(new SnapshotStoreInterceptors.Delay(delay, SnapshotStoreInterceptors.Noop.Instance));
+ }
+
+ ///
+ /// Always fail all messages.
+ ///
+ ///
+ ///
+ /// Snapshot store will crash and actor will receive one of ,
+ /// or messages.
+ ///
+ ///
+ /// Use this snapshot store behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying snapshot store.
+ ///
+ ///
+ public Task Fail() => SetInterceptorAsync(SnapshotStoreInterceptors.Failure.Instance);
+
+ ///
+ /// Fail message if predicate will return true.
+ ///
+ ///
+ ///
+ /// Snapshot store will crash and actor will receive one of ,
+ /// or messages.
+ ///
+ ///
+ /// Use this snapshot store behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying snapshot store.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task FailIf(InterceptorPredicate predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new SnapshotStoreInterceptors.OnCondition(predicate, SnapshotStoreInterceptors.Failure.Instance));
+ }
+
+ ///
+ /// Fail message if async predicate will return true.
+ ///
+ ///
+ ///
+ /// Snapshot store will crash and actor will receive one of ,
+ /// or messages.
+ ///
+ ///
+ /// Use this snapshot store behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying snapshot store.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task FailIf(AsyncInterceptorPredicate predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new SnapshotStoreInterceptors.OnCondition(predicate, SnapshotStoreInterceptors.Failure.Instance));
+ }
+
+ ///
+ /// Fail message unless predicate will return true.
+ ///
+ ///
+ ///
+ /// Snapshot store will crash and actor will receive one of ,
+ /// or messages.
+ ///
+ ///
+ /// Use this snapshot store behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying snapshot store.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task FailUnless(InterceptorPredicate predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new SnapshotStoreInterceptors.OnCondition(predicate, SnapshotStoreInterceptors.Failure.Instance, negate: true));
+ }
+
+ ///
+ /// Fail message unless async predicate will return true.
+ ///
+ ///
+ ///
+ /// Snapshot store will crash and actor will receive one of ,
+ /// or messages.
+ ///
+ ///
+ /// Use this snapshot store behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying snapshot store.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task FailUnless(AsyncInterceptorPredicate predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new SnapshotStoreInterceptors.OnCondition(predicate, SnapshotStoreInterceptors.Failure.Instance, negate: true));
+ }
+
+ ///
+ /// Fail message after specified delay.
+ ///
+ ///
+ ///
+ /// Snapshot store will crash and actor will receive one of ,
+ /// or messages.
+ ///
+ ///
+ /// Use this snapshot store behavior when it is needed to verify how well a persistent actor will handle network problems
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ public Task FailWithDelay(TimeSpan delay)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+
+ return SetInterceptorAsync(new SnapshotStoreInterceptors.Delay(delay, SnapshotStoreInterceptors.Failure.Instance));
+ }
+
+ ///
+ /// Fail message after specified delay if async predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Snapshot store will crash and actor will receive one of ,
+ /// or messages.
+ ///
+ ///
+ /// Use this snapshot store behavior when it is needed to verify how well a persistent actor will handle network problems
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task FailIfWithDelay(TimeSpan delay, AsyncInterceptorPredicate predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new SnapshotStoreInterceptors.OnCondition(
+ predicate,
+ new SnapshotStoreInterceptors.Delay(delay, SnapshotStoreInterceptors.Failure.Instance)
+ ));
+ }
+
+ ///
+ /// Fail message after specified delay if predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Snapshot store will crash and actor will receive one of ,
+ /// or messages.
+ ///
+ ///
+ /// Use this snapshot store behavior when it is needed to verify how well a persistent actor will handle network problems
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task FailIfWithDelay(TimeSpan delay, InterceptorPredicate predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new SnapshotStoreInterceptors.OnCondition(
+ predicate,
+ new SnapshotStoreInterceptors.Delay(delay, SnapshotStoreInterceptors.Failure.Instance)
+ ));
+ }
+
+ ///
+ /// Fail message after specified delay unless predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Snapshot store will crash and actor will receive one of ,
+ /// or messages.
+ ///
+ ///
+ /// Use this snapshot store behavior when it is needed to verify how well a persistent actor will handle network problems
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task FailUnlessWithDelay(TimeSpan delay, InterceptorPredicate predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new SnapshotStoreInterceptors.OnCondition(
+ predicate,
+ new SnapshotStoreInterceptors.Delay(delay, SnapshotStoreInterceptors.Failure.Instance),
+ negate: true
+ ));
+ }
+
+ ///
+ /// Fail message after specified delay unless async predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Snapshot store will crash and actor will receive one of ,
+ /// or messages.
+ ///
+ ///
+ /// Use this snapshot store behavior when it is needed to verify how well a persistent actor will handle network problems
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task FailUnlessWithDelay(TimeSpan delay, AsyncInterceptorPredicate predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new SnapshotStoreInterceptors.OnCondition(
+ predicate,
+ new SnapshotStoreInterceptors.Delay(delay, SnapshotStoreInterceptors.Failure.Instance),
+ negate: true
+ ));
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehaviorSetter.cs
new file mode 100644
index 00000000000..b2ad4581177
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreSaveBehaviorSetter.cs
@@ -0,0 +1,32 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ using System;
+ using System.Threading.Tasks;
+ using Actor;
+
+ ///
+ /// Setter strategy for which will set save interceptor.
+ ///
+ internal class SnapshotStoreSaveBehaviorSetter : ISnapshotStoreBehaviorSetter
+ {
+ internal SnapshotStoreSaveBehaviorSetter(IActorRef snapshots)
+ {
+ this._snapshots = snapshots;
+ }
+
+ private readonly IActorRef _snapshots;
+
+ public Task SetInterceptorAsync(ISnapshotStoreInterceptor interceptor)
+ => _snapshots.Ask(
+ new TestSnapshotStore.UseSaveInterceptor(interceptor),
+ TimeSpan.FromSeconds(3)
+ );
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs
new file mode 100644
index 00000000000..fe863c69bdb
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs
@@ -0,0 +1,130 @@
+namespace Akka.Persistence.TestKit
+{
+ using System.Threading.Tasks;
+ using Actor;
+ using Snapshot;
+
+ ///
+ /// In-memory persistence snapshot store implementation which behavior could be controlled by interceptors.
+ ///
+ public class TestSnapshotStore : MemorySnapshotStore
+ {
+ private ISnapshotStoreInterceptor _saveInterceptor = SnapshotStoreInterceptors.Noop.Instance;
+ private ISnapshotStoreInterceptor _loadInterceptor = SnapshotStoreInterceptors.Noop.Instance;
+ private ISnapshotStoreInterceptor _deleteInterceptor = SnapshotStoreInterceptors.Noop.Instance;
+
+ protected override bool ReceivePluginInternal(object message)
+ {
+ switch (message)
+ {
+ case UseSaveInterceptor use:
+ _saveInterceptor = use.Interceptor;
+ Sender.Tell(Ack.Instance);
+ return true;
+
+ case UseLoadInterceptor use:
+ _loadInterceptor = use.Interceptor;
+ Sender.Tell(Ack.Instance);
+ return true;
+
+ case UseDeleteInterceptor use:
+ _deleteInterceptor = use.Interceptor;
+ Sender.Tell(Ack.Instance);
+ return true;
+
+ default:
+ return base.ReceivePluginInternal(message);
+ }
+ }
+
+ protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot)
+ {
+ await _saveInterceptor.InterceptAsync(metadata.PersistenceId, ToSelectionCriteria(metadata));
+ await base.SaveAsync(metadata, snapshot);
+ }
+
+ protected override async Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria)
+ {
+ await _loadInterceptor.InterceptAsync(persistenceId, criteria);
+ return await base.LoadAsync(persistenceId, criteria);
+ }
+
+ protected override async Task DeleteAsync(SnapshotMetadata metadata)
+ {
+ await _deleteInterceptor.InterceptAsync(metadata.PersistenceId, ToSelectionCriteria(metadata));
+ await base.DeleteAsync(metadata);
+ }
+
+ protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria)
+ {
+ await _deleteInterceptor.InterceptAsync(persistenceId, criteria);
+ await base.DeleteAsync(persistenceId, criteria);
+ }
+
+ static SnapshotSelectionCriteria ToSelectionCriteria(SnapshotMetadata metadata)
+ => new SnapshotSelectionCriteria(metadata.SequenceNr, metadata.Timestamp, metadata.SequenceNr,
+ metadata.Timestamp);
+
+ ///
+ /// Create proxy object from snapshot store actor reference which can alter behavior of snapshot store.
+ ///
+ ///
+ /// Snapshot store actor must be of type.
+ ///
+ /// Journal actor reference.
+ /// Proxy object to control .
+ public static ITestSnapshotStore FromRef(IActorRef actor)
+ {
+ return new TestSnapshotStoreWrapper(actor);
+ }
+
+ public sealed class UseSaveInterceptor
+ {
+ public UseSaveInterceptor(ISnapshotStoreInterceptor interceptor)
+ {
+ Interceptor = interceptor;
+ }
+
+ public ISnapshotStoreInterceptor Interceptor { get; }
+ }
+
+ public sealed class UseLoadInterceptor
+ {
+ public UseLoadInterceptor(ISnapshotStoreInterceptor interceptor)
+ {
+ Interceptor = interceptor;
+ }
+
+ public ISnapshotStoreInterceptor Interceptor { get; }
+ }
+
+ public sealed class UseDeleteInterceptor
+ {
+ public UseDeleteInterceptor(ISnapshotStoreInterceptor interceptor)
+ {
+ Interceptor = interceptor;
+ }
+
+ public ISnapshotStoreInterceptor Interceptor { get; }
+ }
+
+ public sealed class Ack
+ {
+ public static readonly Ack Instance = new Ack();
+ }
+
+ internal class TestSnapshotStoreWrapper : ITestSnapshotStore
+ {
+ public TestSnapshotStoreWrapper(IActorRef actor)
+ {
+ _actor = actor;
+ }
+
+ private readonly IActorRef _actor;
+
+ public SnapshotStoreSaveBehavior OnSave => new SnapshotStoreSaveBehavior(new SnapshotStoreSaveBehaviorSetter(_actor));
+ public SnapshotStoreLoadBehavior OnLoad => new SnapshotStoreLoadBehavior(new SnapshotStoreLoadBehaviorSetter(_actor));
+ public SnapshotStoreDeleteBehavior OnDelete => new SnapshotStoreDeleteBehavior(new SnapshotStoreDeleteBehaviorSetter(_actor));
+ }
+ }
+}
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStoreFailureException.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStoreFailureException.cs
new file mode 100644
index 00000000000..a67fc0ffc82
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStoreFailureException.cs
@@ -0,0 +1,21 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2019 Lightbend Inc.
+// Copyright (C) 2013-2019 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+namespace Akka.Persistence.TestKit
+{
+ using System;
+ using System.Runtime.Serialization;
+
+ [Serializable]
+ public class TestSnapshotStoreFailureException : Exception
+ {
+ public TestSnapshotStoreFailureException() { }
+ public TestSnapshotStoreFailureException(string message) : base(message) { }
+ public TestSnapshotStoreFailureException(string message, Exception inner) : base(message, inner) { }
+ protected TestSnapshotStoreFailureException(SerializationInfo info, StreamingContext context) : base(info, context) { }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/config.conf b/src/core/Akka.Persistence.TestKit/config.conf
new file mode 100644
index 00000000000..dda3d635272
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/config.conf
@@ -0,0 +1,23 @@
+akka {
+ persistence {
+ journal {
+ plugin = "akka.persistence.journal.test"
+ auto-start-journals = ["akka.persistence.journal.test"]
+
+ test {
+ class = "Akka.Persistence.TestKit.TestJournal, Akka.Persistence.TestKit"
+ plugin-dispatcher = "akka.actor.default-dispatcher"
+ }
+ }
+
+ snapshot-store {
+ plugin = "akka.persistence.snapshot-store.test"
+ auto-start-snapshot-stores = ["akka.persistence.snapshot-store.test"]
+
+ test {
+ class = "Akka.Persistence.TestKit.TestSnapshotStore, Akka.Persistence.TestKit"
+ plugin-dispatcher = "akka.actor.default-dispatcher"
+ }
+ }
+ }
+}
\ No newline at end of file