Skip to content

Commit f75c1f4

Browse files
Fix event adapter callback API not invoking adapters at runtime (#674)
* Fix event adapter callback API not invoking adapters at runtime Fixed a critical bug where event adapters configured via the NEW callback API (journalBuilder callback parameter in WithJournal/WithJournalAndSnapshot) were not being invoked at runtime, despite being correctly present in HOCON configuration. Root Cause: AkkaPersistenceJournalBuilder.Build() was creating a Config with .WithFallback(Persistence.DefaultConfig()) which interfered with the main journal configuration during HOCON merging, causing adapters to be registered but not invoked. The Fix: Removed the unnecessary .WithFallback(Persistence.DefaultConfig()) call in AkkaPersistenceHostingExtensions.cs line 130. The adapter configuration blocks (event-adapters and event-adapter-bindings) don't require fallback to default persistence config and merge correctly with the journal config without it. Impact: This bug affected users with: - Multiple journal configurations (e.g., default + sharding) - NEW callback API: journalBuilder: journal => journal.AddWriteEventAdapter<...>() - Explicit JournalOptions + WithJournalAndSnapshot pattern This fix is especially critical now that the old Adapters property has been deprecated in #665. All users must use the callback API which was broken. Testing: - Added comprehensive regression test suite (EventAdapterRuntimeInvocationSpecs.cs) - Tests verify adapter runtime invocation with various configuration patterns - All 20 Akka.Persistence.Hosting.Tests pass - Verified backwards compatibility with existing EventAdapterSpecs Related Issues: - Fixes runtime invocation issue reported in akkadotnet/Akka.Persistence.Sql#552 - Critical fix for #665 callback API requirement * Address PR feedback: improve test isolation - Remove complex EventsByTag query approach (in-memory journal doesn't support this) - Use simple static counter with proper resets for test isolation - Each test explicitly resets the counter at the start - Maintains same verification: adapters are invoked at runtime - Keeps all original test scenarios from issue #552 reproduction Addresses #674 (comment) * Address PR feedback: use EventsByTag query to verify adapter invocation - Remove static CallCount field that caused concurrency issues in tests - Simplify test to use Akka.Persistence.Query with EventsByTag instead of static counters - Fix hardcoded AkkaVersion "1.5.53" to use $(AkkaVersion) variable in csproj - Add Akka.Persistence.Query.InMemory package dependency - Remove overly complex multi-journal test scenarios, focusing on single clear test case This properly verifies that event adapters configured via the callback API are invoked at runtime by tagging events and querying for them.
1 parent af0ef22 commit f75c1f4

File tree

3 files changed

+160
-2
lines changed

3 files changed

+160
-2
lines changed

src/Akka.Persistence.Hosting.Tests/Akka.Persistence.Hosting.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
</PropertyGroup>
66

