From d1c76efe33c81972685da16005b011ff8f586a82 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Sun, 8 Sep 2024 21:30:14 -0500 Subject: [PATCH] automatic support for Marten's new async aggregation side effect model. Closes GH-938. Also 3.0-beta-2 --- Directory.Build.props | 2 +- ...sh_messages_through_marten_to_wolverine.cs | 110 ++++++++++++++++++ ...cillaryWolverineOptionsMartenExtensions.cs | 3 + .../Wolverine.Marten/MartenIntegration.cs | 8 +- .../MartenToWolverineMessageBatch.cs | 25 ++++ .../Publishing/MartenToWolverineOutbox.cs | 24 ++++ .../Wolverine.Marten/Wolverine.Marten.csproj | 2 +- .../WolverineOptionsMartenExtensions.cs | 2 +- 8 files changed, 172 insertions(+), 4 deletions(-) create mode 100644 src/Persistence/MartenTests/AsyncDaemonIntegration/end_to_end_publish_messages_through_marten_to_wolverine.cs create mode 100644 src/Persistence/Wolverine.Marten/Publishing/MartenToWolverineMessageBatch.cs create mode 100644 src/Persistence/Wolverine.Marten/Publishing/MartenToWolverineOutbox.cs diff --git a/Directory.Build.props b/Directory.Build.props index d27f27caa..b7efc0b41 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -12,7 +12,7 @@ true enable 3.0.0 - beta-1 + beta-2 $(PackageProjectUrl) true true diff --git a/src/Persistence/MartenTests/AsyncDaemonIntegration/end_to_end_publish_messages_through_marten_to_wolverine.cs b/src/Persistence/MartenTests/AsyncDaemonIntegration/end_to_end_publish_messages_through_marten_to_wolverine.cs new file mode 100644 index 000000000..33af26fc2 --- /dev/null +++ b/src/Persistence/MartenTests/AsyncDaemonIntegration/end_to_end_publish_messages_through_marten_to_wolverine.cs @@ -0,0 +1,110 @@ +using System.Diagnostics; +using IntegrationTests; +using JasperFx.Core; +using Marten; +using Marten.Events; +using Marten.Events.Aggregation; +using Marten.Events.Daemon.Resiliency; +using Marten.Events.Projections; +using Marten.Metadata; +using Microsoft.Extensions.Hosting; +using Npgsql; +using Shouldly; +using Weasel.Postgresql; +using Wolverine; +using Wolverine.Marten; +using Wolverine.Tracking; +using Xunit.Sdk; + +namespace MartenTests.AsyncDaemonIntegration; + +public class end_to_end_publish_messages_through_marten_to_wolverine +{ + [Fact] + public async Task can_publish_messages_through_outbox() + { + await dropSchema(); + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Services.AddMarten(m => + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "wolverine_side_effects"; + + m.Projections.Add(ProjectionLifecycle.Async); + }) + .IntegrateWithWolverine() + .AddAsyncDaemon(DaemonMode.Solo); + + opts.Policies.UseDurableLocalQueues(); + }).StartAsync(); + + var streamId = Guid.NewGuid(); + + Func publish = async _ => + { + using var session = host.DocumentStore().LightweightSession(); + session.Events.StartStream(streamId, new AEvent(), new AEvent(), new BEvent()); + await session.SaveChangesAsync(); + }; + + var tracked = await host + .TrackActivity() + .Timeout(30.Seconds()) + .WaitForMessageToBeReceivedAt(host) + .ExecuteAndWaitAsync(publish); + + tracked.Executed.SingleMessage() + .StreamId.ShouldBe(streamId); + } + + private static async Task dropSchema() + { + using var conn = new NpgsqlConnection(Servers.PostgresConnectionString); + await conn.OpenAsync(); + await conn.DropSchemaAsync("wolverine_side_effects"); + await conn.CloseAsync(); + } +} + +public class Projection3: SingleStreamProjection +{ + public void Apply(SideEffects1 aggregate, AEvent _) + { + aggregate.A++; + } + + public void Apply(SideEffects1 aggregate, BEvent _) + { + + } + + public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice slice) + { + if (slice.Aggregate != null && slice.Events().OfType>().Any()) + { + slice.PublishMessage(new GotB(slice.Aggregate.Id)); + } + + return new ValueTask(); + } +} + +public record GotB(Guid StreamId); + +public static class GotBHandler +{ + public static void Handle(GotB message) => Debug.WriteLine("Got B for stream " + message.StreamId); +} + +public class SideEffects1: IRevisioned +{ + public Guid Id { get; set; } + public int A { get; set; } + public int B { get; set; } + public int C { get; set; } + public int D { get; set; } + public int Version { get; set; } +} diff --git a/src/Persistence/Wolverine.Marten/AncillaryWolverineOptionsMartenExtensions.cs b/src/Persistence/Wolverine.Marten/AncillaryWolverineOptionsMartenExtensions.cs index 3b6f08139..f1645adb6 100644 --- a/src/Persistence/Wolverine.Marten/AncillaryWolverineOptionsMartenExtensions.cs +++ b/src/Persistence/Wolverine.Marten/AncillaryWolverineOptionsMartenExtensions.cs @@ -2,6 +2,7 @@ using JasperFx.Core.IoC; using JasperFx.Core.Reflection; using Marten; +using Marten.Internal; using Marten.Storage; using Marten.Subscriptions; using Microsoft.Extensions.DependencyInjection; @@ -56,6 +57,8 @@ public static MartenServiceCollectionExtensions.MartenStoreExpression Integra "The schema name must be in all lower case characters"); } + expression.Services.AddSingleton, MartenOverrides>(); + expression.Services.AddSingleton(s => { var store = s.GetRequiredService().As(); diff --git a/src/Persistence/Wolverine.Marten/MartenIntegration.cs b/src/Persistence/Wolverine.Marten/MartenIntegration.cs index 3cec6da05..726871f22 100644 --- a/src/Persistence/Wolverine.Marten/MartenIntegration.cs +++ b/src/Persistence/Wolverine.Marten/MartenIntegration.cs @@ -1,10 +1,12 @@ using JasperFx.Core.Reflection; using Marten; using Marten.Events; +using Marten.Internal; using Marten.Schema; using Microsoft.Extensions.DependencyInjection; using Wolverine.Marten.Codegen; using Wolverine.Marten.Persistence.Sagas; +using Wolverine.Marten.Publishing; using Wolverine.Persistence.Sagas; using Wolverine.Postgresql.Transport; using Wolverine.Runtime; @@ -57,10 +59,12 @@ EventForwardingTransform IEventForwarding.SubscribeToEvent() } } -internal class SagasShouldUseNumericRevisions : IConfigureMarten +internal class MartenOverrides : IConfigureMarten { public void Configure(IServiceProvider services, StoreOptions options) { + options.Events.MessageOutbox = new MartenToWolverineOutbox(services); + options.Policies.ForAllDocuments(mapping => { if (mapping.DocumentType.CanBeCastTo()) @@ -72,6 +76,8 @@ public void Configure(IServiceProvider services, StoreOptions options) } } +internal class MartenOverrides : MartenOverrides, IConfigureMarten where T : IDocumentStore{} + internal class EventWrapperForwarder : IHandledTypeRule { public bool TryFindHandledType(Type concreteType, out Type handlerType) diff --git a/src/Persistence/Wolverine.Marten/Publishing/MartenToWolverineMessageBatch.cs b/src/Persistence/Wolverine.Marten/Publishing/MartenToWolverineMessageBatch.cs new file mode 100644 index 000000000..0bf7d9144 --- /dev/null +++ b/src/Persistence/Wolverine.Marten/Publishing/MartenToWolverineMessageBatch.cs @@ -0,0 +1,25 @@ +using Marten; +using Marten.Events.Aggregation; +using Marten.Internal.Sessions; +using Marten.Services; +using Wolverine.Runtime; + +namespace Wolverine.Marten.Publishing; + +internal class MartenToWolverineMessageBatch(MessageContext Context, DocumentSessionBase Session) : IMessageBatch +{ + public async ValueTask PublishAsync(T message) + { + await Context.PublishAsync(message); + } + + public Task AfterCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token) + { + return Context.FlushOutgoingMessagesAsync(); + } + + public Task BeforeCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token) + { + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/Publishing/MartenToWolverineOutbox.cs b/src/Persistence/Wolverine.Marten/Publishing/MartenToWolverineOutbox.cs new file mode 100644 index 000000000..7b42f6d0f --- /dev/null +++ b/src/Persistence/Wolverine.Marten/Publishing/MartenToWolverineOutbox.cs @@ -0,0 +1,24 @@ +using Marten.Events.Aggregation; +using Marten.Internal.Sessions; +using Microsoft.Extensions.DependencyInjection; +using Wolverine.Runtime; + +namespace Wolverine.Marten.Publishing; + +internal class MartenToWolverineOutbox : IMessageOutbox +{ + private readonly Lazy _runtime; + + public MartenToWolverineOutbox(IServiceProvider services) + { + _runtime = new Lazy(() => services.GetRequiredService()); + } + + public async ValueTask CreateBatch(DocumentSessionBase session) + { + var context = new MessageContext(_runtime.Value, session.TenantId); + await context.EnlistInOutboxAsync(new MartenEnvelopeTransaction(session, context)); + + return new MartenToWolverineMessageBatch(context, session); + } +} \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj b/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj index f89643c9e..d0e846bb0 100644 --- a/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj +++ b/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj @@ -12,7 +12,7 @@ - + diff --git a/src/Persistence/Wolverine.Marten/WolverineOptionsMartenExtensions.cs b/src/Persistence/Wolverine.Marten/WolverineOptionsMartenExtensions.cs index 24e481cd6..9accb13ea 100644 --- a/src/Persistence/Wolverine.Marten/WolverineOptionsMartenExtensions.cs +++ b/src/Persistence/Wolverine.Marten/WolverineOptionsMartenExtensions.cs @@ -94,7 +94,7 @@ public static MartenServiceCollectionExtensions.MartenConfigurationExpression In expression.Services.AddType(typeof(IDatabaseSource), typeof(MartenMessageDatabaseDiscovery), ServiceLifetime.Singleton); - expression.Services.AddSingleton(); + expression.Services.AddSingleton(); expression.Services.AddSingleton(new MartenIntegration {