Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ namespace Akka.Persistence
public override int GetHashCode() { }
public override string ToString() { }
}
public sealed class CheckJournalHealth : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IJournalRequest, Akka.Persistence.IPersistenceMessage
{
public CheckJournalHealth(System.Threading.CancellationToken cancellationToken) { }
public System.Threading.CancellationToken CancellationToken { get; }
public override string ToString() { }
}
public sealed class CheckSnapshotStoreHealth : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotRequest
{
public CheckSnapshotStoreHealth(System.Threading.CancellationToken cancellationToken) { }
public System.Threading.CancellationToken CancellationToken { get; }
public override string ToString() { }
}
public sealed class DeleteMessagesFailure : Akka.Actor.INoSerializationVerificationNeeded, System.IEquatable<Akka.Persistence.DeleteMessagesFailure>
{
public DeleteMessagesFailure(System.Exception cause, long toSequenceNr) { }
Expand Down Expand Up @@ -319,6 +331,12 @@ namespace Akka.Persistence
{
Akka.Persistence.IStashOverflowStrategy Create(Akka.Configuration.Config config);
}
public sealed class JournalHealthCheckResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IJournalResponse, Akka.Persistence.IPersistenceMessage
{
public JournalHealthCheckResponse(Akka.Persistence.PersistenceHealthCheckResult result) { }
public Akka.Persistence.PersistenceHealthCheckResult Result { get; }
public override string ToString() { }
}
public sealed class LoadSnapshot : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotRequest, System.IEquatable<Akka.Persistence.LoadSnapshot>
{
public LoadSnapshot(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, long toSequenceNr) { }
Expand Down Expand Up @@ -378,12 +396,37 @@ namespace Akka.Persistence
public Akka.Persistence.IStashOverflowStrategy DefaultInternalStashOverflowStrategy { get; }
public Akka.Persistence.PersistenceSettings Settings { get; }
public Akka.Persistence.Journal.EventAdapters AdaptersFor(string journalPluginId) { }
public System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckJournalHealthAsync(string journalPluginId, System.Threading.CancellationToken cancellationToken = null) { }
public System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckSnapshotStoreHealthAsync(string snapshotStorePluginId, System.Threading.CancellationToken cancellationToken = null) { }
[Akka.Annotations.InternalStableApiAttribute()]
public Akka.Actor.IActorRef JournalFor(string journalPluginId) { }
public string PersistenceId(Akka.Actor.IActorRef actor) { }
[Akka.Annotations.InternalStableApiAttribute()]
public Akka.Actor.IActorRef SnapshotStoreFor(string snapshotPluginId) { }
}
[System.Runtime.CompilerServices.IsReadOnlyAttribute()]
[System.Runtime.CompilerServices.NullableAttribute(0)]
public struct PersistenceHealthCheckResult : System.IEquatable<Akka.Persistence.PersistenceHealthCheckResult>
{
public PersistenceHealthCheckResult(Akka.Persistence.PersistenceHealthStatus Status, string Description = null, System.Exception Exception = null, [System.Runtime.CompilerServices.NullableAttribute(new byte[] {
2,
0,
0})] System.Collections.Generic.IReadOnlyDictionary<string, object> Data = null) { }
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
2,
0,
0})]
public System.Collections.Generic.IReadOnlyDictionary<string, object> Data { get; set; }
public string Description { get; set; }
public System.Exception Exception { get; set; }
public Akka.Persistence.PersistenceHealthStatus Status { get; set; }
}
public enum PersistenceHealthStatus
{
Healthy = 0,
Degraded = 1,
Unhealthy = 2,
}
public sealed class PersistenceSettings : Akka.Actor.Settings
{
public PersistenceSettings(Akka.Actor.ActorSystem system, Akka.Configuration.Config config) { }
Expand Down Expand Up @@ -645,6 +688,12 @@ namespace Akka.Persistence
public override int GetHashCode() { }
public override string ToString() { }
}
public sealed class SnapshotStoreHealthCheckResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotResponse
{
public SnapshotStoreHealthCheckResponse(Akka.Persistence.PersistenceHealthCheckResult result) { }
public Akka.Persistence.PersistenceHealthCheckResult Result { get; }
public override string ToString() { }
}
public sealed class StashingHandlerInvocation : Akka.Persistence.IPendingHandlerInvocation
{
public StashingHandlerInvocation(object evt, System.Action<object> handler) { }
Expand Down Expand Up @@ -877,6 +926,7 @@ namespace Akka.Persistence.Journal
{
protected readonly bool CanPublish;
protected AsyncWriteJournal() { }
public virtual System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckHealthAsync(System.Threading.CancellationToken cancellationToken = null) { }
protected abstract System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken);
public abstract System.Threading.Tasks.Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken);
protected virtual bool Receive(object message) { }
Expand Down Expand Up @@ -1235,6 +1285,7 @@ namespace Akka.Persistence.Snapshot
public abstract class SnapshotStore : Akka.Actor.ActorBase
{
protected SnapshotStore() { }
public virtual System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckHealthAsync(System.Threading.CancellationToken cancellationToken = null) { }
protected abstract System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken);
protected abstract System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken);
protected abstract System.Threading.Tasks.Task<Akka.Persistence.SelectedSnapshot> LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ namespace Akka.Persistence
public override int GetHashCode() { }
public override string ToString() { }
}
public sealed class CheckJournalHealth : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IJournalRequest, Akka.Persistence.IPersistenceMessage
{
public CheckJournalHealth(System.Threading.CancellationToken cancellationToken) { }
public System.Threading.CancellationToken CancellationToken { get; }
public override string ToString() { }
}
public sealed class CheckSnapshotStoreHealth : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotRequest
{
public CheckSnapshotStoreHealth(System.Threading.CancellationToken cancellationToken) { }
public System.Threading.CancellationToken CancellationToken { get; }
public override string ToString() { }
}
public sealed class DeleteMessagesFailure : Akka.Actor.INoSerializationVerificationNeeded, System.IEquatable<Akka.Persistence.DeleteMessagesFailure>
{
public DeleteMessagesFailure(System.Exception cause, long toSequenceNr) { }
Expand Down Expand Up @@ -319,6 +331,12 @@ namespace Akka.Persistence
{
Akka.Persistence.IStashOverflowStrategy Create(Akka.Configuration.Config config);
}
public sealed class JournalHealthCheckResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IJournalResponse, Akka.Persistence.IPersistenceMessage
{
public JournalHealthCheckResponse(Akka.Persistence.PersistenceHealthCheckResult result) { }
public Akka.Persistence.PersistenceHealthCheckResult Result { get; }
public override string ToString() { }
}
public sealed class LoadSnapshot : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotRequest, System.IEquatable<Akka.Persistence.LoadSnapshot>
{
public LoadSnapshot(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, long toSequenceNr) { }
Expand Down Expand Up @@ -378,12 +396,36 @@ namespace Akka.Persistence
public Akka.Persistence.IStashOverflowStrategy DefaultInternalStashOverflowStrategy { get; }
public Akka.Persistence.PersistenceSettings Settings { get; }
public Akka.Persistence.Journal.EventAdapters AdaptersFor(string journalPluginId) { }
public System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckJournalHealthAsync(string journalPluginId, System.Threading.CancellationToken cancellationToken = null) { }
public System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckSnapshotStoreHealthAsync(string snapshotStorePluginId, System.Threading.CancellationToken cancellationToken = null) { }
[Akka.Annotations.InternalStableApiAttribute()]
public Akka.Actor.IActorRef JournalFor(string journalPluginId) { }
public string PersistenceId(Akka.Actor.IActorRef actor) { }
[Akka.Annotations.InternalStableApiAttribute()]
public Akka.Actor.IActorRef SnapshotStoreFor(string snapshotPluginId) { }
}
[System.Runtime.CompilerServices.NullableAttribute(0)]
public struct PersistenceHealthCheckResult : System.IEquatable<Akka.Persistence.PersistenceHealthCheckResult>
{
public PersistenceHealthCheckResult(Akka.Persistence.PersistenceHealthStatus Status, string Description = null, System.Exception Exception = null, [System.Runtime.CompilerServices.NullableAttribute(new byte[] {
2,
0,
0})] System.Collections.Generic.IReadOnlyDictionary<string, object> Data = null) { }
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
2,
0,
0})]
public System.Collections.Generic.IReadOnlyDictionary<string, object> Data { get; set; }
public string Description { get; set; }
public System.Exception Exception { get; set; }
public Akka.Persistence.PersistenceHealthStatus Status { get; set; }
}
public enum PersistenceHealthStatus
{
Healthy = 0,
Degraded = 1,
Unhealthy = 2,
}
public sealed class PersistenceSettings : Akka.Actor.Settings
{
public PersistenceSettings(Akka.Actor.ActorSystem system, Akka.Configuration.Config config) { }
Expand Down Expand Up @@ -645,6 +687,12 @@ namespace Akka.Persistence
public override int GetHashCode() { }
public override string ToString() { }
}
public sealed class SnapshotStoreHealthCheckResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotResponse
{
public SnapshotStoreHealthCheckResponse(Akka.Persistence.PersistenceHealthCheckResult result) { }
public Akka.Persistence.PersistenceHealthCheckResult Result { get; }
public override string ToString() { }
}
public sealed class StashingHandlerInvocation : Akka.Persistence.IPendingHandlerInvocation
{
public StashingHandlerInvocation(object evt, System.Action<object> handler) { }
Expand Down Expand Up @@ -877,6 +925,7 @@ namespace Akka.Persistence.Journal
{
protected readonly bool CanPublish;
protected AsyncWriteJournal() { }
public virtual System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckHealthAsync(System.Threading.CancellationToken cancellationToken = null) { }
protected abstract System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken);
public abstract System.Threading.Tasks.Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken);
protected virtual bool Receive(object message) { }
Expand Down Expand Up @@ -1233,6 +1282,7 @@ namespace Akka.Persistence.Snapshot
public abstract class SnapshotStore : Akka.Actor.ActorBase
{
protected SnapshotStore() { }
public virtual System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckHealthAsync(System.Threading.CancellationToken cancellationToken = null) { }
protected abstract System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken);
protected abstract System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken);
protected abstract System.Threading.Tasks.Task<Akka.Persistence.SelectedSnapshot> LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken);
Expand Down
132 changes: 132 additions & 0 deletions src/core/Akka.Persistence.Tests/JournalHealthCheckSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// -----------------------------------------------------------------------
// <copyright file="PersistenceHealthCheckSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;
using Akka.Configuration;
using Akka.Persistence.Journal;
using Akka.TestKit;
using Akka.TestKit.Configs;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Tests;

