diff --git a/src/core/Akka.Persistence.TestKit.Tests/TestJournalSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/TestJournalSpec.cs index 667694c7406..5fcf2d3a49a 100644 --- a/src/core/Akka.Persistence.TestKit.Tests/TestJournalSpec.cs +++ b/src/core/Akka.Persistence.TestKit.Tests/TestJournalSpec.cs @@ -17,36 +17,36 @@ public void must_return_ack_after_new_write_interceptor_is_set() } [Fact] - public void works_as_memory_journal_by_default() + public async Task works_as_memory_journal_by_default() { var actor = Sys.ActorOf(); // should pass - Journal.OnWrite.Pass(); + await Journal.OnWrite.Pass(); actor.Tell("write", TestActor); ExpectMsg("ack", TimeSpan.FromSeconds(3)); } [Fact] - public void when_fail_on_write_is_set_all_writes_to_journal_will_fail() + public async Task when_fail_on_write_is_set_all_writes_to_journal_will_fail() { var actor = Sys.ActorOf(); Watch(actor); - Journal.OnWrite.Fail(); + await Journal.OnWrite.Fail(); actor.Tell("write", TestActor); ExpectTerminated(actor, TimeSpan.FromSeconds(3)); } [Fact] - public void when_reject_on_write_is_set_all_writes_to_journal_will_be_rejected() + public async Task when_reject_on_write_is_set_all_writes_to_journal_will_be_rejected() { var actor = Sys.ActorOf(); Watch(actor); - Journal.OnWrite.Reject(); + await Journal.OnWrite.Reject(); actor.Tell("write", TestActor); ExpectMsg("rejected", TimeSpan.FromSeconds(3)); @@ -66,7 +66,19 @@ public async Task during_recovery_by_setting_fail_will_cause_recovery_failure() ExpectTerminated(actor, TimeSpan.FromSeconds(3)); }); + } + [Fact] + public async Task new_api() + { + await WithJournalWrite(write => write.Fail(), () => + { + var actor = Sys.ActorOf(); + Watch(actor); + + actor.Tell("write", TestActor); + ExpectTerminated(actor, TimeSpan.FromSeconds(3)); + }); } } diff --git a/src/core/Akka.Persistence.TestKit/Akka.Persistence.TestKit.csproj b/src/core/Akka.Persistence.TestKit/Akka.Persistence.TestKit.csproj index 269674ea354..f035ac65991 100644 --- a/src/core/Akka.Persistence.TestKit/Akka.Persistence.TestKit.csproj +++ b/src/core/Akka.Persistence.TestKit/Akka.Persistence.TestKit.csproj @@ -10,16 +10,12 @@ 1.6.1 - - - - - + diff --git a/src/core/Akka.Persistence.TestKit/IJournalBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/IJournalBehaviorSetter.cs index 87ee986b14d..75bb2bac0f5 100644 --- a/src/core/Akka.Persistence.TestKit/IJournalBehaviorSetter.cs +++ b/src/core/Akka.Persistence.TestKit/IJournalBehaviorSetter.cs @@ -8,11 +8,12 @@ namespace Akka.Persistence.TestKit { using System; + using System.Threading.Tasks; using Actor; public interface IJournalBehaviorSetter { - void SetInterceptor(IJournalInterceptor interceptor); + Task SetInterceptorAsync(IJournalInterceptor interceptor); } internal class JournalWriteBehaviorSetter : IJournalBehaviorSetter @@ -24,11 +25,11 @@ internal JournalWriteBehaviorSetter(IActorRef journal) private readonly IActorRef _journal; - public void SetInterceptor(IJournalInterceptor interceptor) - => _journal.Ask( + public Task SetInterceptorAsync(IJournalInterceptor interceptor) + => _journal.Ask( new TestJournal.UseWriteInterceptor(interceptor), TimeSpan.FromSeconds(3) - ).Wait(); + ); } internal class JournalRecoveryBehaviorSetter : IJournalBehaviorSetter @@ -40,10 +41,10 @@ internal JournalRecoveryBehaviorSetter(IActorRef journal) private readonly IActorRef _journal; - public void SetInterceptor(IJournalInterceptor interceptor) - => _journal.Ask( + public Task SetInterceptorAsync(IJournalInterceptor interceptor) + => _journal.Ask( new TestJournal.UseRecoveryInterceptor(interceptor), TimeSpan.FromSeconds(3) - ).Wait(); + ); } } \ No newline at end of file diff --git a/src/core/Akka.Persistence.TestKit/JournalRecoveryBehavior.cs b/src/core/Akka.Persistence.TestKit/JournalRecoveryBehavior.cs index 19764219af5..d600289e494 100644 --- a/src/core/Akka.Persistence.TestKit/JournalRecoveryBehavior.cs +++ b/src/core/Akka.Persistence.TestKit/JournalRecoveryBehavior.cs @@ -19,85 +19,85 @@ internal JournalRecoveryBehavior(IJournalBehaviorSetter setter) protected readonly IJournalBehaviorSetter Setter; - protected void SetInterceptor(IJournalInterceptor interceptor) => Setter.SetInterceptor(interceptor); + public Task SetInterceptorAsync(IJournalInterceptor interceptor) => Setter.SetInterceptorAsync(interceptor); - public void Pass() => SetInterceptor(JournalInterceptors.Noop.Instance); + public Task Pass() => SetInterceptorAsync(JournalInterceptors.Noop.Instance); - public void PassWithDelay(TimeSpan delay) + public Task PassWithDelay(TimeSpan delay) { if (delay <= TimeSpan.Zero) { throw new ArgumentException("Delay must be greater than zero", nameof(delay)); } - SetInterceptor(new JournalInterceptors.Delay(delay, JournalInterceptors.Noop.Instance)); + return SetInterceptorAsync(new JournalInterceptors.Delay(delay, JournalInterceptors.Noop.Instance)); } - public void Fail() => SetInterceptor(JournalInterceptors.Failure.Instance); + public Task Fail() => SetInterceptorAsync(JournalInterceptors.Failure.Instance); - public void FailOnType() => FailOnType(typeof(TMessage)); + public Task FailOnType() => FailOnType(typeof(TMessage)); - public void FailOnType(Type messageType) + public Task FailOnType(Type messageType) { if (messageType is null) { throw new ArgumentNullException(nameof(messageType)); } - SetInterceptor(new JournalInterceptors.OnType(messageType, JournalInterceptors.Failure.Instance)); + return SetInterceptorAsync(new JournalInterceptors.OnType(messageType, JournalInterceptors.Failure.Instance)); } - public void FailIf(Func predicate) + public Task FailIf(Func predicate) { if (predicate is null) { throw new ArgumentNullException(nameof(predicate)); } - SetInterceptor(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance)); + return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance)); } - public void FailIf(Func> predicate) + public Task FailIf(Func> predicate) { if (predicate is null) { throw new ArgumentNullException(nameof(predicate)); } - SetInterceptor(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance)); + return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance)); } - public void FailUnless(Func predicate) + public Task FailUnless(Func predicate) { if (predicate is null) { throw new ArgumentNullException(nameof(predicate)); } - SetInterceptor(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance, negate: true)); + return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance, negate: true)); } - public void FailUnless(Func> predicate) + public Task FailUnless(Func> predicate) { if (predicate is null) { throw new ArgumentNullException(nameof(predicate)); } - SetInterceptor(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance, negate: true)); + return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Failure.Instance, negate: true)); } - public void FailWithDelay(TimeSpan delay) + public Task FailWithDelay(TimeSpan delay) { if (delay <= TimeSpan.Zero) { throw new ArgumentException("Delay must be greater than zero", nameof(delay)); } - SetInterceptor(new JournalInterceptors.Delay(delay, JournalInterceptors.Failure.Instance)); + return SetInterceptorAsync(new JournalInterceptors.Delay(delay, JournalInterceptors.Failure.Instance)); } - public void FailIfWithDelay(TimeSpan delay, Func> predicate) + public Task FailIfWithDelay(TimeSpan delay, Func> predicate) { if (delay <= TimeSpan.Zero) { @@ -109,13 +109,13 @@ public void FailIfWithDelay(TimeSpan delay, Func predicate) + public Task FailIfWithDelay(TimeSpan delay, Func predicate) { if (delay <= TimeSpan.Zero) { @@ -127,13 +127,13 @@ public void FailIfWithDelay(TimeSpan delay, Func predicate) + public Task FailUnlessWithDelay(TimeSpan delay, Func predicate) { if (delay <= TimeSpan.Zero) { @@ -145,14 +145,14 @@ public void FailUnlessWithDelay(TimeSpan delay, Func> predicate) + public Task FailUnlessWithDelay(TimeSpan delay, Func> predicate) { if (delay <= TimeSpan.Zero) { @@ -164,16 +164,16 @@ public void FailUnlessWithDelay(TimeSpan delay, Func(TimeSpan delay) => FailOnTypeWithDelay(delay, typeof(TMessage)); + public Task FailOnTypeWithDelay(TimeSpan delay) => FailOnTypeWithDelay(delay, typeof(TMessage)); - public void FailOnTypeWithDelay(TimeSpan delay, Type messageType) + public Task FailOnTypeWithDelay(TimeSpan delay, Type messageType) { if (delay <= TimeSpan.Zero) { @@ -185,7 +185,7 @@ public void FailOnTypeWithDelay(TimeSpan delay, Type messageType) throw new ArgumentNullException(nameof(messageType)); } - SetInterceptor(new JournalInterceptors.OnType( + return SetInterceptorAsync(new JournalInterceptors.OnType( messageType, new JournalInterceptors.Delay(delay, JournalInterceptors.Failure.Instance) )); diff --git a/src/core/Akka.Persistence.TestKit/JournalWriteBehavior.cs b/src/core/Akka.Persistence.TestKit/JournalWriteBehavior.cs index c20ee1f0f9f..d25a16c399a 100644 --- a/src/core/Akka.Persistence.TestKit/JournalWriteBehavior.cs +++ b/src/core/Akka.Persistence.TestKit/JournalWriteBehavior.cs @@ -14,71 +14,71 @@ public class JournalWriteBehavior : JournalRecoveryBehavior { internal JournalWriteBehavior(IJournalBehaviorSetter setter) : base(setter) { } - public void Reject() => SetInterceptor(JournalInterceptors.Rejection.Instance); + public Task Reject() => SetInterceptorAsync(JournalInterceptors.Rejection.Instance); - public void RejectOnType() => FailOnType(typeof(TMessage)); + public Task RejectOnType() => FailOnType(typeof(TMessage)); - public void RejectOnType(Type messageType) + public Task RejectOnType(Type messageType) { if (messageType is null) { throw new ArgumentNullException(nameof(messageType)); } - SetInterceptor(new JournalInterceptors.OnType(messageType, JournalInterceptors.Rejection.Instance)); + return SetInterceptorAsync(new JournalInterceptors.OnType(messageType, JournalInterceptors.Rejection.Instance)); } - public void RejectIf(Func predicate) + public Task RejectIf(Func predicate) { if (predicate is null) { throw new ArgumentNullException(nameof(predicate)); } - SetInterceptor(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Rejection.Instance)); + return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Rejection.Instance)); } - public void RejectIf(Func> predicate) + public Task RejectIf(Func> predicate) { if (predicate is null) { throw new ArgumentNullException(nameof(predicate)); } - SetInterceptor(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Rejection.Instance)); + return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Rejection.Instance)); } - public void RejectUnless(Func predicate) + public Task RejectUnless(Func predicate) { if (predicate is null) { throw new ArgumentNullException(nameof(predicate)); } - SetInterceptor(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Rejection.Instance, negate: true)); + return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Rejection.Instance, negate: true)); } - public void RejectUnless(Func> predicate) + public Task RejectUnless(Func> predicate) { if (predicate is null) { throw new ArgumentNullException(nameof(predicate)); } - SetInterceptor(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Rejection.Instance, negate: true)); + return SetInterceptorAsync(new JournalInterceptors.OnCondition(predicate, JournalInterceptors.Rejection.Instance, negate: true)); } - public void RejectWithDelay(TimeSpan delay) + public Task RejectWithDelay(TimeSpan delay) { if (delay <= TimeSpan.Zero) { throw new ArgumentException("Delay must be greater than zero", nameof(delay)); } - SetInterceptor(new JournalInterceptors.Delay(delay, JournalInterceptors.Rejection.Instance)); + return SetInterceptorAsync(new JournalInterceptors.Delay(delay, JournalInterceptors.Rejection.Instance)); } - public void RejectIfWithDelay(TimeSpan delay, Func> predicate) + public Task RejectIfWithDelay(TimeSpan delay, Func> predicate) { if (delay <= TimeSpan.Zero) { @@ -90,13 +90,13 @@ public void RejectIfWithDelay(TimeSpan delay, Func predicate) + public Task RejectIfWithDelay(TimeSpan delay, Func predicate) { if (delay <= TimeSpan.Zero) { @@ -108,13 +108,13 @@ public void RejectIfWithDelay(TimeSpan delay, Func predicate) + public Task RejectUnlessWithDelay(TimeSpan delay, Func predicate) { if (delay <= TimeSpan.Zero) { @@ -126,14 +126,14 @@ public void RejectUnlessWithDelay(TimeSpan delay, Func> predicate) + public Task RejectUnlessWithDelay(TimeSpan delay, Func> predicate) { if (delay <= TimeSpan.Zero) { @@ -145,16 +145,16 @@ public void RejectUnlessWithDelay(TimeSpan delay, Func(TimeSpan delay) => FailOnTypeWithDelay(delay, typeof(TMessage)); + public Task RejectOnTypeWithDelay(TimeSpan delay) => FailOnTypeWithDelay(delay, typeof(TMessage)); - public void RejectOnTypeWithDelay(TimeSpan delay, Type messageType) + public Task RejectOnTypeWithDelay(TimeSpan delay, Type messageType) { if (delay <= TimeSpan.Zero) { @@ -166,7 +166,7 @@ public void RejectOnTypeWithDelay(TimeSpan delay, Type messageType) throw new ArgumentNullException(nameof(messageType)); } - SetInterceptor(new JournalInterceptors.OnType( + return SetInterceptorAsync(new JournalInterceptors.OnType( messageType, new JournalInterceptors.Delay(delay, JournalInterceptors.Rejection.Instance) )); diff --git a/src/core/Akka.Persistence.TestKit/PersistenceTestKit.cs b/src/core/Akka.Persistence.TestKit/PersistenceTestKit.cs index 96e59b6f8b2..ce467aac644 100644 --- a/src/core/Akka.Persistence.TestKit/PersistenceTestKit.cs +++ b/src/core/Akka.Persistence.TestKit/PersistenceTestKit.cs @@ -7,58 +7,13 @@ namespace Akka.Persistence.TestKit { - using Actor; - using Akka.TestKit; - using Akka.TestKit.Xunit; - using Configuration; - using System; + using Akka.TestKit.Xunit2; - public abstract class PersistenceTestKit : TestKitBase + public abstract class PersistenceTestKit : PersistenceTestKitBase { protected PersistenceTestKit(string actorSystemName = null, string testActorName = null) - : base(new XunitAssertions(), GetConfig(), actorSystemName, testActorName) + : base(new XunitAssertions(), actorSystemName, testActorName) { - JournalActorRef = GetJournalRef(Sys); - Journal = TestJournal.FromRef(JournalActorRef); } - - public IActorRef JournalActorRef { get; } - - public ITestJournal Journal { get; } - - public void WithFailingJournalRecovery(Action execution) - { - try - { - Journal.OnRecovery.Fail(); - execution(); - } - finally - { - // restore normal functionality - Journal.OnRecovery.Pass(); - } - } - - public void WithFailingJournalWrites(Action execution) - { - try - { - Journal.OnWrite.Fail(); - execution(); - } - finally - { - // restore normal functionality - Journal.OnWrite.Pass(); - } - } - - static IActorRef GetJournalRef(ActorSystem sys) - => Persistence.Instance.Apply(sys).JournalFor(null); - - static Config GetConfig() - => ConfigurationFactory.FromResource("Akka.Persistence.TestKit.test-journal.conf"); - } } \ No newline at end of file diff --git a/src/core/Akka.Persistence.TestKit/PersistenceTestKitBase.cs b/src/core/Akka.Persistence.TestKit/PersistenceTestKitBase.cs new file mode 100644 index 00000000000..1d9d51cadaf --- /dev/null +++ b/src/core/Akka.Persistence.TestKit/PersistenceTestKitBase.cs @@ -0,0 +1,103 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2019 Lightbend Inc. +// Copyright (C) 2013-2019 .NET Foundation +// +//----------------------------------------------------------------------- + +namespace Akka.Persistence.TestKit +{ + using System; + using System.Threading.Tasks; + using Actor; + using Akka.TestKit; + using Configuration; + + public abstract class PersistenceTestKitBase : TestKitBase + { + protected PersistenceTestKitBase(ITestKitAssertions assertions, string actorSystemName = null, string testActorName = null) + : base(assertions, GetConfig(), actorSystemName, testActorName) + { + JournalActorRef = GetJournalRef(Sys); + Journal = TestJournal.FromRef(JournalActorRef); + } + + public IActorRef JournalActorRef { get; } + + public ITestJournal Journal { get; } + + public async Task WithJournalRecovery(Func behaviorSelector, Func execution) + { + try + { + await behaviorSelector(Journal.OnRecovery); + await execution(); + } + finally + { + await Journal.OnRecovery.Pass(); + } + } + + public async Task WithJournalWrite(Func behaviorSelector, Func execution) + { + try + { + await behaviorSelector(Journal.OnWrite); + await execution(); + } + finally + { + await Journal.OnWrite.Pass(); + } + } + + public Task WithJournalRecovery(Func behaviorSelector, Action execution) + => WithJournalRecovery(behaviorSelector, () => + { + execution(); + return Task.FromResult(new object()); + }); + + public Task WithJournalWrite(Func behaviorSelector, Action execution) + => WithJournalWrite(behaviorSelector, () => + { + execution(); + return Task.FromResult(new object()); + }); + + public void WithFailingJournalRecovery(Action execution) + { + try + { + Journal.OnRecovery.Fail(); + execution(); + } + finally + { + // restore normal functionality + Journal.OnRecovery.Pass(); + } + } + + public void WithFailingJournalWrites(Action execution) + { + try + { + Journal.OnWrite.Fail(); + execution(); + } + finally + { + // restore normal functionality + Journal.OnWrite.Pass(); + } + } + + static IActorRef GetJournalRef(ActorSystem sys) + => Persistence.Instance.Apply(sys).JournalFor(null); + + static Config GetConfig() + => ConfigurationFactory.FromResource("Akka.Persistence.TestKit.test-journal.conf"); + } +} \ No newline at end of file