From 539cb06eae7a432629a502cfa649f3d448ebdce4 Mon Sep 17 00:00:00 2001 From: Bert Hertogen Date: Fri, 20 Jun 2025 10:00:53 +0200 Subject: [PATCH 1/4] Add persistence support and related tests - Add `PersistenceTestKit` with methods for testing journal and snapshot behaviors, enabling effective simulation of persistence scenarios. - Updated `Akka.Hosting.TestKit.csproj` to include `Akka.Persistence.TestKit` and a reference to `Akka.Persistence.Hosting`. - Add `TestJournalSpec` and `TestSnapshotStoreSpec` with tests for new persistence functionality, covering various scenarios like successful writes and recovery. --- .../TestActorRefTests/PersistActor.cs | 72 +++++ .../TestActorRefTests/SnapshotActor.cs | 100 ++++++ .../TestJournalSpec.cs | 163 ++++++++++ .../TestSnapshotStoreSpec.cs | 116 +++++++ .../Akka.Hosting.TestKit.csproj | 2 + .../PersistenceTestKit.cs | 306 ++++++++++++++++++ 6 files changed, 759 insertions(+) create mode 100644 src/Akka.Hosting.TestKit.Tests/TestActorRefTests/PersistActor.cs create mode 100644 src/Akka.Hosting.TestKit.Tests/TestActorRefTests/SnapshotActor.cs create mode 100644 src/Akka.Hosting.TestKit.Tests/TestPersistenceTestKistTests/TestJournalSpec.cs create mode 100644 src/Akka.Hosting.TestKit.Tests/TestPersistenceTestKistTests/TestSnapshotStoreSpec.cs create mode 100644 src/Akka.Hosting.TestKit/PersistenceTestKit.cs diff --git a/src/Akka.Hosting.TestKit.Tests/TestActorRefTests/PersistActor.cs b/src/Akka.Hosting.TestKit.Tests/TestActorRefTests/PersistActor.cs new file mode 100644 index 00000000..96b82119 --- /dev/null +++ b/src/Akka.Hosting.TestKit.Tests/TestActorRefTests/PersistActor.cs @@ -0,0 +1,72 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +//----------------------------------------------------------------------- + +using Akka.Persistence; + +namespace Akka.Hosting.TestKit.Tests.TestActorRefTests +{ + using System; + using Actor; + + public class PersistActor : UntypedPersistentActor + { + public PersistActor(IActorRef probe) + { + _probe = probe; + } + + private readonly IActorRef _probe; + + public override string PersistenceId => "foo"; + + protected override void OnCommand(object message) + { + switch (message) + { + case WriteMessage msg: + Persist(msg.Data, _ => + { + _probe.Tell("ack"); + }); + + break; + + default: + return; + } + } + + protected override void OnRecover(object message) + { + _probe.Tell(message); + } + + protected override void OnPersistFailure(Exception cause, object @event, long sequenceNr) + { + _probe.Tell("failure"); + + base.OnPersistFailure(cause, @event, sequenceNr); + } + + protected override void OnPersistRejected(Exception cause, object @event, long sequenceNr) + { + _probe.Tell("rejected"); + + base.OnPersistRejected(cause, @event, sequenceNr); + } + + public class WriteMessage + { + public string Data { get; } + + public WriteMessage(string data) + { + Data = data; + } + } + } +} diff --git a/src/Akka.Hosting.TestKit.Tests/TestActorRefTests/SnapshotActor.cs b/src/Akka.Hosting.TestKit.Tests/TestActorRefTests/SnapshotActor.cs new file mode 100644 index 00000000..fe4fe86c --- /dev/null +++ b/src/Akka.Hosting.TestKit.Tests/TestActorRefTests/SnapshotActor.cs @@ -0,0 +1,100 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +//----------------------------------------------------------------------- + +namespace Akka.Persistence.TestKit.Tests +{ + using System; + using Actor; + + public class SnapshotActor : UntypedPersistentActor + { + public SnapshotActor(IActorRef probe) + { + _probe = probe; + } + + private readonly IActorRef _probe; + + public override string PersistenceId => "bar"; + + protected override void OnCommand(object message) + { + switch (message) + { + case "save": + SaveSnapshot(message); + return; + + case DeleteOne del: + DeleteSnapshot(del.SequenceNr); + return; + + case DeleteMany del: + DeleteSnapshots(del.Criteria); + return; + + case SaveSnapshotSuccess _: + case SaveSnapshotFailure _: + case DeleteSnapshotSuccess _: + case DeleteSnapshotFailure _: + case DeleteSnapshotsSuccess _: + case DeleteSnapshotsFailure _: + _probe.Tell(message); + return; + + default: + return; + } + } + + protected override void OnRecover(object message) + { + if (message is SnapshotOffer snapshot) + { + _probe.Tell(message); + } + } + + protected override void OnRecoveryFailure(Exception reason, object message) + { + _probe.Tell(new RecoveryFailure(reason, message)); + base.OnRecoveryFailure(reason, message); + } + + public class DeleteOne + { + public DeleteOne(long sequenceNr) + { + SequenceNr = sequenceNr; + } + + public long SequenceNr { get; } + } + + public class DeleteMany + { + public DeleteMany(SnapshotSelectionCriteria criteria) + { + Criteria = criteria; + } + + public SnapshotSelectionCriteria Criteria { get; } + } + + public class RecoveryFailure + { + public RecoveryFailure(Exception reason, object message) + { + Reason = reason; + Message = message; + } + + public Exception Reason { get; } + public object Message { get; } + } + } +} diff --git a/src/Akka.Hosting.TestKit.Tests/TestPersistenceTestKistTests/TestJournalSpec.cs b/src/Akka.Hosting.TestKit.Tests/TestPersistenceTestKistTests/TestJournalSpec.cs new file mode 100644 index 00000000..a3709b6d --- /dev/null +++ b/src/Akka.Hosting.TestKit.Tests/TestPersistenceTestKistTests/TestJournalSpec.cs @@ -0,0 +1,163 @@ +using Akka.Actor; +using Akka.Configuration; +using Akka.Hosting.TestKit.Tests.TestActorRefTests; +using Akka.Persistence; +using Akka.Persistence.TestKit; +using Akka.TestKit; +using FluentAssertions; +using System; +using System.Threading.Tasks; +using Xunit; + +namespace Akka.Hosting.TestKit.Tests.TestPersistenceTestKistTests +{ + + public class TestJournalSpec : PersistenceTestKit + { + // Expect should be passing by default, need to make them less sencitive to timing + private static readonly Config DefaultTimeoutConfig = "akka.test.single-expect-default = 30s"; + + protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider) + { + base.ConfigureAkka(builder, provider); + builder.WithActors((system, registry) => + { + probe = CreateTestProbe(); + var echo = system.ActorOf(Props.Create(() => new PersistActor(probe))); + registry.Register(echo); + }); + } + + private TestProbe? probe; + + public TestJournalSpec() : base(DefaultTimeoutConfig) + { + } + + [Fact] + public void must_have_journal_and_snapshot() + { + probe.Should().NotBeNull(); + Journal.Should().NotBeNull(); + JournalActorRef.Should().NotBeNull(); + Snapshots.Should().NotBeNull(); + SnapshotsActorRef.Should().NotBeNull(); + } + + [Fact] + public async Task must_return_ack_after_new_write_interceptor_is_set() + { + JournalActorRef!.Tell(new TestJournal.UseWriteInterceptor(null), TestActor); + + await ExpectMsgAsync(TimeSpan.FromSeconds(3)); + } + + [Fact] + public async Task works_as_memory_journal_by_default() + { + var actor = ActorRegistry.Get(); + await probe!.ExpectMsgAsync(); + + await Journal!.OnWrite.Pass(); + actor.Tell(new PersistActor.WriteMessage("write"), TestActor); + + await probe.ExpectMsgAsync("ack"); + } + + [Fact] + public async Task must_recover_restarted_actor() + { + var actor = ActorRegistry.Get(); + Watch(actor); + await probe!.ExpectMsgAsync(); + + await Journal!.OnRecovery.Pass(); + actor.Tell(new PersistActor.WriteMessage("1"), TestActor); + await probe.ExpectMsgAsync("ack"); + actor.Tell(new PersistActor.WriteMessage("2"), TestActor); + await probe.ExpectMsgAsync("ack"); + + await actor.GracefulStop(TimeSpan.FromSeconds(1)); + await ExpectTerminatedAsync(actor); + + ActorOf(() => new PersistActor(probe)); + await probe.ExpectMsgAsync("1"); + await probe.ExpectMsgAsync("2"); + await probe.ExpectMsgAsync(); + } + + [Fact] + public async Task when_fail_on_write_is_set_all_writes_to_journal_will_fail() + { + var actor = ActorRegistry.Get(); + Watch(actor); + await probe!.ExpectMsgAsync(); + + await Journal!.OnWrite.Fail(); + actor.Tell(new PersistActor.WriteMessage("write"), TestActor); + + await probe.ExpectMsgAsync("failure"); + await ExpectTerminatedAsync(actor); + } + + [Fact] + public async Task must_recover_failed_actor() + { + var actor = ActorRegistry.Get(); + Watch(actor); + await probe!.ExpectMsgAsync(); + + await Journal!.OnRecovery.Pass(); + actor.Tell(new PersistActor.WriteMessage("1"), TestActor); + await probe.ExpectMsgAsync("ack"); + actor.Tell(new PersistActor.WriteMessage("2"), TestActor); + await probe.ExpectMsgAsync("ack"); + + await Journal.OnWrite.Fail(); + actor.Tell(new PersistActor.WriteMessage("3"), TestActor); + + await probe.ExpectMsgAsync("failure"); + await ExpectTerminatedAsync(actor); + + ActorOf(() => new PersistActor(probe)); + await probe.ExpectMsgAsync("1"); + await probe.ExpectMsgAsync("2"); + await probe.ExpectMsgAsync(); + } + + [Fact] + public async Task when_reject_on_write_is_set_all_writes_to_journal_will_be_rejected() + { + var actor = ActorRegistry.Get(); + Watch(actor); + await probe!.ExpectMsgAsync(); + + await Journal!.OnWrite.Reject(); + actor.Tell(new PersistActor.WriteMessage("write"), TestActor); + + await probe.ExpectMsgAsync("rejected"); + } + + [Fact] + public async Task journal_must_reset_state_to_pass() + { + await WithJournalWrite(write => write.Fail(), async () => + { + var actor = ActorRegistry.Get(); + Watch(actor); + await probe!.ExpectMsgAsync(); + + actor.Tell(new PersistActor.WriteMessage("write"), TestActor); + await probe.ExpectMsgAsync("failure"); + await ExpectTerminatedAsync(actor); + }); + + var actor2 = ActorOf(() => new PersistActor(probe!)); + Watch(actor2); + + await probe!.ExpectMsgAsync(); + actor2.Tell(new PersistActor.WriteMessage("write"), TestActor); + await probe.ExpectMsgAsync("ack"); + } + } +} diff --git a/src/Akka.Hosting.TestKit.Tests/TestPersistenceTestKistTests/TestSnapshotStoreSpec.cs b/src/Akka.Hosting.TestKit.Tests/TestPersistenceTestKistTests/TestSnapshotStoreSpec.cs new file mode 100644 index 00000000..78b387a0 --- /dev/null +++ b/src/Akka.Hosting.TestKit.Tests/TestPersistenceTestKistTests/TestSnapshotStoreSpec.cs @@ -0,0 +1,116 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +//----------------------------------------------------------------------- + +using Xunit.Abstractions; + +namespace Akka.Hosting.TestKit.Tests.TestPersistenceTestKistTests +{ + using Actor; + using Akka.Persistence; + using Akka.Persistence.TestKit; + using Akka.Persistence.TestKit.Tests; + using Akka.TestKit; + using System; + using System.Threading.Tasks; + using Xunit; + + public sealed class TestSnapshotStoreSpec : PersistenceTestKit + { + public TestSnapshotStoreSpec(ITestOutputHelper output) : base(nameof(TestSnapshotStoreSpec), output: output) + { + } + + protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider) + { + base.ConfigureAkka(builder, provider); + builder.WithActors((system, registry) => + { + probe = CreateTestProbe(); + var echo = system.ActorOf(Props.Create(() => new SnapshotActor(probe))); + registry.Register(echo); + }); + } + + private TestProbe? probe; + + [Fact] + public async Task send_ack_after_load_interceptor_is_set() + { + SnapshotsActorRef!.Tell(new TestSnapshotStore.UseLoadInterceptor(null), TestActor); + await ExpectMsgAsync(); + } + + [Fact] + public async Task send_ack_after_save_interceptor_is_set() + { + SnapshotsActorRef!.Tell(new TestSnapshotStore.UseSaveInterceptor(null), TestActor); + await ExpectMsgAsync(); + } + + [Fact] + public async Task send_ack_after_delete_interceptor_is_set() + { + SnapshotsActorRef!.Tell(new TestSnapshotStore.UseDeleteInterceptor(null), TestActor); + await ExpectMsgAsync(); + } + + [Fact] + public async Task after_load_behavior_was_executed_store_is_back_to_pass_mode() + { + // create snapshot + var actor = ActorRegistry.Get(); + actor.Tell("save"); + await probe!.ExpectMsgAsync(); + await actor.GracefulStop(TimeSpan.FromSeconds(3)); + + await WithSnapshotLoad(load => load.Fail(), async () => + { + ActorOf(() => new SnapshotActor(probe!)); + await probe!.ExpectMsgAsync(); + }); + + ActorOf(() => new SnapshotActor(probe!)); + await probe!.ExpectMsgAsync(); + } + + [Fact] + public async Task after_save_behavior_was_executed_store_is_back_to_pass_mode() + { + // create snapshot + var actor = ActorRegistry.Get(); + + await WithSnapshotSave(save => save.Fail(), async () => + { + actor.Tell("save"); + await probe!.ExpectMsgAsync(); + }); + + actor.Tell("save"); + await probe!.ExpectMsgAsync(); + } + + [Fact] + public async Task after_delete_behavior_was_executed_store_is_back_to_pass_mode() + { + // create snapshot + var actor = ActorRegistry.Get(); + actor.Tell("save"); + + var success = await probe!.ExpectMsgAsync(); + var nr = success.Metadata.SequenceNr; + + await WithSnapshotDelete(del => del.Fail(), async () => + { + actor.Tell(new SnapshotActor.DeleteOne(nr), TestActor); + await probe!.ExpectMsgAsync(); + }); + + actor.Tell(new SnapshotActor.DeleteOne(nr), TestActor); + await probe!.ExpectMsgAsync(); + } + } +} diff --git a/src/Akka.Hosting.TestKit/Akka.Hosting.TestKit.csproj b/src/Akka.Hosting.TestKit/Akka.Hosting.TestKit.csproj index b16a0e63..df752999 100644 --- a/src/Akka.Hosting.TestKit/Akka.Hosting.TestKit.csproj +++ b/src/Akka.Hosting.TestKit/Akka.Hosting.TestKit.csproj @@ -10,6 +10,7 @@ + @@ -18,6 +19,7 @@ + diff --git a/src/Akka.Hosting.TestKit/PersistenceTestKit.cs b/src/Akka.Hosting.TestKit/PersistenceTestKit.cs new file mode 100644 index 00000000..8a00f897 --- /dev/null +++ b/src/Akka.Hosting.TestKit/PersistenceTestKit.cs @@ -0,0 +1,306 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2022 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Configuration; +using Akka.Persistence.TestKit; +using Xunit.Abstractions; + +namespace Akka.Hosting.TestKit +{ + public abstract class PersistenceTestKit : TestKit + { + /// + /// Create a new instance of the class. + /// A new system with the specified configuration will be created. + /// + /// Test ActorSystem configuration + /// Optional: The name of the actor system + /// TBD + public PersistenceTestKit(Config? config = null, string? actorSystemName = null, ITestOutputHelper? output = null) + : base(actorSystemName, output) + { + _config = GetConfig(config ?? Config.Empty); + } + + /// + /// Actor reference to persistence Journal used by current actor system. + /// + public IActorRef? JournalActorRef { get; set; } + + /// + /// Actor reference to persistence Snapshot Store used by current actor system. + /// + public IActorRef? SnapshotsActorRef { get; set; } + + /// + /// + /// + public ITestJournal? Journal { get; set; } + + /// + /// + /// + public ITestSnapshotStore? Snapshots { get; set; } + + /// + /// Execute delegate with Journal Behavior applied to Recovery operation. + /// + /// + /// After will be executed, Recovery behavior will be reverted back to normal. + /// + /// Delegate which will select Journal behavior. + /// Async delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public async Task WithJournalRecovery(Func behaviorSelector, Func execution) + { + if (Journal == null) throw new ArgumentNullException(nameof(Journal)); + if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector)); + if (execution == null) throw new ArgumentNullException(nameof(execution)); + + try + { + await behaviorSelector(Journal.OnRecovery); + await execution(); + } + finally + { + await Journal.OnRecovery.Pass(); + } + } + + /// + /// Execute delegate with Journal Behavior applied to Write operation. + /// + /// + /// After will be executed, Write behavior will be reverted back to normal. + /// + /// Delegate which will select Journal behavior. + /// Async delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public async Task WithJournalWrite(Func behaviorSelector, Func execution) + { + if (Journal == null) throw new ArgumentNullException(nameof(Journal)); + if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector)); + if (execution == null) throw new ArgumentNullException(nameof(execution)); + + try + { + await behaviorSelector(Journal.OnWrite); + await execution(); + } + finally + { + await Journal.OnWrite.Pass(); + } + } + + /// + /// Execute delegate with Journal Behavior applied to Recovery operation. + /// + /// + /// After will be executed, Recovery behavior will be reverted back to normal. + /// + /// Delegate which will select Journal behavior. + /// Delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public Task WithJournalRecovery(Func behaviorSelector, Action execution) + => WithJournalRecovery(behaviorSelector, () => + { + if (execution == null) throw new ArgumentNullException(nameof(execution)); + + execution(); + return Task.FromResult(new object()); + }); + + /// + /// Execute delegate with Journal Behavior applied to Write operation. + /// + /// + /// After will be executed, Write behavior will be reverted back to normal. + /// + /// Delegate which will select Journal behavior. + /// Delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public Task WithJournalWrite(Func behaviorSelector, Action execution) + => WithJournalWrite(behaviorSelector, () => + { + if (execution == null) throw new ArgumentNullException(nameof(execution)); + + execution(); + return Task.FromResult(new object()); + }); + + /// + /// Execute delegate with Snapshot Store Behavior applied to Save operation. + /// + /// + /// After will be executed, Save behavior will be reverted back to normal. + /// + /// Delegate which will select Snapshot Store behavior. + /// Async delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public async Task WithSnapshotSave(Func behaviorSelector, Func execution) + { + if (Snapshots == null) throw new ArgumentNullException(nameof(Journal)); + if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector)); + if (execution == null) throw new ArgumentNullException(nameof(execution)); + + try + { + await behaviorSelector(Snapshots.OnSave); + await execution(); + } + finally + { + await Snapshots.OnSave.Pass(); + } + } + + /// + /// Execute delegate with Snapshot Store Behavior applied to Load operation. + /// + /// + /// After will be executed, Load behavior will be reverted back to normal. + /// + /// Delegate which will select Snapshot Store behavior. + /// Async delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public async Task WithSnapshotLoad(Func behaviorSelector, Func execution) + { + if (Snapshots == null) throw new ArgumentNullException(nameof(Journal)); + if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector)); + if (execution == null) throw new ArgumentNullException(nameof(execution)); + + try + { + await behaviorSelector(Snapshots.OnLoad); + await execution(); + } + finally + { + await Snapshots.OnLoad.Pass(); + } + } + + /// + /// Execute delegate with Snapshot Store Behavior applied to Delete operation. + /// + /// + /// After will be executed, Delete behavior will be reverted back to normal. + /// + /// Delegate which will select Snapshot Store behavior. + /// Async delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public async Task WithSnapshotDelete(Func behaviorSelector, Func execution) + { + if (Snapshots == null) throw new ArgumentNullException(nameof(Journal)); + if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector)); + if (execution == null) throw new ArgumentNullException(nameof(execution)); + + try + { + await behaviorSelector(Snapshots.OnDelete); + await execution(); + } + finally + { + await Snapshots.OnDelete.Pass(); + } + } + + /// + /// Execute delegate with Snapshot Store Behavior applied to Save operation. + /// + /// + /// After will be executed, Save behavior will be reverted back to normal. + /// + /// Delegate which will select Snapshot Store behavior. + /// Delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public Task WithSnapshotSave(Func behaviorSelector, Action execution) + => WithSnapshotSave(behaviorSelector, () => + { + if (execution == null) throw new ArgumentNullException(nameof(execution)); + + execution(); + return Task.FromResult(true); + }); + + /// + /// Execute delegate with Snapshot Store Behavior applied to Load operation. + /// + /// + /// After will be executed, Load behavior will be reverted back to normal. + /// + /// Delegate which will select Snapshot Store behavior. + /// Async delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public Task WithSnapshotLoad(Func behaviorSelector, Action execution) + => WithSnapshotLoad(behaviorSelector, () => + { + if (execution == null) throw new ArgumentNullException(nameof(execution)); + + execution(); + return Task.FromResult(true); + }); + + /// + /// Execute delegate with Snapshot Store Behavior applied to Delete operation. + /// + /// + /// After will be executed, Delete behavior will be reverted back to normal. + /// + /// Delegate which will select Snapshot Store behavior. + /// Async delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public Task WithSnapshotDelete(Func behaviorSelector, Action execution) + => WithSnapshotDelete(behaviorSelector, () => + { + if (execution == null) throw new ArgumentNullException(nameof(execution)); + + execution(); + return Task.FromResult(true); + }); + + private readonly Config _config; + protected override Config? Config => _config; + + protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider) + { + if (_config is not null) + { + builder.AddHocon(_config, HoconAddMode.Append); + + builder.AddStartup((system, registry) => + { + var persistenceExtension = Persistence.Persistence.Instance.Apply(system); + JournalActorRef = persistenceExtension.JournalFor(null); + Journal = TestJournal.FromRef(JournalActorRef); + SnapshotsActorRef = persistenceExtension.SnapshotStoreFor(null); + Snapshots = TestSnapshotStore.FromRef(SnapshotsActorRef); + }); + } + } + + /// + /// Loads from embedded resources actor system persistence configuration with and + /// configured as default persistence plugins. + /// + /// Custom configuration that was passed in the constructor. + /// Actor system configuration object. + /// + private static Config GetConfig(Config customConfig) + { + var defaultConfig = ConfigurationFactory.FromResource("Akka.Persistence.TestKit.config.conf"); + if (customConfig == Config.Empty) return defaultConfig; + else return defaultConfig.SafeWithFallback(customConfig); + } + } +} + From 51f63723ef3002d582d48d5afd401fc27427f7be Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sat, 21 Jun 2025 02:05:08 +0700 Subject: [PATCH 2/4] Update code to conform to Akka.Hosting.TestKit standard --- .../TestJournalSpec.cs | 245 +++++---- .../TestSnapshotStoreSpec.cs | 168 +++--- .../PersistenceTestKit.cs | 485 +++++++++--------- 3 files changed, 429 insertions(+), 469 deletions(-) diff --git a/src/Akka.Hosting.TestKit.Tests/TestPersistenceTestKistTests/TestJournalSpec.cs b/src/Akka.Hosting.TestKit.Tests/TestPersistenceTestKistTests/TestJournalSpec.cs index a3709b6d..119f009d 100644 --- a/src/Akka.Hosting.TestKit.Tests/TestPersistenceTestKistTests/TestJournalSpec.cs +++ b/src/Akka.Hosting.TestKit.Tests/TestPersistenceTestKistTests/TestJournalSpec.cs @@ -8,156 +8,149 @@ using System; using System.Threading.Tasks; using Xunit; +using Xunit.Abstractions; -namespace Akka.Hosting.TestKit.Tests.TestPersistenceTestKistTests +namespace Akka.Hosting.TestKit.Tests.TestPersistenceTestKistTests; + +public class TestJournalSpec : PersistenceTestKit { + private TestProbe _probe = null!; - public class TestJournalSpec : PersistenceTestKit + public TestJournalSpec(ITestOutputHelper output) : base(nameof(TestJournalSpec), output) { - // Expect should be passing by default, need to make them less sencitive to timing - private static readonly Config DefaultTimeoutConfig = "akka.test.single-expect-default = 30s"; - - protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider) - { - base.ConfigureAkka(builder, provider); - builder.WithActors((system, registry) => - { - probe = CreateTestProbe(); - var echo = system.ActorOf(Props.Create(() => new PersistActor(probe))); - registry.Register(echo); - }); - } - - private TestProbe? probe; - - public TestJournalSpec() : base(DefaultTimeoutConfig) - { - } - - [Fact] - public void must_have_journal_and_snapshot() - { - probe.Should().NotBeNull(); - Journal.Should().NotBeNull(); - JournalActorRef.Should().NotBeNull(); - Snapshots.Should().NotBeNull(); - SnapshotsActorRef.Should().NotBeNull(); - } - - [Fact] - public async Task must_return_ack_after_new_write_interceptor_is_set() - { - JournalActorRef!.Tell(new TestJournal.UseWriteInterceptor(null), TestActor); + } - await ExpectMsgAsync(TimeSpan.FromSeconds(3)); - } + // Expect should be passing by default, need to make them less sensitive to timing + protected override Config? Config => "akka.test.single-expect-default = 30s"; - [Fact] - public async Task works_as_memory_journal_by_default() - { - var actor = ActorRegistry.Get(); - await probe!.ExpectMsgAsync(); + protected override Task BeforeTestStart() + { + _probe = CreateTestProbe(); + return Task.CompletedTask; + } - await Journal!.OnWrite.Pass(); - actor.Tell(new PersistActor.WriteMessage("write"), TestActor); + [Fact] + public void must_have_journal_and_snapshot() + { + Journal.Should().NotBeNull(); + JournalActorRef.Should().NotBeNull(); + Snapshots.Should().NotBeNull(); + SnapshotsActorRef.Should().NotBeNull(); + } - await probe.ExpectMsgAsync("ack"); - } + [Fact] + public async Task must_return_ack_after_new_write_interceptor_is_set() + { + JournalActorRef.Tell(new TestJournal.UseWriteInterceptor(null), TestActor); - [Fact] - public async Task must_recover_restarted_actor() - { - var actor = ActorRegistry.Get(); - Watch(actor); - await probe!.ExpectMsgAsync(); + await ExpectMsgAsync(TimeSpan.FromSeconds(3)); + } - await Journal!.OnRecovery.Pass(); - actor.Tell(new PersistActor.WriteMessage("1"), TestActor); - await probe.ExpectMsgAsync("ack"); - actor.Tell(new PersistActor.WriteMessage("2"), TestActor); - await probe.ExpectMsgAsync("ack"); + [Fact] + public async Task works_as_memory_journal_by_default() + { + var actor = ActorOf(() => new PersistActor(_probe)); + await _probe.ExpectMsgAsync(); - await actor.GracefulStop(TimeSpan.FromSeconds(1)); - await ExpectTerminatedAsync(actor); + await Journal.OnWrite.Pass(); + actor.Tell(new PersistActor.WriteMessage("write"), TestActor); - ActorOf(() => new PersistActor(probe)); - await probe.ExpectMsgAsync("1"); - await probe.ExpectMsgAsync("2"); - await probe.ExpectMsgAsync(); - } + await _probe.ExpectMsgAsync("ack"); + } - [Fact] - public async Task when_fail_on_write_is_set_all_writes_to_journal_will_fail() - { - var actor = ActorRegistry.Get(); - Watch(actor); - await probe!.ExpectMsgAsync(); + [Fact] + public async Task must_recover_restarted_actor() + { + var actor = ActorOf(() => new PersistActor(_probe)); + await WatchAsync(actor); + await _probe.ExpectMsgAsync(); + + await Journal.OnRecovery.Pass(); + actor.Tell(new PersistActor.WriteMessage("1"), TestActor); + await _probe.ExpectMsgAsync("ack"); + actor.Tell(new PersistActor.WriteMessage("2"), TestActor); + await _probe.ExpectMsgAsync("ack"); + + await actor.GracefulStop(TimeSpan.FromSeconds(1)); + await ExpectTerminatedAsync(actor); + + ActorOf(() => new PersistActor(_probe)); + await _probe.ExpectMsgAsync("1"); + await _probe.ExpectMsgAsync("2"); + await _probe.ExpectMsgAsync(); + } - await Journal!.OnWrite.Fail(); - actor.Tell(new PersistActor.WriteMessage("write"), TestActor); + [Fact] + public async Task when_fail_on_write_is_set_all_writes_to_journal_will_fail() + { + var actor = ActorOf(() => new PersistActor(_probe)); + await WatchAsync(actor); + await _probe.ExpectMsgAsync(); - await probe.ExpectMsgAsync("failure"); - await ExpectTerminatedAsync(actor); - } + await Journal.OnWrite.Fail(); + actor.Tell(new PersistActor.WriteMessage("write"), TestActor); - [Fact] - public async Task must_recover_failed_actor() - { - var actor = ActorRegistry.Get(); - Watch(actor); - await probe!.ExpectMsgAsync(); + await _probe.ExpectMsgAsync("failure"); + await ExpectTerminatedAsync(actor); + } - await Journal!.OnRecovery.Pass(); - actor.Tell(new PersistActor.WriteMessage("1"), TestActor); - await probe.ExpectMsgAsync("ack"); - actor.Tell(new PersistActor.WriteMessage("2"), TestActor); - await probe.ExpectMsgAsync("ack"); + [Fact] + public async Task must_recover_failed_actor() + { + var actor = ActorOf(() => new PersistActor(_probe)); + await WatchAsync(actor); + await _probe.ExpectMsgAsync(); + + await Journal.OnRecovery.Pass(); + actor.Tell(new PersistActor.WriteMessage("1"), TestActor); + await _probe.ExpectMsgAsync("ack"); + actor.Tell(new PersistActor.WriteMessage("2"), TestActor); + await _probe.ExpectMsgAsync("ack"); + + await Journal.OnWrite.Fail(); + actor.Tell(new PersistActor.WriteMessage("3"), TestActor); + + await _probe.ExpectMsgAsync("failure"); + await ExpectTerminatedAsync(actor); + + ActorOf(() => new PersistActor(_probe)); + await _probe.ExpectMsgAsync("1"); + await _probe.ExpectMsgAsync("2"); + await _probe.ExpectMsgAsync(); + } - await Journal.OnWrite.Fail(); - actor.Tell(new PersistActor.WriteMessage("3"), TestActor); + [Fact] + public async Task when_reject_on_write_is_set_all_writes_to_journal_will_be_rejected() + { + var actor = ActorOf(() => new PersistActor(_probe)); + await WatchAsync(actor); + await _probe.ExpectMsgAsync(); - await probe.ExpectMsgAsync("failure"); - await ExpectTerminatedAsync(actor); + await Journal.OnWrite.Reject(); + actor.Tell(new PersistActor.WriteMessage("write"), TestActor); - ActorOf(() => new PersistActor(probe)); - await probe.ExpectMsgAsync("1"); - await probe.ExpectMsgAsync("2"); - await probe.ExpectMsgAsync(); - } + await _probe.ExpectMsgAsync("rejected"); + } - [Fact] - public async Task when_reject_on_write_is_set_all_writes_to_journal_will_be_rejected() + [Fact] + public async Task journal_must_reset_state_to_pass() + { + await WithJournalWrite(write => write.Fail(), async () => { - var actor = ActorRegistry.Get(); - Watch(actor); - await probe!.ExpectMsgAsync(); + var actor = ActorOf(() => new PersistActor(_probe)); + await WatchAsync(actor); + await _probe.ExpectMsgAsync(); - await Journal!.OnWrite.Reject(); actor.Tell(new PersistActor.WriteMessage("write"), TestActor); + await _probe.ExpectMsgAsync("failure"); + await ExpectTerminatedAsync(actor); + }); - await probe.ExpectMsgAsync("rejected"); - } + var actor2 = ActorOf(() => new PersistActor(_probe)); + await WatchAsync(actor2); - [Fact] - public async Task journal_must_reset_state_to_pass() - { - await WithJournalWrite(write => write.Fail(), async () => - { - var actor = ActorRegistry.Get(); - Watch(actor); - await probe!.ExpectMsgAsync(); - - actor.Tell(new PersistActor.WriteMessage("write"), TestActor); - await probe.ExpectMsgAsync("failure"); - await ExpectTerminatedAsync(actor); - }); - - var actor2 = ActorOf(() => new PersistActor(probe!)); - Watch(actor2); - - await probe!.ExpectMsgAsync(); - actor2.Tell(new PersistActor.WriteMessage("write"), TestActor); - await probe.ExpectMsgAsync("ack"); - } + await _probe.ExpectMsgAsync(); + actor2.Tell(new PersistActor.WriteMessage("write"), TestActor); + await _probe.ExpectMsgAsync("ack"); } -} +} \ No newline at end of file diff --git a/src/Akka.Hosting.TestKit.Tests/TestPersistenceTestKistTests/TestSnapshotStoreSpec.cs b/src/Akka.Hosting.TestKit.Tests/TestPersistenceTestKistTests/TestSnapshotStoreSpec.cs index 78b387a0..7d206a88 100644 --- a/src/Akka.Hosting.TestKit.Tests/TestPersistenceTestKistTests/TestSnapshotStoreSpec.cs +++ b/src/Akka.Hosting.TestKit.Tests/TestPersistenceTestKistTests/TestSnapshotStoreSpec.cs @@ -7,110 +7,104 @@ using Xunit.Abstractions; -namespace Akka.Hosting.TestKit.Tests.TestPersistenceTestKistTests +namespace Akka.Hosting.TestKit.Tests.TestPersistenceTestKistTests; + +using Actor; +using Akka.Persistence; +using Akka.Persistence.TestKit; +using Akka.Persistence.TestKit.Tests; +using Akka.TestKit; +using System; +using System.Threading.Tasks; +using Xunit; + +public sealed class TestSnapshotStoreSpec : PersistenceTestKit { - using Actor; - using Akka.Persistence; - using Akka.Persistence.TestKit; - using Akka.Persistence.TestKit.Tests; - using Akka.TestKit; - using System; - using System.Threading.Tasks; - using Xunit; - - public sealed class TestSnapshotStoreSpec : PersistenceTestKit + public TestSnapshotStoreSpec(ITestOutputHelper output) : base(nameof(TestSnapshotStoreSpec), output: output) { - public TestSnapshotStoreSpec(ITestOutputHelper output) : base(nameof(TestSnapshotStoreSpec), output: output) - { - } + } - protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider) - { - base.ConfigureAkka(builder, provider); - builder.WithActors((system, registry) => - { - probe = CreateTestProbe(); - var echo = system.ActorOf(Props.Create(() => new SnapshotActor(probe))); - registry.Register(echo); - }); - } - - private TestProbe? probe; - - [Fact] - public async Task send_ack_after_load_interceptor_is_set() - { - SnapshotsActorRef!.Tell(new TestSnapshotStore.UseLoadInterceptor(null), TestActor); - await ExpectMsgAsync(); - } + protected override Task BeforeTestStart() + { + _probe = CreateTestProbe(); + return Task.CompletedTask; + } - [Fact] - public async Task send_ack_after_save_interceptor_is_set() - { - SnapshotsActorRef!.Tell(new TestSnapshotStore.UseSaveInterceptor(null), TestActor); - await ExpectMsgAsync(); - } + private TestProbe _probe = null!; - [Fact] - public async Task send_ack_after_delete_interceptor_is_set() - { - SnapshotsActorRef!.Tell(new TestSnapshotStore.UseDeleteInterceptor(null), TestActor); - await ExpectMsgAsync(); - } + [Fact] + public async Task send_ack_after_load_interceptor_is_set() + { + SnapshotsActorRef.Tell(new TestSnapshotStore.UseLoadInterceptor(null), TestActor); + await ExpectMsgAsync(); + } - [Fact] - public async Task after_load_behavior_was_executed_store_is_back_to_pass_mode() - { - // create snapshot - var actor = ActorRegistry.Get(); - actor.Tell("save"); - await probe!.ExpectMsgAsync(); - await actor.GracefulStop(TimeSpan.FromSeconds(3)); + [Fact] + public async Task send_ack_after_save_interceptor_is_set() + { + SnapshotsActorRef.Tell(new TestSnapshotStore.UseSaveInterceptor(null), TestActor); + await ExpectMsgAsync(); + } - await WithSnapshotLoad(load => load.Fail(), async () => - { - ActorOf(() => new SnapshotActor(probe!)); - await probe!.ExpectMsgAsync(); - }); + [Fact] + public async Task send_ack_after_delete_interceptor_is_set() + { + SnapshotsActorRef.Tell(new TestSnapshotStore.UseDeleteInterceptor(null), TestActor); + await ExpectMsgAsync(); + } - ActorOf(() => new SnapshotActor(probe!)); - await probe!.ExpectMsgAsync(); - } + [Fact] + public async Task after_load_behavior_was_executed_store_is_back_to_pass_mode() + { + // create snapshot + var actor = ActorOf(() => new SnapshotActor(_probe)); + actor.Tell("save"); + await _probe.ExpectMsgAsync(); + await actor.GracefulStop(TimeSpan.FromSeconds(3)); - [Fact] - public async Task after_save_behavior_was_executed_store_is_back_to_pass_mode() + await WithSnapshotLoad(load => load.Fail(), async () => { - // create snapshot - var actor = ActorRegistry.Get(); + ActorOf(() => new SnapshotActor(_probe)); + await _probe.ExpectMsgAsync(); + }); - await WithSnapshotSave(save => save.Fail(), async () => - { - actor.Tell("save"); - await probe!.ExpectMsgAsync(); - }); + ActorOf(() => new SnapshotActor(_probe)); + await _probe.ExpectMsgAsync(); + } - actor.Tell("save"); - await probe!.ExpectMsgAsync(); - } + [Fact] + public async Task after_save_behavior_was_executed_store_is_back_to_pass_mode() + { + // create snapshot + var actor = ActorOf(() => new SnapshotActor(_probe)); - [Fact] - public async Task after_delete_behavior_was_executed_store_is_back_to_pass_mode() + await WithSnapshotSave(save => save.Fail(), async () => { - // create snapshot - var actor = ActorRegistry.Get(); actor.Tell("save"); + await _probe.ExpectMsgAsync(); + }); + + actor.Tell("save"); + await _probe.ExpectMsgAsync(); + } - var success = await probe!.ExpectMsgAsync(); - var nr = success.Metadata.SequenceNr; + [Fact] + public async Task after_delete_behavior_was_executed_store_is_back_to_pass_mode() + { + // create snapshot + var actor = ActorOf(() => new SnapshotActor(_probe)); + actor.Tell("save"); - await WithSnapshotDelete(del => del.Fail(), async () => - { - actor.Tell(new SnapshotActor.DeleteOne(nr), TestActor); - await probe!.ExpectMsgAsync(); - }); + var success = await _probe.ExpectMsgAsync(); + var nr = success.Metadata.SequenceNr; + await WithSnapshotDelete(del => del.Fail(), async () => + { actor.Tell(new SnapshotActor.DeleteOne(nr), TestActor); - await probe!.ExpectMsgAsync(); - } + await _probe.ExpectMsgAsync(); + }); + + actor.Tell(new SnapshotActor.DeleteOne(nr), TestActor); + await _probe.ExpectMsgAsync(); } -} +} \ No newline at end of file diff --git a/src/Akka.Hosting.TestKit/PersistenceTestKit.cs b/src/Akka.Hosting.TestKit/PersistenceTestKit.cs index 8a00f897..eddbb070 100644 --- a/src/Akka.Hosting.TestKit/PersistenceTestKit.cs +++ b/src/Akka.Hosting.TestKit/PersistenceTestKit.cs @@ -9,298 +9,271 @@ using Akka.Actor; using Akka.Configuration; using Akka.Persistence.TestKit; +using Microsoft.Extensions.Logging; using Xunit.Abstractions; -namespace Akka.Hosting.TestKit +namespace Akka.Hosting.TestKit; + +public abstract class PersistenceTestKit : TestKit { - public abstract class PersistenceTestKit : TestKit + public static readonly Config DefaultConfiguration = ConfigurationFactory.FromResource("Akka.Persistence.TestKit.config.conf"); + + /// + /// Create a new instance of the class. + /// A new system with the specified configuration will be created. + /// + public PersistenceTestKit(string? actorSystemName = null, ITestOutputHelper? output = null, TimeSpan? startupTimeout = null, LogLevel logLevel = LogLevel.Information) + : base(actorSystemName, output, startupTimeout, logLevel) { - /// - /// Create a new instance of the class. - /// A new system with the specified configuration will be created. - /// - /// Test ActorSystem configuration - /// Optional: The name of the actor system - /// TBD - public PersistenceTestKit(Config? config = null, string? actorSystemName = null, ITestOutputHelper? output = null) - : base(actorSystemName, output) - { - _config = GetConfig(config ?? Config.Empty); - } + } - /// - /// Actor reference to persistence Journal used by current actor system. - /// - public IActorRef? JournalActorRef { get; set; } + /// + /// Actor reference to persistence Journal used by current actor system. + /// + public IActorRef JournalActorRef { get; private set; } = null!; - /// - /// Actor reference to persistence Snapshot Store used by current actor system. - /// - public IActorRef? SnapshotsActorRef { get; set; } + /// + /// Actor reference to persistence Snapshot Store used by current actor system. + /// + public IActorRef SnapshotsActorRef { get; private set; } = null!; - /// - /// - /// - public ITestJournal? Journal { get; set; } + /// + /// Current journal IActorRef wrapped inside a TestJournal + /// + public ITestJournal Journal { get; private set; } = null!; - /// - /// - /// - public ITestSnapshotStore? Snapshots { get; set; } + /// + /// Current snapshot store IActorRef wrapped inside a TestSnapshotStore + /// + public ITestSnapshotStore Snapshots { get; private set; } = null!; - /// - /// Execute delegate with Journal Behavior applied to Recovery operation. - /// - /// - /// After will be executed, Recovery behavior will be reverted back to normal. - /// - /// Delegate which will select Journal behavior. - /// Async delegate which will be executed with applied Journal behavior. - /// which must be awaited. - public async Task WithJournalRecovery(Func behaviorSelector, Func execution) - { - if (Journal == null) throw new ArgumentNullException(nameof(Journal)); - if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector)); - if (execution == null) throw new ArgumentNullException(nameof(execution)); + /// + /// Execute delegate with Journal Behavior applied to Recovery operation. + /// + /// + /// After will be executed, Recovery behavior will be reverted back to normal. + /// + /// Delegate which will select Journal behavior. + /// Async delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public async Task WithJournalRecovery(Func behaviorSelector, Func execution) + { + if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector)); + if (execution == null) throw new ArgumentNullException(nameof(execution)); - try - { - await behaviorSelector(Journal.OnRecovery); - await execution(); - } - finally - { - await Journal.OnRecovery.Pass(); - } + try + { + await behaviorSelector(Journal.OnRecovery); + await execution(); } - - /// - /// Execute delegate with Journal Behavior applied to Write operation. - /// - /// - /// After will be executed, Write behavior will be reverted back to normal. - /// - /// Delegate which will select Journal behavior. - /// Async delegate which will be executed with applied Journal behavior. - /// which must be awaited. - public async Task WithJournalWrite(Func behaviorSelector, Func execution) + finally { - if (Journal == null) throw new ArgumentNullException(nameof(Journal)); - if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector)); - if (execution == null) throw new ArgumentNullException(nameof(execution)); - - try - { - await behaviorSelector(Journal.OnWrite); - await execution(); - } - finally - { - await Journal.OnWrite.Pass(); - } + await Journal.OnRecovery.Pass(); } + } - /// - /// Execute delegate with Journal Behavior applied to Recovery operation. - /// - /// - /// After will be executed, Recovery behavior will be reverted back to normal. - /// - /// Delegate which will select Journal behavior. - /// Delegate which will be executed with applied Journal behavior. - /// which must be awaited. - public Task WithJournalRecovery(Func behaviorSelector, Action execution) - => WithJournalRecovery(behaviorSelector, () => - { - if (execution == null) throw new ArgumentNullException(nameof(execution)); - - execution(); - return Task.FromResult(new object()); - }); - - /// - /// Execute delegate with Journal Behavior applied to Write operation. - /// - /// - /// After will be executed, Write behavior will be reverted back to normal. - /// - /// Delegate which will select Journal behavior. - /// Delegate which will be executed with applied Journal behavior. - /// which must be awaited. - public Task WithJournalWrite(Func behaviorSelector, Action execution) - => WithJournalWrite(behaviorSelector, () => - { - if (execution == null) throw new ArgumentNullException(nameof(execution)); - - execution(); - return Task.FromResult(new object()); - }); + /// + /// Execute delegate with Journal Behavior applied to Write operation. + /// + /// + /// After will be executed, Write behavior will be reverted back to normal. + /// + /// Delegate which will select Journal behavior. + /// Async delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public async Task WithJournalWrite(Func behaviorSelector, Func execution) + { + if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector)); + if (execution == null) throw new ArgumentNullException(nameof(execution)); - /// - /// Execute delegate with Snapshot Store Behavior applied to Save operation. - /// - /// - /// After will be executed, Save behavior will be reverted back to normal. - /// - /// Delegate which will select Snapshot Store behavior. - /// Async delegate which will be executed with applied Journal behavior. - /// which must be awaited. - public async Task WithSnapshotSave(Func behaviorSelector, Func execution) + try { - if (Snapshots == null) throw new ArgumentNullException(nameof(Journal)); - if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector)); - if (execution == null) throw new ArgumentNullException(nameof(execution)); - - try - { - await behaviorSelector(Snapshots.OnSave); - await execution(); - } - finally - { - await Snapshots.OnSave.Pass(); - } + await behaviorSelector(Journal.OnWrite); + await execution(); + } + finally + { + await Journal.OnWrite.Pass(); } + } - /// - /// Execute delegate with Snapshot Store Behavior applied to Load operation. - /// - /// - /// After will be executed, Load behavior will be reverted back to normal. - /// - /// Delegate which will select Snapshot Store behavior. - /// Async delegate which will be executed with applied Journal behavior. - /// which must be awaited. - public async Task WithSnapshotLoad(Func behaviorSelector, Func execution) + /// + /// Execute delegate with Journal Behavior applied to Recovery operation. + /// + /// + /// After will be executed, Recovery behavior will be reverted back to normal. + /// + /// Delegate which will select Journal behavior. + /// Delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public Task WithJournalRecovery(Func behaviorSelector, Action execution) + => WithJournalRecovery(behaviorSelector, () => { - if (Snapshots == null) throw new ArgumentNullException(nameof(Journal)); - if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector)); if (execution == null) throw new ArgumentNullException(nameof(execution)); - try - { - await behaviorSelector(Snapshots.OnLoad); - await execution(); - } - finally - { - await Snapshots.OnLoad.Pass(); - } - } + execution(); + return Task.FromResult(new object()); + }); - /// - /// Execute delegate with Snapshot Store Behavior applied to Delete operation. - /// - /// - /// After will be executed, Delete behavior will be reverted back to normal. - /// - /// Delegate which will select Snapshot Store behavior. - /// Async delegate which will be executed with applied Journal behavior. - /// which must be awaited. - public async Task WithSnapshotDelete(Func behaviorSelector, Func execution) + /// + /// Execute delegate with Journal Behavior applied to Write operation. + /// + /// + /// After will be executed, Write behavior will be reverted back to normal. + /// + /// Delegate which will select Journal behavior. + /// Delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public Task WithJournalWrite(Func behaviorSelector, Action execution) + => WithJournalWrite(behaviorSelector, () => { - if (Snapshots == null) throw new ArgumentNullException(nameof(Journal)); - if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector)); if (execution == null) throw new ArgumentNullException(nameof(execution)); - try - { - await behaviorSelector(Snapshots.OnDelete); - await execution(); - } - finally - { - await Snapshots.OnDelete.Pass(); - } + execution(); + return Task.FromResult(new object()); + }); + + /// + /// Execute delegate with Snapshot Store Behavior applied to Save operation. + /// + /// + /// After will be executed, Save behavior will be reverted back to normal. + /// + /// Delegate which will select Snapshot Store behavior. + /// Async delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public async Task WithSnapshotSave(Func behaviorSelector, Func execution) + { + if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector)); + if (execution == null) throw new ArgumentNullException(nameof(execution)); + + try + { + await behaviorSelector(Snapshots.OnSave); + await execution(); } + finally + { + await Snapshots.OnSave.Pass(); + } + } - /// - /// Execute delegate with Snapshot Store Behavior applied to Save operation. - /// - /// - /// After will be executed, Save behavior will be reverted back to normal. - /// - /// Delegate which will select Snapshot Store behavior. - /// Delegate which will be executed with applied Journal behavior. - /// which must be awaited. - public Task WithSnapshotSave(Func behaviorSelector, Action execution) - => WithSnapshotSave(behaviorSelector, () => - { - if (execution == null) throw new ArgumentNullException(nameof(execution)); + /// + /// Execute delegate with Snapshot Store Behavior applied to Load operation. + /// + /// + /// After will be executed, Load behavior will be reverted back to normal. + /// + /// Delegate which will select Snapshot Store behavior. + /// Async delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public async Task WithSnapshotLoad(Func behaviorSelector, Func execution) + { + if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector)); + if (execution == null) throw new ArgumentNullException(nameof(execution)); - execution(); - return Task.FromResult(true); - }); + try + { + await behaviorSelector(Snapshots.OnLoad); + await execution(); + } + finally + { + await Snapshots.OnLoad.Pass(); + } + } + + /// + /// Execute delegate with Snapshot Store Behavior applied to Delete operation. + /// + /// + /// After will be executed, Delete behavior will be reverted back to normal. + /// + /// Delegate which will select Snapshot Store behavior. + /// Async delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public async Task WithSnapshotDelete(Func behaviorSelector, Func execution) + { + if (behaviorSelector == null) throw new ArgumentNullException(nameof(behaviorSelector)); + if (execution == null) throw new ArgumentNullException(nameof(execution)); - /// - /// Execute delegate with Snapshot Store Behavior applied to Load operation. - /// - /// - /// After will be executed, Load behavior will be reverted back to normal. - /// - /// Delegate which will select Snapshot Store behavior. - /// Async delegate which will be executed with applied Journal behavior. - /// which must be awaited. - public Task WithSnapshotLoad(Func behaviorSelector, Action execution) - => WithSnapshotLoad(behaviorSelector, () => - { - if (execution == null) throw new ArgumentNullException(nameof(execution)); + try + { + await behaviorSelector(Snapshots.OnDelete); + await execution(); + } + finally + { + await Snapshots.OnDelete.Pass(); + } + } - execution(); - return Task.FromResult(true); - }); + /// + /// Execute delegate with Snapshot Store Behavior applied to Save operation. + /// + /// + /// After will be executed, Save behavior will be reverted back to normal. + /// + /// Delegate which will select Snapshot Store behavior. + /// Delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public Task WithSnapshotSave(Func behaviorSelector, Action execution) + => WithSnapshotSave(behaviorSelector, () => + { + if (execution == null) throw new ArgumentNullException(nameof(execution)); - /// - /// Execute delegate with Snapshot Store Behavior applied to Delete operation. - /// - /// - /// After will be executed, Delete behavior will be reverted back to normal. - /// - /// Delegate which will select Snapshot Store behavior. - /// Async delegate which will be executed with applied Journal behavior. - /// which must be awaited. - public Task WithSnapshotDelete(Func behaviorSelector, Action execution) - => WithSnapshotDelete(behaviorSelector, () => - { - if (execution == null) throw new ArgumentNullException(nameof(execution)); + execution(); + return Task.FromResult(true); + }); - execution(); - return Task.FromResult(true); - }); + /// + /// Execute delegate with Snapshot Store Behavior applied to Load operation. + /// + /// + /// After will be executed, Load behavior will be reverted back to normal. + /// + /// Delegate which will select Snapshot Store behavior. + /// Async delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public Task WithSnapshotLoad(Func behaviorSelector, Action execution) + => WithSnapshotLoad(behaviorSelector, () => + { + if (execution == null) throw new ArgumentNullException(nameof(execution)); - private readonly Config _config; - protected override Config? Config => _config; + execution(); + return Task.FromResult(true); + }); - protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider) + /// + /// Execute delegate with Snapshot Store Behavior applied to Delete operation. + /// + /// + /// After will be executed, Delete behavior will be reverted back to normal. + /// + /// Delegate which will select Snapshot Store behavior. + /// Async delegate which will be executed with applied Journal behavior. + /// which must be awaited. + public Task WithSnapshotDelete(Func behaviorSelector, Action execution) + => WithSnapshotDelete(behaviorSelector, () => { - if (_config is not null) - { - builder.AddHocon(_config, HoconAddMode.Append); + if (execution == null) throw new ArgumentNullException(nameof(execution)); - builder.AddStartup((system, registry) => - { - var persistenceExtension = Persistence.Persistence.Instance.Apply(system); - JournalActorRef = persistenceExtension.JournalFor(null); - Journal = TestJournal.FromRef(JournalActorRef); - SnapshotsActorRef = persistenceExtension.SnapshotStoreFor(null); - Snapshots = TestSnapshotStore.FromRef(SnapshotsActorRef); - }); - } - } + execution(); + return Task.FromResult(true); + }); + + protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider) + { + builder.AddHocon(DefaultConfiguration, HoconAddMode.Append); - /// - /// Loads from embedded resources actor system persistence configuration with and - /// configured as default persistence plugins. - /// - /// Custom configuration that was passed in the constructor. - /// Actor system configuration object. - /// - private static Config GetConfig(Config customConfig) + builder.AddStartup((system, registry) => { - var defaultConfig = ConfigurationFactory.FromResource("Akka.Persistence.TestKit.config.conf"); - if (customConfig == Config.Empty) return defaultConfig; - else return defaultConfig.SafeWithFallback(customConfig); - } + var persistenceExtension = Persistence.Persistence.Instance.Apply(system); + + JournalActorRef = persistenceExtension.JournalFor(null); + Journal = TestJournal.FromRef(JournalActorRef); + SnapshotsActorRef = persistenceExtension.SnapshotStoreFor(null); + Snapshots = TestSnapshotStore.FromRef(SnapshotsActorRef); + }); } -} - +} \ No newline at end of file From dadd0d95f77719abb77e76f608f292785c81306b Mon Sep 17 00:00:00 2001 From: Bert Hertogen Date: Sat, 21 Jun 2025 21:16:28 +0200 Subject: [PATCH 3/4] Add PersistenceTestKit class for enhanced testing to the Akka.Hosting.API.Tests --- .../CoreApiSpec.ApproveTestKit.verified.txt | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveTestKit.verified.txt b/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveTestKit.verified.txt index 98065745..b9f1a186 100644 --- a/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveTestKit.verified.txt +++ b/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveTestKit.verified.txt @@ -22,6 +22,26 @@ namespace Akka.Hosting.TestKit.Internals } namespace Akka.Hosting.TestKit { + public abstract class PersistenceTestKit : Akka.Hosting.TestKit.TestKit + { + public PersistenceTestKit(Akka.Configuration.Config? config = null, string? actorSystemName = null, Xunit.Abstractions.ITestOutputHelper? output = null) { } + protected override Akka.Configuration.Config? Config { get; } + public Akka.Persistence.TestKit.ITestJournal? Journal { get; set; } + public Akka.Actor.IActorRef? JournalActorRef { get; set; } + public Akka.Persistence.TestKit.ITestSnapshotStore? Snapshots { get; set; } + public Akka.Actor.IActorRef? SnapshotsActorRef { get; set; } + protected override void ConfigureAkka(Akka.Hosting.AkkaConfigurationBuilder builder, System.IServiceProvider provider) { } + public System.Threading.Tasks.Task WithJournalRecovery(System.Func behaviorSelector, System.Action execution) { } + public System.Threading.Tasks.Task WithJournalRecovery(System.Func behaviorSelector, System.Func execution) { } + public System.Threading.Tasks.Task WithJournalWrite(System.Func behaviorSelector, System.Action execution) { } + public System.Threading.Tasks.Task WithJournalWrite(System.Func behaviorSelector, System.Func execution) { } + public System.Threading.Tasks.Task WithSnapshotDelete(System.Func behaviorSelector, System.Action execution) { } + public System.Threading.Tasks.Task WithSnapshotDelete(System.Func behaviorSelector, System.Func execution) { } + public System.Threading.Tasks.Task WithSnapshotLoad(System.Func behaviorSelector, System.Action execution) { } + public System.Threading.Tasks.Task WithSnapshotLoad(System.Func behaviorSelector, System.Func execution) { } + public System.Threading.Tasks.Task WithSnapshotSave(System.Func behaviorSelector, System.Action execution) { } + public System.Threading.Tasks.Task WithSnapshotSave(System.Func behaviorSelector, System.Func execution) { } + } public abstract class TestKit : Akka.TestKit.TestKitBase, Xunit.IAsyncLifetime { protected TestKit(string? actorSystemName = null, Xunit.Abstractions.ITestOutputHelper? output = null, System.TimeSpan? startupTimeout = default, Microsoft.Extensions.Logging.LogLevel logLevel = 2) { } From 6bb6d941b94a20aa767cc47ff5170d5717c15233 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 24 Jun 2025 01:02:34 +0700 Subject: [PATCH 4/4] Update API Approval list --- .../verify/CoreApiSpec.ApproveTestKit.verified.txt | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveTestKit.verified.txt b/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveTestKit.verified.txt index b9f1a186..ebcb9f36 100644 --- a/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveTestKit.verified.txt +++ b/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveTestKit.verified.txt @@ -24,12 +24,12 @@ namespace Akka.Hosting.TestKit { public abstract class PersistenceTestKit : Akka.Hosting.TestKit.TestKit { - public PersistenceTestKit(Akka.Configuration.Config? config = null, string? actorSystemName = null, Xunit.Abstractions.ITestOutputHelper? output = null) { } - protected override Akka.Configuration.Config? Config { get; } - public Akka.Persistence.TestKit.ITestJournal? Journal { get; set; } - public Akka.Actor.IActorRef? JournalActorRef { get; set; } - public Akka.Persistence.TestKit.ITestSnapshotStore? Snapshots { get; set; } - public Akka.Actor.IActorRef? SnapshotsActorRef { get; set; } + public static readonly Akka.Configuration.Config DefaultConfiguration; + public PersistenceTestKit(string? actorSystemName = null, Xunit.Abstractions.ITestOutputHelper? output = null, System.TimeSpan? startupTimeout = default, Microsoft.Extensions.Logging.LogLevel logLevel = 2) { } + public Akka.Persistence.TestKit.ITestJournal Journal { get; } + public Akka.Actor.IActorRef JournalActorRef { get; } + public Akka.Persistence.TestKit.ITestSnapshotStore Snapshots { get; } + public Akka.Actor.IActorRef SnapshotsActorRef { get; } protected override void ConfigureAkka(Akka.Hosting.AkkaConfigurationBuilder builder, System.IServiceProvider provider) { } public System.Threading.Tasks.Task WithJournalRecovery(System.Func behaviorSelector, System.Action execution) { } public System.Threading.Tasks.Task WithJournalRecovery(System.Func behaviorSelector, System.Func execution) { }