public class JournalHealthCheckSpec : PersistenceSpec
{
private static Config HealthCheckConfig()
{
const string extraConfig = """

akka.persistence.journal.failing-open {
class = "Akka.Persistence.Tests.FailingJournal, Akka.Persistence.Tests"
circuit-breaker {
max-failures = 1
call-timeout = 1s
reset-timeout = 10s
}
}
akka.persistence.journal.failing-half-open {
class = "Akka.Persistence.Tests.FailingJournal, Akka.Persistence.Tests"
circuit-breaker {
max-failures = 1
call-timeout = 1s
reset-timeout = 1s
}
}
# Disable message serialization for circuit breaker tests to avoid serialization issues
akka.actor.serialize-messages = off

""";
return TestConfigs.TestSchedulerConfig
.WithFallback(Configuration("PersistenceHealthCheckSpec", extraConfig: extraConfig));
}

public JournalHealthCheckSpec(ITestOutputHelper output) : base(HealthCheckConfig(), output)
{
}

[Theory]
[InlineData(null)] // default plugin
[InlineData("akka.persistence.journal.inmem")]
public async Task JournalHealthCheck_should_default_to_Healthy(string? pluginId)
{
using var cts = new CancellationTokenSource(RemainingOrDefault);
var pluginHealth = await Extension.CheckJournalHealthAsync(pluginId, cts.Token);

Assert.Equal(PersistenceHealthStatus.Healthy, pluginHealth.Status);
Assert.NotNull(pluginHealth.Description);
}

[Fact]
public async Task JournalHealthCheck_should_return_Degraded_when_CircuitBreaker_is_Open()
{
// Get the journal actor reference
var journal = Extension.JournalFor("akka.persistence.journal.failing-open");

// Trigger a failure to open the circuit breaker
var writeMsg = new WriteMessages(new[] { new AtomicWrite(new Persistent("test", 1, "test-pid")) }.ToImmutableList(),
TestActor, 1);
journal.Tell(writeMsg, TestActor);

// Advance time to let the write fail and circuit breaker open
var testScheduler = (TestScheduler)Sys.Scheduler;
testScheduler.Advance(TimeSpan.FromSeconds(2));

using var cts = new CancellationTokenSource(RemainingOrDefault);
var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-open", cts.Token);

Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status);
Assert.Contains("Circuit breaker is open", pluginHealth.Description);
}

