Skip to content

Commit

Permalink
automatic support for Marten's new async aggregation side effect model.
Browse files Browse the repository at this point in the history
Closes GH-938. Also 3.0-beta-2
  • Loading branch information
jeremydmiller committed Sep 9, 2024
1 parent 5717f1d commit d1c76ef
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 4 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<ImplicitUsings>true</ImplicitUsings>
<Nullable>enable</Nullable>
<VersionPrefix>3.0.0</VersionPrefix>
<VersionSuffix>beta-1</VersionSuffix>
<VersionSuffix>beta-2</VersionSuffix>
<RepositoryUrl>$(PackageProjectUrl)</RepositoryUrl>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
using System.Diagnostics;
using IntegrationTests;
using JasperFx.Core;
using Marten;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Daemon.Resiliency;
using Marten.Events.Projections;
using Marten.Metadata;
using Microsoft.Extensions.Hosting;
using Npgsql;
using Shouldly;
using Weasel.Postgresql;
using Wolverine;
using Wolverine.Marten;
using Wolverine.Tracking;
using Xunit.Sdk;

namespace MartenTests.AsyncDaemonIntegration;

public class end_to_end_publish_messages_through_marten_to_wolverine
{
[Fact]
public async Task can_publish_messages_through_outbox()
{
await dropSchema();

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Services.AddMarten(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "wolverine_side_effects";

m.Projections.Add<Projection3>(ProjectionLifecycle.Async);
})
.IntegrateWithWolverine()
.AddAsyncDaemon(DaemonMode.Solo);

opts.Policies.UseDurableLocalQueues();
}).StartAsync();

var streamId = Guid.NewGuid();

Func<IMessageContext, Task> publish = async _ =>
{
using var session = host.DocumentStore().LightweightSession();
session.Events.StartStream<SideEffects1>(streamId, new AEvent(), new AEvent(), new BEvent());
await session.SaveChangesAsync();
};

var tracked = await host
.TrackActivity()
.Timeout(30.Seconds())
.WaitForMessageToBeReceivedAt<GotB>(host)
.ExecuteAndWaitAsync(publish);

tracked.Executed.SingleMessage<GotB>()
.StreamId.ShouldBe(streamId);
}

private static async Task dropSchema()
{
using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
await conn.OpenAsync();
await conn.DropSchemaAsync("wolverine_side_effects");
await conn.CloseAsync();
}
}

public class Projection3: SingleStreamProjection<SideEffects1>
{
public void Apply(SideEffects1 aggregate, AEvent _)
{
aggregate.A++;
}

public void Apply(SideEffects1 aggregate, BEvent _)
{

}

public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice<SideEffects1> slice)
{
if (slice.Aggregate != null && slice.Events().OfType<IEvent<BEvent>>().Any())
{
slice.PublishMessage(new GotB(slice.Aggregate.Id));
}

return new ValueTask();
}
}

public record GotB(Guid StreamId);

public static class GotBHandler
{
public static void Handle(GotB message) => Debug.WriteLine("Got B for stream " + message.StreamId);
}

public class SideEffects1: IRevisioned
{
public Guid Id { get; set; }
public int A { get; set; }
public int B { get; set; }
public int C { get; set; }
public int D { get; set; }
public int Version { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using JasperFx.Core.IoC;
using JasperFx.Core.Reflection;
using Marten;
using Marten.Internal;
using Marten.Storage;
using Marten.Subscriptions;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -56,6 +57,8 @@ public static MartenServiceCollectionExtensions.MartenStoreExpression<T> Integra
"The schema name must be in all lower case characters");
}

expression.Services.AddSingleton<IConfigureMarten<T>, MartenOverrides<T>>();

expression.Services.AddSingleton<IAncillaryMessageStore>(s =>
{
var store = s.GetRequiredService<T>().As<DocumentStore>();
Expand Down
8 changes: 7 additions & 1 deletion src/Persistence/Wolverine.Marten/MartenIntegration.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using JasperFx.Core.Reflection;
using Marten;
using Marten.Events;
using Marten.Internal;
using Marten.Schema;
using Microsoft.Extensions.DependencyInjection;
using Wolverine.Marten.Codegen;
using Wolverine.Marten.Persistence.Sagas;
using Wolverine.Marten.Publishing;
using Wolverine.Persistence.Sagas;
using Wolverine.Postgresql.Transport;
using Wolverine.Runtime;
Expand Down Expand Up @@ -57,10 +59,12 @@ EventForwardingTransform<T> IEventForwarding.SubscribeToEvent<T>()
}
}

internal class SagasShouldUseNumericRevisions : IConfigureMarten
internal class MartenOverrides : IConfigureMarten
{
public void Configure(IServiceProvider services, StoreOptions options)
{
options.Events.MessageOutbox = new MartenToWolverineOutbox(services);

options.Policies.ForAllDocuments(mapping =>
{
if (mapping.DocumentType.CanBeCastTo<Saga>())
Expand All @@ -72,6 +76,8 @@ public void Configure(IServiceProvider services, StoreOptions options)
}
}

internal class MartenOverrides<T> : MartenOverrides, IConfigureMarten<T> where T : IDocumentStore{}

internal class EventWrapperForwarder : IHandledTypeRule
{
public bool TryFindHandledType(Type concreteType, out Type handlerType)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using Marten;
using Marten.Events.Aggregation;
using Marten.Internal.Sessions;
using Marten.Services;
using Wolverine.Runtime;

namespace Wolverine.Marten.Publishing;

internal class MartenToWolverineMessageBatch(MessageContext Context, DocumentSessionBase Session) : IMessageBatch
{
public async ValueTask PublishAsync<T>(T message)
{
await Context.PublishAsync(message);
}

public Task AfterCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token)
{
return Context.FlushOutgoingMessagesAsync();
}

public Task BeforeCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token)
{
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using Marten.Events.Aggregation;
using Marten.Internal.Sessions;
using Microsoft.Extensions.DependencyInjection;
using Wolverine.Runtime;

namespace Wolverine.Marten.Publishing;

internal class MartenToWolverineOutbox : IMessageOutbox
{
private readonly Lazy<IWolverineRuntime> _runtime;

public MartenToWolverineOutbox(IServiceProvider services)
{
_runtime = new Lazy<IWolverineRuntime>(() => services.GetRequiredService<IWolverineRuntime>());
}

public async ValueTask<IMessageBatch> CreateBatch(DocumentSessionBase session)
{
var context = new MessageContext(_runtime.Value, session.TenantId);
await context.EnlistInOutboxAsync(new MartenEnvelopeTransaction(session, context));

return new MartenToWolverineMessageBatch(context, session);
}
}
2 changes: 1 addition & 1 deletion src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<ProjectReference Include="..\Wolverine.Postgresql\Wolverine.Postgresql.csproj"/>
</ItemGroup>
<ItemGroup>
<PackageReference Include="Marten.CommandLine" Version="7.26.2" />
<PackageReference Include="Marten.CommandLine" Version="7.27.0" />
</ItemGroup>
<ItemGroup>
<Compile Remove="CritterStackHostBuilder.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public static MartenServiceCollectionExtensions.MartenConfigurationExpression In
expression.Services.AddType(typeof(IDatabaseSource), typeof(MartenMessageDatabaseDiscovery),
ServiceLifetime.Singleton);

expression.Services.AddSingleton<IConfigureMarten, SagasShouldUseNumericRevisions>();
expression.Services.AddSingleton<IConfigureMarten, MartenOverrides>();

expression.Services.AddSingleton<IWolverineExtension>(new MartenIntegration
{
Expand Down

0 comments on commit d1c76ef

Please sign in to comment.