Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,26 @@ namespace Akka.Hosting.TestKit.Internals
}
namespace Akka.Hosting.TestKit
{
public abstract class PersistenceTestKit : Akka.Hosting.TestKit.TestKit
{
public static readonly Akka.Configuration.Config DefaultConfiguration;
public PersistenceTestKit(string? actorSystemName = null, Xunit.Abstractions.ITestOutputHelper? output = null, System.TimeSpan? startupTimeout = default, Microsoft.Extensions.Logging.LogLevel logLevel = 2) { }
public Akka.Persistence.TestKit.ITestJournal Journal { get; }
public Akka.Actor.IActorRef JournalActorRef { get; }
public Akka.Persistence.TestKit.ITestSnapshotStore Snapshots { get; }
public Akka.Actor.IActorRef SnapshotsActorRef { get; }
protected override void ConfigureAkka(Akka.Hosting.AkkaConfigurationBuilder builder, System.IServiceProvider provider) { }
public System.Threading.Tasks.Task WithJournalRecovery(System.Func<Akka.Persistence.TestKit.JournalRecoveryBehavior, System.Threading.Tasks.Task> behaviorSelector, System.Action execution) { }
public System.Threading.Tasks.Task WithJournalRecovery(System.Func<Akka.Persistence.TestKit.JournalRecoveryBehavior, System.Threading.Tasks.Task> behaviorSelector, System.Func<System.Threading.Tasks.Task> execution) { }
public System.Threading.Tasks.Task WithJournalWrite(System.Func<Akka.Persistence.TestKit.JournalWriteBehavior, System.Threading.Tasks.Task> behaviorSelector, System.Action execution) { }
public System.Threading.Tasks.Task WithJournalWrite(System.Func<Akka.Persistence.TestKit.JournalWriteBehavior, System.Threading.Tasks.Task> behaviorSelector, System.Func<System.Threading.Tasks.Task> execution) { }
public System.Threading.Tasks.Task WithSnapshotDelete(System.Func<Akka.Persistence.TestKit.SnapshotStoreDeleteBehavior, System.Threading.Tasks.Task> behaviorSelector, System.Action execution) { }
public System.Threading.Tasks.Task WithSnapshotDelete(System.Func<Akka.Persistence.TestKit.SnapshotStoreDeleteBehavior, System.Threading.Tasks.Task> behaviorSelector, System.Func<System.Threading.Tasks.Task> execution) { }
public System.Threading.Tasks.Task WithSnapshotLoad(System.Func<Akka.Persistence.TestKit.SnapshotStoreLoadBehavior, System.Threading.Tasks.Task> behaviorSelector, System.Action execution) { }
public System.Threading.Tasks.Task WithSnapshotLoad(System.Func<Akka.Persistence.TestKit.SnapshotStoreLoadBehavior, System.Threading.Tasks.Task> behaviorSelector, System.Func<System.Threading.Tasks.Task> execution) { }
public System.Threading.Tasks.Task WithSnapshotSave(System.Func<Akka.Persistence.TestKit.SnapshotStoreSaveBehavior, System.Threading.Tasks.Task> behaviorSelector, System.Action execution) { }
public System.Threading.Tasks.Task WithSnapshotSave(System.Func<Akka.Persistence.TestKit.SnapshotStoreSaveBehavior, System.Threading.Tasks.Task> behaviorSelector, System.Func<System.Threading.Tasks.Task> execution) { }
}
public abstract class TestKit : Akka.TestKit.TestKitBase, Xunit.IAsyncLifetime
{
protected TestKit(string? actorSystemName = null, Xunit.Abstractions.ITestOutputHelper? output = null, System.TimeSpan? startupTimeout = default, Microsoft.Extensions.Logging.LogLevel logLevel = 2) { }
Expand Down
72 changes: 72 additions & 0 deletions src/Akka.Hosting.TestKit.Tests/TestActorRefTests/PersistActor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
//-----------------------------------------------------------------------
// <copyright file="PersistActor.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Persistence;

namespace Akka.Hosting.TestKit.Tests.TestActorRefTests
{
using System;
using Actor;

public class PersistActor : UntypedPersistentActor
{
public PersistActor(IActorRef probe)
{
_probe = probe;
}

private readonly IActorRef _probe;

public override string PersistenceId => "foo";

protected override void OnCommand(object message)
{
switch (message)
{
case WriteMessage msg:
Persist(msg.Data, _ =>
{
_probe.Tell("ack");
});

break;

default:
return;
}
}

protected override void OnRecover(object message)
{
_probe.Tell(message);
}

protected override void OnPersistFailure(Exception cause, object @event, long sequenceNr)
{
_probe.Tell("failure");

base.OnPersistFailure(cause, @event, sequenceNr);
}

protected override void OnPersistRejected(Exception cause, object @event, long sequenceNr)
{
_probe.Tell("rejected");

base.OnPersistRejected(cause, @event, sequenceNr);
}

public class WriteMessage
{
public string Data { get; }

public WriteMessage(string data)
{
Data = data;
}
}
}
}
100 changes: 100 additions & 0 deletions src/Akka.Hosting.TestKit.Tests/TestActorRefTests/SnapshotActor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//-----------------------------------------------------------------------
// <copyright file="SnapshotActor.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

namespace Akka.Persistence.TestKit.Tests
{
using System;
using Actor;

public class SnapshotActor : UntypedPersistentActor
{
public SnapshotActor(IActorRef probe)
{
_probe = probe;
}

private readonly IActorRef _probe;

public override string PersistenceId => "bar";

protected override void OnCommand(object message)
{
switch (message)
{
case "save":
SaveSnapshot(message);
return;

case DeleteOne del:
DeleteSnapshot(del.SequenceNr);
return;

case DeleteMany del:
DeleteSnapshots(del.Criteria);
return;

case SaveSnapshotSuccess _:
case SaveSnapshotFailure _:
case DeleteSnapshotSuccess _:
case DeleteSnapshotFailure _:
case DeleteSnapshotsSuccess _:
case DeleteSnapshotsFailure _:
_probe.Tell(message);
return;

default:
return;
}
}

protected override void OnRecover(object message)
{
if (message is SnapshotOffer snapshot)
{
_probe.Tell(message);
}
}

protected override void OnRecoveryFailure(Exception reason, object message)
{
_probe.Tell(new RecoveryFailure(reason, message));
base.OnRecoveryFailure(reason, message);
}

public class DeleteOne
{
public DeleteOne(long sequenceNr)
{
SequenceNr = sequenceNr;
}

public long SequenceNr { get; }
}

public class DeleteMany
{
public DeleteMany(SnapshotSelectionCriteria criteria)
{
Criteria = criteria;
}

public SnapshotSelectionCriteria Criteria { get; }
}

public class RecoveryFailure
{
public RecoveryFailure(Exception reason, object message)
{
Reason = reason;
Message = message;
}

public Exception Reason { get; }
public object Message { get; }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
using Akka.Actor;
using Akka.Configuration;
using Akka.Hosting.TestKit.Tests.TestActorRefTests;
using Akka.Persistence;
using Akka.Persistence.TestKit;
using Akka.TestKit;
using FluentAssertions;
using System;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Hosting.TestKit.Tests.TestPersistenceTestKistTests;

public class TestJournalSpec : PersistenceTestKit
{
private TestProbe _probe = null!;

public TestJournalSpec(ITestOutputHelper output) : base(nameof(TestJournalSpec), output)
{
}

// Expect should be passing by default, need to make them less sensitive to timing
protected override Config? Config => "akka.test.single-expect-default = 30s";

protected override Task BeforeTestStart()
{
_probe = CreateTestProbe();
return Task.CompletedTask;
}

[Fact]
public void must_have_journal_and_snapshot()
{
Journal.Should().NotBeNull();
JournalActorRef.Should().NotBeNull();
Snapshots.Should().NotBeNull();
SnapshotsActorRef.Should().NotBeNull();
}

[Fact]
public async Task must_return_ack_after_new_write_interceptor_is_set()
{
JournalActorRef.Tell(new TestJournal.UseWriteInterceptor(null), TestActor);

await ExpectMsgAsync<TestJournal.Ack>(TimeSpan.FromSeconds(3));
}

[Fact]
public async Task works_as_memory_journal_by_default()
{
var actor = ActorOf(() => new PersistActor(_probe));
await _probe.ExpectMsgAsync<RecoveryCompleted>();

await Journal.OnWrite.Pass();
actor.Tell(new PersistActor.WriteMessage("write"), TestActor);

await _probe.ExpectMsgAsync("ack");
}

[Fact]
public async Task must_recover_restarted_actor()
{
var actor = ActorOf(() => new PersistActor(_probe));
await WatchAsync(actor);
await _probe.ExpectMsgAsync<RecoveryCompleted>();

await Journal.OnRecovery.Pass();
actor.Tell(new PersistActor.WriteMessage("1"), TestActor);
await _probe.ExpectMsgAsync("ack");
actor.Tell(new PersistActor.WriteMessage("2"), TestActor);
await _probe.ExpectMsgAsync("ack");

await actor.GracefulStop(TimeSpan.FromSeconds(1));
await ExpectTerminatedAsync(actor);

ActorOf(() => new PersistActor(_probe));
await _probe.ExpectMsgAsync("1");
await _probe.ExpectMsgAsync("2");
await _probe.ExpectMsgAsync<RecoveryCompleted>();
}

[Fact]
public async Task when_fail_on_write_is_set_all_writes_to_journal_will_fail()
{
var actor = ActorOf(() => new PersistActor(_probe));
await WatchAsync(actor);
await _probe.ExpectMsgAsync<RecoveryCompleted>();

await Journal.OnWrite.Fail();
actor.Tell(new PersistActor.WriteMessage("write"), TestActor);

await _probe.ExpectMsgAsync("failure");
await ExpectTerminatedAsync(actor);
}

[Fact]
public async Task must_recover_failed_actor()
{
var actor = ActorOf(() => new PersistActor(_probe));
await WatchAsync(actor);
await _probe.ExpectMsgAsync<RecoveryCompleted>();

await Journal.OnRecovery.Pass();
actor.Tell(new PersistActor.WriteMessage("1"), TestActor);
await _probe.ExpectMsgAsync("ack");
actor.Tell(new PersistActor.WriteMessage("2"), TestActor);
await _probe.ExpectMsgAsync("ack");

await Journal.OnWrite.Fail();
actor.Tell(new PersistActor.WriteMessage("3"), TestActor);

await _probe.ExpectMsgAsync("failure");
await ExpectTerminatedAsync(actor);

ActorOf(() => new PersistActor(_probe));
await _probe.ExpectMsgAsync("1");
await _probe.ExpectMsgAsync("2");
await _probe.ExpectMsgAsync<RecoveryCompleted>();
}

[Fact]
public async Task when_reject_on_write_is_set_all_writes_to_journal_will_be_rejected()
{
var actor = ActorOf(() => new PersistActor(_probe));
await WatchAsync(actor);
await _probe.ExpectMsgAsync<RecoveryCompleted>();

await Journal.OnWrite.Reject();
actor.Tell(new PersistActor.WriteMessage("write"), TestActor);

await _probe.ExpectMsgAsync("rejected");
}

[Fact]
public async Task journal_must_reset_state_to_pass()
{
await WithJournalWrite(write => write.Fail(), async () =>
{
var actor = ActorOf(() => new PersistActor(_probe));
await WatchAsync(actor);
await _probe.ExpectMsgAsync<RecoveryCompleted>();

actor.Tell(new PersistActor.WriteMessage("write"), TestActor);
await _probe.ExpectMsgAsync("failure");
await ExpectTerminatedAsync(actor);
});

var actor2 = ActorOf(() => new PersistActor(_probe));
await WatchAsync(actor2);

await _probe.ExpectMsgAsync<RecoveryCompleted>();
actor2.Tell(new PersistActor.WriteMessage("write"), TestActor);
await _probe.ExpectMsgAsync("ack");
}
}
Loading