[Fact]
public async Task JournalHealthCheck_should_return_Degraded_when_CircuitBreaker_is_HalfOpen()
{
// Get the journal actor reference
var journal = Extension.JournalFor("akka.persistence.journal.failing-half-open");

// Trigger a failure to open the circuit breaker
var writeMsg = new WriteMessages(new[] { new AtomicWrite(new Persistent("test", 1, "test-pid")) }.ToImmutableList(),
TestActor, 1);
journal.Tell(writeMsg, TestActor);

var testScheduler = (TestScheduler)Sys.Scheduler;

// Advance time past call-timeout to let the write fail and circuit breaker open
testScheduler.Advance(TimeSpan.FromSeconds(1));

// Give the async operations time to complete
await Task.Delay(100);

// Advance time past reset-timeout to transition to half-open
testScheduler.Advance(TimeSpan.FromSeconds(1));

// Give the transition time to complete
await Task.Delay(100);

using var cts = new CancellationTokenSource(RemainingOrDefault);
var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-half-open", cts.Token);

Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status);
Assert.Contains("Circuit breaker is half-open", pluginHealth.Description);
}
}

/// <summary>
/// Test journal that always fails writes to trigger circuit breaker
/// </summary>
public class FailingJournal : MemoryJournal
{
protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages, CancellationToken cancellationToken)
{
throw new InvalidOperationException("Simulated journal write failure");
}
}
Loading
Loading