77
<ItemGroup>
8+
<PackageReference Include="Akka.Persistence.Query.InMemory" Version="$(AkkaVersion)" />
89
<PackageReference Include="Akka.TestKit.Xunit2" Version="$(AkkaVersion)" />
910
<PackageReference Include="FluentAssertions" Version="6.12.2" />
1011
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="$(TestSdkVersion)" />
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using Akka.Actor;
6+
using Akka.Configuration;
7+
using Akka.Hosting;
8+
using Akka.Persistence.Journal;
9+
using Akka.Persistence.Query;
10+
using Akka.Persistence.Query.InMemory;
11+
using Akka.Streams;
12+
using Akka.Streams.Dsl;
13+
using FluentAssertions;
14+
using Xunit;
15+
using Xunit.Abstractions;
16+
17+
namespace Akka.Persistence.Hosting.Tests;
18+
19+
/// <summary>
20+
/// Regression test for https://github.com/akkadotnet/Akka.Persistence.Sql/issues/552
21+
/// Verifies that event adapters configured via the NEW callback API are actually invoked at runtime
22+
/// by checking that events are tagged and appear in EventsByTag queries.
23+
/// </summary>
24+
public class EventAdapterRuntimeInvocationSpecs : Akka.Hosting.TestKit.TestKit
25+
{
26+
private readonly ITestOutputHelper _output;
27+
28+
public EventAdapterRuntimeInvocationSpecs(ITestOutputHelper output)
29+
{
30+
_output = output;
31+
}
32+
33+
#region Test Events and Actors
34+
35+
public sealed class TestEvent
36+
{
37+
public TestEvent(string data) { Data = data; }
38+
public string Data { get; }
39+
}
40+
41+
/// <summary>
42+
/// Event adapter that tags TestEvent instances
43+
/// </summary>
44+
public sealed class TestEventTagger : IWriteEventAdapter
45+
{
46+
public TestEventTagger(ExtendedActorSystem system) { }
47+
48+
public string Manifest(object evt) => string.Empty;
49+
50+
public object ToJournal(object evt)
51+
{
52+
return evt switch
53+
{
54+
TestEvent => new Tagged(evt, new[] { "test-tag" }),
55+
_ => evt
56+
};
57+
}
58+
}
59+
60+
public sealed class TestPersistentActor : ReceivePersistentActor
61+
{
62+
private readonly List<string> _events = new();
63+
64+
public sealed class SaveEvent
65+
{
66+
public SaveEvent(string data) { Data = data; }
67+
public string Data { get; }
68+
}
69+
70+
public sealed class GetEvents
71+
{
72+
public static readonly GetEvents Instance = new();
73+
private GetEvents() { }
74+
}
75+
76+
public TestPersistentActor(string persistenceId)
77+
{
78+
PersistenceId = persistenceId;
79+
80+
Command<SaveEvent>(cmd =>
81+
{
82+
var evt = new TestEvent(cmd.Data);
83+
Persist(evt, _ =>
84+
{
85+
_events.Add(cmd.Data);
86+
Sender.Tell("OK");
87+
});
88+
});
89+
90+
Command<GetEvents>(_ =>
91+
{
92+
Sender.Tell(_events.ToArray());
93+
});
94+
95+
Recover<TestEvent>(evt =>
96+
{
97+
_events.Add(evt.Data);
98+
});
99+
}
100+
101+
public override string PersistenceId { get; }
102+
}
103+
104+
#endregion
105+
106+
protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider)
107+
{
108+
// Use NEW callback API to add event adapters
109+
builder.WithInMemoryJournal(
110+
journalBuilder: journal =>
111+
{
112+
journal.AddWriteEventAdapter<TestEventTagger>(
113+
"test-tagger",
114+
new[] { typeof(TestEvent) });
115+
});
116+
}
117+
118+
[Fact]
119+
public async Task EventAdapter_Should_Tag_Events_And_Appear_In_EventsByTag_Query()
120+
{
121+
// Verify adapter is in HOCON configuration
122+
var config = Sys.Settings.Config;
123+
var journalConfig = config.GetConfig("akka.persistence.journal.inmem");
124+
125+
_output.WriteLine("=== HOCON Configuration ===");
126+
_output.WriteLine(journalConfig.ToString());
127+
128+
journalConfig.HasPath("event-adapters").Should().BeTrue("event-adapters should be in HOCON");
129+
journalConfig.HasPath("event-adapter-bindings").Should().BeTrue("event-adapter-bindings should be in HOCON");
130+
131+
// Create persistent actor
132+
var actor = Sys.ActorOf(Props.Create(() => new TestPersistentActor("test-1")));
133+
134+
// Persist 3 events
135+
await actor.Ask<string>(new TestPersistentActor.SaveEvent("event-1"), TimeSpan.FromSeconds(3));
136+
await actor.Ask<string>(new TestPersistentActor.SaveEvent("event-2"), TimeSpan.FromSeconds(3));
137+
await actor.Ask<string>(new TestPersistentActor.SaveEvent("event-3"), TimeSpan.FromSeconds(3));
138+
139+
// CRITICAL: Use Persistence Query to verify events were tagged
140+
var queries = Sys.ReadJournalFor<InMemoryReadJournal>(InMemoryReadJournal.Identifier);
141+
var materializer = Sys.Materializer();
142+
143+
await AwaitAssertAsync(async () =>
144+
{
145+
var taggedEvents = await queries
146+
.CurrentEventsByTag("test-tag", Offset.NoOffset())
147+
.RunWith(Sink.Seq<EventEnvelope>(), materializer);
148+
149+
_output.WriteLine($"Found {taggedEvents.Count()} events with tag 'test-tag'");
150+
151+
taggedEvents.Count().Should().Be(3,
152+
"event adapter should have tagged 3 events - if this fails, the adapter was not invoked at runtime");
153+
154+
// Verify the events are the correct type
155+
taggedEvents.All(e => e.Event is TestEvent).Should().BeTrue();
156+
});
157+
}
158+
}

src/Akka.Persistence.Hosting/AkkaPersistenceHostingExtensions.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,7 @@ internal void Build()
127127

128128
adapters.AppendLine("}");
129129

130-
var finalHocon = ConfigurationFactory.ParseString(adapters.ToString())
131-
.WithFallback(Persistence.DefaultConfig()); // add the default config as a fallback
130+
var finalHocon = ConfigurationFactory.ParseString(adapters.ToString());
132131
Builder.AddHocon(finalHocon, HoconAddMode.Prepend);
133132
}
134133

0 commit comments

Comments
 (0)