Skip to content

Commit

Permalink
docs on the new side effect model for aggregates
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Sep 12, 2024
1 parent 1fd1e23 commit 83ca914
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 0 deletions.
38 changes: 38 additions & 0 deletions docs/events/projections/aggregate-projections.md
Original file line number Diff line number Diff line change
Expand Up @@ -707,3 +707,41 @@ public class using_apply_metadata : OneOffConfigurationsContext
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/using_apply_metadata.cs#L12-L44' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_apply_metadata' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Raising Events, Messages, or other Operations in Aggregation Projections <Badge type="tip" text="7.27" />

Man, that's a mouthful of a title. _Sometimes_, it can be valuable to emit new events during the processing of a projection
when you first know the new state of the projected aggregate documents. Or maybe what you might want to do is to send
a message for the new state of an updated projection. Here's a couple possible scenarios that might lead you here:

* There's some kind of business logic that can be processed against an aggregate to "decide" what the system
can do next
* You need to send updates about the aggregated projection state to clients via web sockets
* You need to replicate the Marten projection data in a completely different database
* There are business processes that can be kicked off for updates to the aggregated state

To do any of this, you can override the `RaiseSideEffects()` method in any aggregated projection that uses one of the
following base classes:

1. `SingleStreamProjection`
1. `MultiStreamProjection`
1. `CustomStreamProjection`

Here's an example of that method overridden in a projection:

snippet: sample_aggregation_using_event_metadata

A couple important facts about this new functionality:

* The `RaiseSideEffects()` method is only called during _continuus_ asynchronous projection execution, and will not
be called during projection rebuilds or `Inline` projection usage
* Events emitted during the side effect method are _not_ immediately applied to the current projected document value by Marten
* You *can* alter the aggregate value or replace it yourself in this side effect method to reflect new events, but the onus
is on you the user to apply idempotent updates to the aggregate based on these new events in the actual handlers for
the new events when those events are handled by the daemon in a later batch
* There is a [Wolverine](https://wolverinefx.net) integration (of course) to publish the messages through Wolverine if using the `AddMarten()IntegrateWithWolverine()` option

This relatively new behavior that was built for a specific [JasperFx Software](https://jasperfx.net) client project,
but has been on the backlog for quite some time. If there are any difficulties with this approach, please feel free
to join the [Marten Discord room](https://discord.gg/BGkCDx5d).

55 changes: 55 additions & 0 deletions src/EventSourcingTests/Examples/TripProjectionWithEventMetadata.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using System;
using System.Threading.Tasks;
using Marten;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Projections;

namespace EventSourcingTests.Examples;

Expand All @@ -12,6 +15,10 @@ public class Trip
public string CustomerId { get; set; }
public DateTimeOffset Ended { get; set; }
public string Description { get; set; }

public double TotalMiles { get; set; }

public Guid InsuranceCompanyId { get; set; }
}

public class TripStarted
Expand Down Expand Up @@ -50,6 +57,54 @@ public void Apply(TripEnded ended, Trip trip, IEvent @event)
}

// Other Apply/ShouldDelete methods

public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice<Trip> slice)
{
// Emit other events or messages during asynchronous projection
// processing

// Access to the current state as of the projection
// event page being processed *right* now
var currentTrip = slice.Aggregate;

if (currentTrip.TotalMiles > 1000)
{
// Append a new event to this stream
slice.AppendEvent(new PassedThousandMiles());

// Append a new event to a different event stream by
// first specifying a different stream id
slice.AppendEvent(currentTrip.InsuranceCompanyId, new IncrementThousandMileTrips());

// "Publish" outgoing messages when the event page is successfully committed
slice.PublishMessage(new SendCongratulationsOnLongTrip(currentTrip.Id));

// And yep, you can make additional changes to Marten
operations.Store(new CompletelyDifferentDocument
{
Name = "New Trip Segment",
OriginalTripId = currentTrip.Id
});
}

// This usage has to be async in case you're
// doing any additional data access with the
// Marten operations
return new ValueTask();
}
}

#endregion

public record PassedThousandMiles;

public record IncrementThousandMileTrips;

public record SendCongratulationsOnLongTrip(Guid TripId);

public class CompletelyDifferentDocument
{
public Guid Id { get; set; }
public string Name { get; set; }
public Guid OriginalTripId { get; set; }
}
9 changes: 9 additions & 0 deletions src/Marten/Events/Aggregation/IAggregateProjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@

namespace Marten.Events.Aggregation;

#region IAggregateProjectionWithSideEffects

/// <summary>
/// Marks a grouped or aggregated projection as emitting "side effects"
/// during asynchronous projection processing
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IAggregateProjectionWithSideEffects<T>
{
/// <summary>
Expand All @@ -22,6 +29,8 @@ public interface IAggregateProjectionWithSideEffects<T>
bool IsSingleStream();
}

#endregion

/// <summary>
/// Internal service within aggregating projections
/// </summary>
Expand Down

0 comments on commit 83ca914

Please sign in to comment.