diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index 95bfa1ff0e..7adc481117 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -172,7 +172,8 @@ const config: UserConfig = { text: 'Aggregate Projections', link: '/events/projections/aggregate-projections', items: [ { text: 'Live Aggregations', link: '/events/projections/live-aggregates' }, { text: 'Multi-Stream Projections', link: '/events/projections/multi-stream-projections' }, - { text: 'Explicit Aggregations', link: '/events/projections/custom-aggregates' },] + { text: 'Explicit Aggregations', link: '/events/projections/custom-aggregates' }, + { text: 'Reading Aggregates', link: '/events/projections/read-aggregates'}] }, { text: 'Event Projections', link: '/events/projections/event-projections' }, { text: 'Custom Projections', link: '/events/projections/custom' }, diff --git a/docs/diagnostics.md b/docs/diagnostics.md index cc5d3d3624..ca940419c2 100644 --- a/docs/diagnostics.md +++ b/docs/diagnostics.md @@ -24,7 +24,7 @@ builder.ConfigureServices(services => opts.DisableNpgsqlLogging = true; opts.Events.AppendMode = EventAppendMode.Quick; - opts.Events.UseIdentityMapForInlineAggregates = true; + opts.Events.UseIdentityMapForAggregates = true; opts.Projections.Add(ProjectionLifecycle.Inline); }); diff --git a/docs/documents/identity.md b/docs/documents/identity.md index d39eb4f2c0..00a51bdb07 100644 --- a/docs/documents/identity.md +++ b/docs/documents/identity.md @@ -552,7 +552,7 @@ public class LimitedDoc public LowerLimit Lower { get; set; } } ``` -snippet source | anchor +snippet source | anchor ```cs [ValueObject] @@ -590,7 +590,7 @@ And the `UpperLimit` and `LowerLimit` value types can be registered with Marten opts.RegisterValueType(typeof(UpperLimit)); opts.RegisterValueType(typeof(LowerLimit)); ``` -snippet source | anchor +snippet source | anchor ```cs // opts is a StoreOptions just like you'd have in @@ -628,7 +628,7 @@ public async Task store_several_and_order_by() ordered.ShouldHaveTheSameElementsAs(doc1.Id, doc4.Id, doc3.Id, doc2.Id); } ``` -snippet source | anchor +snippet source | anchor ```cs [Fact] diff --git a/docs/events/appending.md b/docs/events/appending.md index 56afd8c05b..66dbddbb5a 100644 --- a/docs/events/appending.md +++ b/docs/events/appending.md @@ -138,7 +138,7 @@ session.Events.Append(id, joined, departed); await session.SaveChangesAsync(); ``` -snippet source | anchor +snippet source | anchor ## Mandatory Stream Types @@ -171,7 +171,7 @@ builder.Services.AddMarten(opts => opts.Events.UseMandatoryStreamTypeDeclaration = true; }); ``` -snippet source | anchor +snippet source | anchor This causes a couple side effects that **force stricter usage of Marten**: diff --git a/docs/events/index.md b/docs/events/index.md index 55266d7ca0..f98e6a8620 100644 --- a/docs/events/index.md +++ b/docs/events/index.md @@ -47,7 +47,7 @@ var store2 = DocumentStore.For(_ => _.Events.AddEventType(typeof(MonsterSlayed)); }); ``` -snippet source | anchor +snippet source | anchor ## Stream or Aggregate Types diff --git a/docs/events/optimizing.md b/docs/events/optimizing.md index aa292d2534..620c5a7c48 100644 --- a/docs/events/optimizing.md +++ b/docs/events/optimizing.md @@ -29,10 +29,9 @@ builder.Services.AddMarten(opts => opts.Events.AppendMode = EventAppendMode.Quick; // Little more involved, but this can reduce the number - // of database queries necessary to process inline projections - // during command handling with some significant - // caveats - opts.Events.UseIdentityMapForInlineAggregates = true; + // of database queries necessary to process projections + // during CQRS command handling with certain workflows + opts.Events.UseIdentityMapForAggregates = true; // Opts into a mode where Marten is able to rebuild single // stream projections faster by building one stream at a time @@ -40,7 +39,7 @@ builder.Services.AddMarten(opts => opts.Events.UseOptimizedProjectionRebuilds = true; }); ``` -snippet source | anchor +snippet source | anchor The archived stream option is further described in the section on [Hot/Cold Storage Partitioning](/events/archiving.html#hot-cold-storage-partitioning). diff --git a/docs/events/projections/aggregate-projections.md b/docs/events/projections/aggregate-projections.md index 80b36f0ee7..bd6011f903 100644 --- a/docs/events/projections/aggregate-projections.md +++ b/docs/events/projections/aggregate-projections.md @@ -1,5 +1,13 @@ # Aggregate Projections +::: tip +Definitely check out the content on [CQRS Command Handler Workflow for Capturing Events](/scenarios/command_handler_workflow) +and [Reading Aggregates](/events/projections/read-aggregates) to get the best possible performance and +development usability for aggregate projections with Marten. Also see the combination with [Wolverine](https://wolverinefx.net) +in its [Aggregate Handler Workflow](https://wolverinefx.net/guide/durability/marten/event-sourcing.html) for literally the +lowest code ceremony possible to use Marten within a CQRS architecture. +::: + _Aggregate Projections_ in Marten combine some sort of grouping of events and process them to create a single aggregated document representing the state of those events. To jump into a simple example, here's a simple aggregated view called `QuestParty` that creates an aggregated view of `MembersJoined`, `MembersDeparted`, and `QuestStarted` events related to a group of heroes traveling on a quest in your favorite fantasy novel: diff --git a/docs/events/projections/custom-aggregates.md b/docs/events/projections/custom-aggregates.md index 0265a8ee9f..ba25d3f67b 100644 --- a/docs/events/projections/custom-aggregates.md +++ b/docs/events/projections/custom-aggregates.md @@ -27,7 +27,7 @@ public class Increment { } ``` -snippet source | anchor +snippet source | anchor And a simple aggregate document type like this: @@ -51,7 +51,7 @@ public class StartAndStopAggregate: ISoftDeleted } } ``` -snippet source | anchor +snippet source | anchor As you can see, `StartAndStopAggregate` as a `Guid` as its identity and is also [soft-deleted](/documents/deletes.html#soft-deletes) when stored by @@ -126,7 +126,7 @@ public class StartAndStopProjection: CustomProjectionsnippet source | anchor +snippet source | anchor ## Custom Grouping @@ -168,7 +168,7 @@ public class ExplicitCounter: CustomProjection } } ``` -snippet source | anchor +snippet source | anchor Note that this usage is valid for all possible projection lifecycles now (`Live`, `Inline`, and `Async`). diff --git a/docs/events/projections/read-aggregates.md b/docs/events/projections/read-aggregates.md new file mode 100644 index 0000000000..339d71126c --- /dev/null +++ b/docs/events/projections/read-aggregates.md @@ -0,0 +1,306 @@ +# Reading Aggregates + +One of the primary use cases for projections with Marten in day to day development is going to be needing +to read current state of a single event stream as an aggregate (what Marten calls a "single stream projection"). + +Let's say we have an aggregate for an `Invoice` in our system that we use to create a "write" or "read" model of +a single invoice event stream in our system like so: + + + +```cs +public record InvoiceCreated(string Description, decimal Amount); + +public record InvoiceApproved; +public record InvoiceCancelled; +public record InvoicePaid; +public record InvoiceRejected; + +public class Invoice +{ + public Invoice() + { + } + + public static Invoice Create(IEvent created) + { + return new Invoice + { + Amount = created.Data.Amount, + Description = created.Data.Description, + + // Capture the timestamp from the event + // metadata captured by Marten + Created = created.Timestamp, + Status = InvoiceStatus.Created + }; + } + + public int Version { get; set; } + + public decimal Amount { get; set; } + public string Description { get; set; } + public Guid Id { get; set; } + public DateTimeOffset Created { get; set; } + public InvoiceStatus Status { get; set; } + + public void Apply(InvoiceCancelled _) => Status = InvoiceStatus.Cancelled; + public void Apply(InvoiceRejected _) => Status = InvoiceStatus.Rejected; + public void Apply(InvoicePaid _) => Status = InvoiceStatus.Paid; + public void Apply(InvoiceApproved _) => Status = InvoiceStatus.Approved; +} +``` +snippet source | anchor + + +If we were to register that `Invoice` aggregate as a `Live` snapshot like so: + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.Services.AddMarten(opts => +{ + opts.Connection(builder.Configuration.GetConnectionString("marten")); + + // Just telling Marten upfront that we will use + // live aggregation for the Invoice aggregate + // This would be the default anyway if you didn't explicitly + // register Invoice this way, but doing so let's + // Marten "know" about Invoice for code generation + opts.Projections.LiveStreamAggregation(); +}); +``` +snippet source | anchor + + +Then we could use the `AggregateStreamAsync` API to read the current `Invoice` state for any +single event stream like so: + + + +```cs +public static async Task read_live_invoice( + IQuerySession session, + Guid invoiceId) +{ + var invoice = await session + .Events.AggregateStreamAsync(invoiceId); +} +``` +snippet source | anchor + + +::: info +`AggregateStreamAsync()` will work regardless of the registered projection lifecycle, and is also your +primary mechanism for "time travel" querying of projection state. +::: + +If instead, we wanted strong consistency and would prefer to update our `Invoice` aggregates as an +`Inline` snapshot: + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.Services.AddMarten(opts => +{ + opts.Connection(builder.Configuration.GetConnectionString("marten")); + + opts.Projections.Snapshot(SnapshotLifecycle.Inline); +}); +``` +snippet source | anchor + + +Then we can just treat the `Invoice` as any old Marten document (because it is) and use +the standard `LoadAsync()` API to load the current state of an `Invoice` for an event stream like: + + + +```cs +public static async Task read_inline_invoice( + IQuerySession session, + Guid invoiceId) +{ + var invoice = await session + .LoadAsync(invoiceId); +} +``` +snippet source | anchor + + +And lastly, if we wanted to run the `Invoice` snapshot updates as an asynchronous projection (maybe to take advantage +of Marten's ability to do blue/green deployments?): + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.Services.AddMarten(opts => + { + opts.Connection(builder.Configuration.GetConnectionString("marten")); + + opts.Projections.Snapshot(SnapshotLifecycle.Async); + }) + .AddAsyncDaemon(DaemonMode.HotCold); +``` +snippet source | anchor + + +We would still just the same `LoadAsync()` API, but you just hope that +the async daemon has caught up to where ever our particular `Invoice` was last updated. + +::: tip +Ah, the joys of [eventual consistency](https://en.wikipedia.org/wiki/Eventual_consistency). +::: + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.Services.AddMarten(opts => +{ + opts.Connection(builder.Configuration.GetConnectionString("marten")); + + opts.Projections.Snapshot(SnapshotLifecycle.Inline); +}); +``` +snippet source | anchor + + +## FetchLatest + +::: tip +`FetchLatest` is a little more lightweight in execution than `FetchForWriting` and +should be used if all you care about is read only data without appending new events. +::: + +::: warning +For internal reasons, the `FetchLatest()` API is only available off of `IDocumentSession` and not `IQuerySession`. +::: + +But wait, there's a way to both get a guarantee of getting the exact correct information about an `Invoice` +for the current event data that works no matter what projection lifecycle we're running the +`Invoice` aggregate? Marten now has the singular `FetchLatest()` API to do exactly that: + + + +```cs +public static async Task read_latest( + // Watch this, only available on the full IDocumentSession + IDocumentSession session, + Guid invoiceId) +{ + var invoice = await session + .Events.FetchLatest(invoiceId); +} +``` +snippet source | anchor + + +Just to understand how this API works, under the covers, if `Invoice` is registered as: + +1. `Live`, then `FetchLatest()` is basically doing the same thing as `AggregateStreamAsync()` +2. `Inline`, then `FetchLatest()` is essentially using `LoadAsync()` +3. `Async`, then `FetchLatest()` does a little bit more. It queries both the for the current snapshot of the `Invoice`, then any + events for that `Invoice` that haven't yet been applied, and advances the `Invoice` in memory so that you get the exact + current state of the `Invoice` even if the async daemon process is behind the latest changes + +Moreover, `FetchLatest` was meant to be used in conjunction with `FetchForWriting()` to get you the most +current version of an aggregate that was just updated using `FetchForWriting()` from the same session. To really +get the most of this combination, use this opt in flag: + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.Services.AddMarten(opts => +{ + opts.Connection(builder.Configuration.GetConnectionString("marten")); + + // This opts Marten into a pretty big optimization + // for how FetchForWriting and/or FetchLatest work internally + opts.Events.UseIdentityMapForAggregates = true; + opts.Projections.Snapshot(SnapshotLifecycle.Inline); +}); +``` +snippet source | anchor + + +::: warning +That flag is `false` by default because it was introduced halfway through the 7.* version lifecycle, +and can introduce subtle bugs in application code if you use some kind of `AggregateRoot` pattern where +your application code mutates the aggregate projection objects outside of Marten control. + +Also, the Marten team recommends an approach where only Marten itself ever changes the state of a projected document +and you keep application logic separate from the projected data classes. More or less, we're recommending more of a +functional programming approach. +::: + +Now, let's say that in our commands we want to both mutate an `Invoice` event stream by appending new events *and* +return the newly updated state of the `Invoice` to the original caller in the most efficient way possible. Just for +fun, let's say we wrote a helper function like this: + + + +```cs +public static class MutationExtensions +{ + public static async Task Mutate(this IDocumentSession session, Guid id, Func decider, + CancellationToken token = default) where T : class + { + var stream = await session.Events.FetchForWriting(id, token); + + // Decide what new events should be appended based on the current + // state of the aggregate and application logic + var events = decider(stream.Aggregate); + stream.AppendMany(events); + + // Persist any new events + await session.SaveChangesAsync(token); + + return await session.Events.FetchLatest(id, token); + } +} +``` +snippet source | anchor + + +And used it for a command handler something like this: + + + +```cs +public static Task Approve(IDocumentSession session, Guid invoiceId) +{ + return session.Mutate(invoiceId, invoice => + { + if (invoice.Status != InvoiceStatus.Approved) + { + return new object[] { new InvoiceApproved() }; + } + + return Array.Empty(); + }); +} +``` +snippet source | anchor + + +Okay, so for some context, if using the full fledged `UseIdentityMapForAggregates` + `FetchForWriting`, then `FetchLatest` +workflow, Marten is optimizing the `FetchLatest` if the lifecycle is: + +1. `Live`, then Marten starts with the version of the aggregate `Invoice` created by the initial `FetchForWriting()` call + and applies any new events appended in that operation to the `Invoice` to create the "latest" version for you without + incurring any additional database round trips +2. `Inline`, then Marten will add the initially loaded `Invoice` from `FetchForWriting` into the identity map + for the session *regardless* of what type of session this is, and `FetchLatest` will use the value of the + projected `Invoice` updated as part of `SaveChangesAsync()` to prevent any additional database round trips +3. `Async`, then Marten will use the initial version of the `Invoice` aggregate loaded by `FetchForWriting()` and + applies with any additional events appended to that session to give you the exact version of the `Invoice` after + the new events are applied + +In all cases, the `FetchForWriting` + `FetchLatest` combination is working together to get you +the correct information in the most efficient way possible by eliminating extra trips to the +database. diff --git a/docs/events/projections/rebuilding.md b/docs/events/projections/rebuilding.md index 1d4ecce9b2..fc8fe98ed3 100644 --- a/docs/events/projections/rebuilding.md +++ b/docs/events/projections/rebuilding.md @@ -61,7 +61,7 @@ builder.Services.AddMarten(opts => opts.Events.UseOptimizedProjectionRebuilds = true; // [!code ++] }); ``` -snippet source | anchor +snippet source | anchor In this mode, Marten will rebuild single stream projection documents stream by stream in the reverse order that the @@ -80,5 +80,5 @@ on `IDocumentStore`: ```cs await theStore.Advanced.RebuildSingleStreamAsync(streamId); ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/events/querying.md b/docs/events/querying.md index 204a1240ea..20f05efe04 100644 --- a/docs/events/querying.md +++ b/docs/events/querying.md @@ -287,7 +287,7 @@ public async Task can_query_against_event_type() .Single(x => x.Members.Contains("Matt")).Id.ShouldBe(departed2.Id); } ``` -snippet source | anchor +snippet source | anchor You can use any Linq operator that Marten supports to query against event data. We think that this functionality is probably more useful for diagnostics or troubleshooting rather than something you would routinely use to support your application. We recommend that you favor event projection views over querying within the raw event table. @@ -305,7 +305,7 @@ public void example_of_querying_for_event_data(IDocumentSession session, Guid st .ToList(); } ``` -snippet source | anchor +snippet source | anchor This mechanism will allow you to query by any property of the `IEvent` interface shown above. @@ -322,5 +322,5 @@ var raw = await theSession.Events.QueryAllRawEvents() .Where(x => x.EventTypesAre(typeof(CEvent), typeof(DEvent))) .ToListAsync(); ``` -snippet source | anchor +snippet source | anchor diff --git a/docs/events/subscriptions.md b/docs/events/subscriptions.md index 8afbe03d6e..2479725b79 100644 --- a/docs/events/subscriptions.md +++ b/docs/events/subscriptions.md @@ -583,13 +583,13 @@ public class ErrorHandlingSubscription: SubscriptionBase { public Task AfterCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token) { - Console.WriteLine("Marten just made a commit for any changes"); + Console.WriteLine("Marten is about to make a commit for any changes"); return Task.CompletedTask; } public Task BeforeCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token) { - Console.WriteLine("Marten is about to make a commit for any changes"); + Console.WriteLine("Marten just made a commit for any changes"); return Task.CompletedTask; } } diff --git a/docs/scenarios/command_handler_workflow.md b/docs/scenarios/command_handler_workflow.md index f87868d2d0..3f5db3896d 100644 --- a/docs/scenarios/command_handler_workflow.md +++ b/docs/scenarios/command_handler_workflow.md @@ -12,13 +12,21 @@ will probably need to evaluate the incoming command against the current state of the incoming command altogether if the system is not in the proper state for the command. And by the way, you probably also need to be concerned with concurrent access to the business data represented by a single event stream. -## FetchForWriting +## FetchForWriting ::: tip -As of Marten 7, this API is usable with aggregation projections that are running with an asynchronous lifecycle. This +This API is usable with aggregation projections that are running with an asynchronous lifecycle. This is key to create "zero downtime deployments" for projection changes. ::: +::: tip +The more recent [FetchLatest](/events/projections/read-aggregates) API is a lighter weight, read +only version of `FetchForWriting` that may be slightly more performant if all you care about +is getting the latest data. Do note that there are significant optimizations for using +`FetchForWriting`, then appending new events, saving the session, and using `FetchLatest` to get the current +state of the aggregate being updated. +::: + ::: warning `FetchForWriting()` is only possible with single stream aggregation projections, which includes the "self-aggregating" snapshot feature. This API assumes that it's working with one stream, and directly accesses the stream table. Multi-stream @@ -156,13 +164,13 @@ builder.Services.AddMarten(opts => // an Inline projection for the "T". Saves on Marten doing an extra // database fetch of the same data you already fetched from FetchForWriting() // when Marten needs to apply the Inline projection as part of SaveChanges() - opts.Events.UseIdentityMapForInlineAggregates = true; + opts.Events.UseIdentityMapForAggregates = true; }) // This is non-trivial performance optimization if you never // need identity map mechanics in your commands or query handlers .UseLightweightSessions(); ``` -snippet source | anchor +snippet source | anchor It's pretty involved, but the key takeaway is that _if_ you are using lightweight sessions for a performance optimization diff --git a/src/CoreTests/AssemblyInfo.cs b/src/CoreTests/AssemblyInfo.cs new file mode 100644 index 0000000000..c24077a113 --- /dev/null +++ b/src/CoreTests/AssemblyInfo.cs @@ -0,0 +1,2 @@ +using Xunit; +[assembly: CollectionBehavior(DisableTestParallelization = true)] \ No newline at end of file diff --git a/src/DocumentDbTests/AssemblyInfo.cs b/src/DocumentDbTests/AssemblyInfo.cs new file mode 100644 index 0000000000..c24077a113 --- /dev/null +++ b/src/DocumentDbTests/AssemblyInfo.cs @@ -0,0 +1,2 @@ +using Xunit; +[assembly: CollectionBehavior(DisableTestParallelization = true)] \ No newline at end of file diff --git a/src/DocumentDbTests/Internal/Generated/DocumentStorage/RevisionedDocProvider1212098993.cs b/src/DocumentDbTests/Internal/Generated/DocumentStorage/RevisionedDocProvider1212098993.cs new file mode 100644 index 0000000000..ef2df2060e --- /dev/null +++ b/src/DocumentDbTests/Internal/Generated/DocumentStorage/RevisionedDocProvider1212098993.cs @@ -0,0 +1,1115 @@ +// +#pragma warning disable +using DocumentDbTests.Concurrency; +using Marten.Internal; +using Marten.Internal.Storage; +using Marten.Schema; +using Marten.Schema.Arguments; +using Npgsql; +using System; +using System.Collections.Generic; +using Weasel.Core; +using Weasel.Postgresql; + +namespace Marten.Generated.DocumentStorage +{ + // START: UpsertRevisionedDocOperation1212098993 + public class UpsertRevisionedDocOperation1212098993 : Marten.Internal.Operations.StorageOperation + { + private readonly DocumentDbTests.Concurrency.RevisionedDoc _document; + private readonly System.Guid _id; + private readonly System.Collections.Generic.Dictionary _versions; + private readonly Marten.Schema.DocumentMapping _mapping; + + public UpsertRevisionedDocOperation1212098993(DocumentDbTests.Concurrency.RevisionedDoc document, System.Guid id, System.Collections.Generic.Dictionary versions, Marten.Schema.DocumentMapping mapping) : base(document, id, versions, mapping) + { + _document = document; + _id = id; + _versions = versions; + _mapping = mapping; + } + + + + public override void Postprocess(System.Data.Common.DbDataReader reader, System.Collections.Generic.IList exceptions) + { + if (postprocessRevision(reader, exceptions)) + { + _document.Version = Revision; + } + + } + + + public override async System.Threading.Tasks.Task PostprocessAsync(System.Data.Common.DbDataReader reader, System.Collections.Generic.IList exceptions, System.Threading.CancellationToken token) + { + if (await postprocessRevisionAsync(reader, exceptions, token)) + { + _document.Version = Revision; + } + + } + + + public override Marten.Internal.Operations.OperationRole Role() + { + return Marten.Internal.Operations.OperationRole.Upsert; + } + + + public override NpgsqlTypes.NpgsqlDbType DbType() + { + return NpgsqlTypes.NpgsqlDbType.Uuid; + } + + + public override void ConfigureParameters(Weasel.Postgresql.IGroupedParameterBuilder parameterBuilder, Weasel.Postgresql.ICommandBuilder builder, DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session) + { + if (document.Version > 0 && Revision == 1) Revision = document.Version; + builder.Append("select numeric_revisioning.mt_upsert_revisioneddoc("); + var parameter0 = parameterBuilder.AppendParameter(session.Serializer.ToJson(_document)); + parameter0.NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Jsonb; + // .Net Class Type + var parameter1 = parameterBuilder.AppendParameter(_document.GetType().FullName); + parameter1.NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Varchar; + var parameter2 = parameterBuilder.AppendParameter(document.Id); + setCurrentRevisionParameter(parameterBuilder); + builder.Append(')'); + } + + } + + // END: UpsertRevisionedDocOperation1212098993 + + + // START: InsertRevisionedDocOperation1212098993 + public class InsertRevisionedDocOperation1212098993 : Marten.Internal.Operations.StorageOperation + { + private readonly DocumentDbTests.Concurrency.RevisionedDoc _document; + private readonly System.Guid _id; + private readonly System.Collections.Generic.Dictionary _versions; + private readonly Marten.Schema.DocumentMapping _mapping; + + public InsertRevisionedDocOperation1212098993(DocumentDbTests.Concurrency.RevisionedDoc document, System.Guid id, System.Collections.Generic.Dictionary versions, Marten.Schema.DocumentMapping mapping) : base(document, id, versions, mapping) + { + _document = document; + _id = id; + _versions = versions; + _mapping = mapping; + } + + + + public override void Postprocess(System.Data.Common.DbDataReader reader, System.Collections.Generic.IList exceptions) + { + _document.Version = Revision; + } + + + public override System.Threading.Tasks.Task PostprocessAsync(System.Data.Common.DbDataReader reader, System.Collections.Generic.IList exceptions, System.Threading.CancellationToken token) + { + _document.Version = Revision; + return System.Threading.Tasks.Task.CompletedTask; + return System.Threading.Tasks.Task.CompletedTask; + } + + + public override Marten.Internal.Operations.OperationRole Role() + { + return Marten.Internal.Operations.OperationRole.Insert; + } + + + public override NpgsqlTypes.NpgsqlDbType DbType() + { + return NpgsqlTypes.NpgsqlDbType.Uuid; + } + + + public override void ConfigureParameters(Weasel.Postgresql.IGroupedParameterBuilder parameterBuilder, Weasel.Postgresql.ICommandBuilder builder, DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session) + { + if (document.Version > 0 && Revision == 1) Revision = document.Version; + builder.Append("select numeric_revisioning.mt_insert_revisioneddoc("); + var parameter0 = parameterBuilder.AppendParameter(session.Serializer.ToJson(_document)); + parameter0.NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Jsonb; + // .Net Class Type + var parameter1 = parameterBuilder.AppendParameter(_document.GetType().FullName); + parameter1.NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Varchar; + var parameter2 = parameterBuilder.AppendParameter(document.Id); + setCurrentRevisionParameter(parameterBuilder); + builder.Append(')'); + } + + } + + // END: InsertRevisionedDocOperation1212098993 + + + // START: UpdateRevisionedDocOperation1212098993 + public class UpdateRevisionedDocOperation1212098993 : Marten.Internal.Operations.StorageOperation + { + private readonly DocumentDbTests.Concurrency.RevisionedDoc _document; + private readonly System.Guid _id; + private readonly System.Collections.Generic.Dictionary _versions; + private readonly Marten.Schema.DocumentMapping _mapping; + + public UpdateRevisionedDocOperation1212098993(DocumentDbTests.Concurrency.RevisionedDoc document, System.Guid id, System.Collections.Generic.Dictionary versions, Marten.Schema.DocumentMapping mapping) : base(document, id, versions, mapping) + { + _document = document; + _id = id; + _versions = versions; + _mapping = mapping; + } + + + + public override void Postprocess(System.Data.Common.DbDataReader reader, System.Collections.Generic.IList exceptions) + { + if (postprocessRevision(reader, exceptions)) + { + _document.Version = Revision; + } + + } + + + public override async System.Threading.Tasks.Task PostprocessAsync(System.Data.Common.DbDataReader reader, System.Collections.Generic.IList exceptions, System.Threading.CancellationToken token) + { + if (await postprocessRevisionAsync(reader, exceptions, token)) + { + _document.Version = Revision; + } + + } + + + public override Marten.Internal.Operations.OperationRole Role() + { + return Marten.Internal.Operations.OperationRole.Update; + } + + + public override NpgsqlTypes.NpgsqlDbType DbType() + { + return NpgsqlTypes.NpgsqlDbType.Uuid; + } + + + public override void ConfigureParameters(Weasel.Postgresql.IGroupedParameterBuilder parameterBuilder, Weasel.Postgresql.ICommandBuilder builder, DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session) + { + if (document.Version > 0 && Revision == 1) Revision = document.Version; + builder.Append("select numeric_revisioning.mt_update_revisioneddoc("); + var parameter0 = parameterBuilder.AppendParameter(session.Serializer.ToJson(_document)); + parameter0.NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Jsonb; + // .Net Class Type + var parameter1 = parameterBuilder.AppendParameter(_document.GetType().FullName); + parameter1.NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Varchar; + var parameter2 = parameterBuilder.AppendParameter(document.Id); + setCurrentRevisionParameter(parameterBuilder); + builder.Append(')'); + } + + } + + // END: UpdateRevisionedDocOperation1212098993 + + + // START: QueryOnlyRevisionedDocSelector1212098993 + public class QueryOnlyRevisionedDocSelector1212098993 : Marten.Internal.CodeGeneration.DocumentSelectorWithOnlySerializer, Marten.Linq.Selectors.ISelector + { + private readonly Marten.Internal.IMartenSession _session; + private readonly Marten.Schema.DocumentMapping _mapping; + + public QueryOnlyRevisionedDocSelector1212098993(Marten.Internal.IMartenSession session, Marten.Schema.DocumentMapping mapping) : base(session, mapping) + { + _session = session; + _mapping = mapping; + } + + + + public DocumentDbTests.Concurrency.RevisionedDoc Resolve(System.Data.Common.DbDataReader reader) + { + + DocumentDbTests.Concurrency.RevisionedDoc document; + document = _serializer.FromJson(reader, 0); + var version = reader.GetFieldValue(1); + document.Version = version; + return document; + } + + + public async System.Threading.Tasks.Task ResolveAsync(System.Data.Common.DbDataReader reader, System.Threading.CancellationToken token) + { + + DocumentDbTests.Concurrency.RevisionedDoc document; + document = await _serializer.FromJsonAsync(reader, 0, token).ConfigureAwait(false); + var version = await reader.GetFieldValueAsync(1, token); + document.Version = version; + return document; + } + + } + + // END: QueryOnlyRevisionedDocSelector1212098993 + + + // START: LightweightRevisionedDocSelector1212098993 + public class LightweightRevisionedDocSelector1212098993 : Marten.Internal.CodeGeneration.DocumentSelectorWithOnlySerializer, Marten.Linq.Selectors.ISelector + { + private readonly Marten.Internal.IMartenSession _session; + private readonly Marten.Schema.DocumentMapping _mapping; + + public LightweightRevisionedDocSelector1212098993(Marten.Internal.IMartenSession session, Marten.Schema.DocumentMapping mapping) : base(session, mapping) + { + _session = session; + _mapping = mapping; + } + + + + public DocumentDbTests.Concurrency.RevisionedDoc Resolve(System.Data.Common.DbDataReader reader) + { + var id = reader.GetFieldValue(0); + + DocumentDbTests.Concurrency.RevisionedDoc document; + document = _serializer.FromJson(reader, 1); + var version = reader.GetFieldValue(2); + document.Version = version; + _session.MarkAsDocumentLoaded(id, document); + return document; + } + + + public async System.Threading.Tasks.Task ResolveAsync(System.Data.Common.DbDataReader reader, System.Threading.CancellationToken token) + { + var id = await reader.GetFieldValueAsync(0, token); + + DocumentDbTests.Concurrency.RevisionedDoc document; + document = await _serializer.FromJsonAsync(reader, 1, token).ConfigureAwait(false); + var version = await reader.GetFieldValueAsync(2, token); + document.Version = version; + _session.MarkAsDocumentLoaded(id, document); + return document; + } + + } + + // END: LightweightRevisionedDocSelector1212098993 + + + // START: IdentityMapRevisionedDocSelector1212098993 + public class IdentityMapRevisionedDocSelector1212098993 : Marten.Internal.CodeGeneration.RevisionedDocumentSelectorWithIdentityMap, Marten.Linq.Selectors.ISelector + { + private readonly Marten.Internal.IMartenSession _session; + private readonly Marten.Schema.DocumentMapping _mapping; + + public IdentityMapRevisionedDocSelector1212098993(Marten.Internal.IMartenSession session, Marten.Schema.DocumentMapping mapping) : base(session, mapping) + { + _session = session; + _mapping = mapping; + } + + + + public DocumentDbTests.Concurrency.RevisionedDoc Resolve(System.Data.Common.DbDataReader reader) + { + var id = reader.GetFieldValue(0); + if (_identityMap.TryGetValue(id, out var existing)) return existing; + + DocumentDbTests.Concurrency.RevisionedDoc document; + document = _serializer.FromJson(reader, 1); + var version = reader.GetFieldValue(2); + document.Version = version; + _session.MarkAsDocumentLoaded(id, document); + _identityMap[id] = document; + return document; + } + + + public async System.Threading.Tasks.Task ResolveAsync(System.Data.Common.DbDataReader reader, System.Threading.CancellationToken token) + { + var id = await reader.GetFieldValueAsync(0, token); + if (_identityMap.TryGetValue(id, out var existing)) return existing; + + DocumentDbTests.Concurrency.RevisionedDoc document; + document = await _serializer.FromJsonAsync(reader, 1, token).ConfigureAwait(false); + var version = await reader.GetFieldValueAsync(2, token); + document.Version = version; + _session.MarkAsDocumentLoaded(id, document); + _identityMap[id] = document; + return document; + } + + } + + // END: IdentityMapRevisionedDocSelector1212098993 + + + // START: DirtyTrackingRevisionedDocSelector1212098993 + public class DirtyTrackingRevisionedDocSelector1212098993 : Marten.Internal.CodeGeneration.RevisionedDocumentSelectorWithDirtyChecking, Marten.Linq.Selectors.ISelector + { + private readonly Marten.Internal.IMartenSession _session; + private readonly Marten.Schema.DocumentMapping _mapping; + + public DirtyTrackingRevisionedDocSelector1212098993(Marten.Internal.IMartenSession session, Marten.Schema.DocumentMapping mapping) : base(session, mapping) + { + _session = session; + _mapping = mapping; + } + + + + public DocumentDbTests.Concurrency.RevisionedDoc Resolve(System.Data.Common.DbDataReader reader) + { + var id = reader.GetFieldValue(0); + if (_identityMap.TryGetValue(id, out var existing)) return existing; + + DocumentDbTests.Concurrency.RevisionedDoc document; + document = _serializer.FromJson(reader, 1); + var version = reader.GetFieldValue(2); + document.Version = version; + _session.MarkAsDocumentLoaded(id, document); + _identityMap[id] = document; + StoreTracker(_session, document); + return document; + } + + + public async System.Threading.Tasks.Task ResolveAsync(System.Data.Common.DbDataReader reader, System.Threading.CancellationToken token) + { + var id = await reader.GetFieldValueAsync(0, token); + if (_identityMap.TryGetValue(id, out var existing)) return existing; + + DocumentDbTests.Concurrency.RevisionedDoc document; + document = await _serializer.FromJsonAsync(reader, 1, token).ConfigureAwait(false); + var version = await reader.GetFieldValueAsync(2, token); + document.Version = version; + _session.MarkAsDocumentLoaded(id, document); + _identityMap[id] = document; + StoreTracker(_session, document); + return document; + } + + } + + // END: DirtyTrackingRevisionedDocSelector1212098993 + + + // START: OverwriteRevisionedDocOperation1212098993 + public class OverwriteRevisionedDocOperation1212098993 : Marten.Internal.Operations.StorageOperation + { + private readonly DocumentDbTests.Concurrency.RevisionedDoc _document; + private readonly System.Guid _id; + private readonly System.Collections.Generic.Dictionary _versions; + private readonly Marten.Schema.DocumentMapping _mapping; + + public OverwriteRevisionedDocOperation1212098993(DocumentDbTests.Concurrency.RevisionedDoc document, System.Guid id, System.Collections.Generic.Dictionary versions, Marten.Schema.DocumentMapping mapping) : base(document, id, versions, mapping) + { + _document = document; + _id = id; + _versions = versions; + _mapping = mapping; + } + + + + public override void Postprocess(System.Data.Common.DbDataReader reader, System.Collections.Generic.IList exceptions) + { + if (postprocessRevision(reader, exceptions)) + { + _document.Version = Revision; + } + + } + + + public override async System.Threading.Tasks.Task PostprocessAsync(System.Data.Common.DbDataReader reader, System.Collections.Generic.IList exceptions, System.Threading.CancellationToken token) + { + if (await postprocessRevisionAsync(reader, exceptions, token)) + { + _document.Version = Revision; + } + + } + + + public override Marten.Internal.Operations.OperationRole Role() + { + return Marten.Internal.Operations.OperationRole.Update; + } + + + public override NpgsqlTypes.NpgsqlDbType DbType() + { + return NpgsqlTypes.NpgsqlDbType.Uuid; + } + + + public override void ConfigureParameters(Weasel.Postgresql.IGroupedParameterBuilder parameterBuilder, Weasel.Postgresql.ICommandBuilder builder, DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session) + { + if (document.Version > 0 && Revision == 1) Revision = document.Version; + builder.Append("select numeric_revisioning.mt_overwrite_revisioneddoc("); + var parameter0 = parameterBuilder.AppendParameter(session.Serializer.ToJson(_document)); + parameter0.NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Jsonb; + // .Net Class Type + var parameter1 = parameterBuilder.AppendParameter(_document.GetType().FullName); + parameter1.NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Varchar; + var parameter2 = parameterBuilder.AppendParameter(document.Id); + setCurrentRevisionParameter(parameterBuilder); + builder.Append(')'); + } + + } + + // END: OverwriteRevisionedDocOperation1212098993 + + + // START: QueryOnlyRevisionedDocDocumentStorage1212098993 + public class QueryOnlyRevisionedDocDocumentStorage1212098993 : Marten.Internal.Storage.QueryOnlyDocumentStorage + { + private readonly Marten.Schema.DocumentMapping _document; + + public QueryOnlyRevisionedDocDocumentStorage1212098993(Marten.Schema.DocumentMapping document) : base(document) + { + _document = document; + } + + + + public override System.Guid AssignIdentity(DocumentDbTests.Concurrency.RevisionedDoc document, string tenantId, Marten.Storage.IMartenDatabase database) + { + if (document.Id == Guid.Empty) _setter(document, Marten.Schema.Identity.CombGuidIdGeneration.NewGuid()); + return document.Id; + } + + + public override Marten.Internal.Operations.IStorageOperation Update(DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session, string tenant) + { + if (session.Concurrency == Marten.Services.ConcurrencyChecks.Disabled) + { + + return new Marten.Generated.DocumentStorage.OverwriteRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + else + { + + return new Marten.Generated.DocumentStorage.UpdateRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + } + + + public override Marten.Internal.Operations.IStorageOperation Insert(DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session, string tenant) + { + + return new Marten.Generated.DocumentStorage.InsertRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + + public override Marten.Internal.Operations.IStorageOperation Upsert(DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session, string tenant) + { + if (session.Concurrency == Marten.Services.ConcurrencyChecks.Disabled) + { + + return new Marten.Generated.DocumentStorage.OverwriteRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + else + { + + return new Marten.Generated.DocumentStorage.UpsertRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + } + + + public override Marten.Internal.Operations.IStorageOperation Overwrite(DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session, string tenant) + { + + return new Marten.Generated.DocumentStorage.OverwriteRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + + return new Marten.Generated.DocumentStorage.OverwriteRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + + public override System.Guid Identity(DocumentDbTests.Concurrency.RevisionedDoc document) + { + return document.Id; + } + + + public override Marten.Linq.Selectors.ISelector BuildSelector(Marten.Internal.IMartenSession session) + { + return new Marten.Generated.DocumentStorage.QueryOnlyRevisionedDocSelector1212098993(session, _document); + } + + + public override object RawIdentityValue(System.Guid id) + { + return id; + } + + + public override Npgsql.NpgsqlParameter BuildManyIdParameter(System.Guid[] ids) + { + return base.BuildManyIdParameter(ids); + } + + } + + // END: QueryOnlyRevisionedDocDocumentStorage1212098993 + + + // START: LightweightRevisionedDocDocumentStorage1212098993 + public class LightweightRevisionedDocDocumentStorage1212098993 : Marten.Internal.Storage.LightweightDocumentStorage + { + private readonly Marten.Schema.DocumentMapping _document; + + public LightweightRevisionedDocDocumentStorage1212098993(Marten.Schema.DocumentMapping document) : base(document) + { + _document = document; + } + + + + public override System.Guid AssignIdentity(DocumentDbTests.Concurrency.RevisionedDoc document, string tenantId, Marten.Storage.IMartenDatabase database) + { + if (document.Id == Guid.Empty) _setter(document, Marten.Schema.Identity.CombGuidIdGeneration.NewGuid()); + return document.Id; + } + + + public override Marten.Internal.Operations.IStorageOperation Update(DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session, string tenant) + { + if (session.Concurrency == Marten.Services.ConcurrencyChecks.Disabled) + { + + return new Marten.Generated.DocumentStorage.OverwriteRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + else + { + + return new Marten.Generated.DocumentStorage.UpdateRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + } + + + public override Marten.Internal.Operations.IStorageOperation Insert(DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session, string tenant) + { + + return new Marten.Generated.DocumentStorage.InsertRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + + public override Marten.Internal.Operations.IStorageOperation Upsert(DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session, string tenant) + { + if (session.Concurrency == Marten.Services.ConcurrencyChecks.Disabled) + { + + return new Marten.Generated.DocumentStorage.OverwriteRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + else + { + + return new Marten.Generated.DocumentStorage.UpsertRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + } + + + public override Marten.Internal.Operations.IStorageOperation Overwrite(DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session, string tenant) + { + + return new Marten.Generated.DocumentStorage.OverwriteRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + + return new Marten.Generated.DocumentStorage.OverwriteRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + + public override System.Guid Identity(DocumentDbTests.Concurrency.RevisionedDoc document) + { + return document.Id; + } + + + public override Marten.Linq.Selectors.ISelector BuildSelector(Marten.Internal.IMartenSession session) + { + return new Marten.Generated.DocumentStorage.LightweightRevisionedDocSelector1212098993(session, _document); + } + + + public override object RawIdentityValue(System.Guid id) + { + return id; + } + + + public override Npgsql.NpgsqlParameter BuildManyIdParameter(System.Guid[] ids) + { + return base.BuildManyIdParameter(ids); + } + + } + + // END: LightweightRevisionedDocDocumentStorage1212098993 + + + // START: IdentityMapRevisionedDocDocumentStorage1212098993 + public class IdentityMapRevisionedDocDocumentStorage1212098993 : Marten.Internal.Storage.IdentityMapDocumentStorage + { + private readonly Marten.Schema.DocumentMapping _document; + + public IdentityMapRevisionedDocDocumentStorage1212098993(Marten.Schema.DocumentMapping document) : base(document) + { + _document = document; + } + + + + public override System.Guid AssignIdentity(DocumentDbTests.Concurrency.RevisionedDoc document, string tenantId, Marten.Storage.IMartenDatabase database) + { + if (document.Id == Guid.Empty) _setter(document, Marten.Schema.Identity.CombGuidIdGeneration.NewGuid()); + return document.Id; + } + + + public override Marten.Internal.Operations.IStorageOperation Update(DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session, string tenant) + { + if (session.Concurrency == Marten.Services.ConcurrencyChecks.Disabled) + { + + return new Marten.Generated.DocumentStorage.OverwriteRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + else + { + + return new Marten.Generated.DocumentStorage.UpdateRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + } + + + public override Marten.Internal.Operations.IStorageOperation Insert(DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session, string tenant) + { + + return new Marten.Generated.DocumentStorage.InsertRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + + public override Marten.Internal.Operations.IStorageOperation Upsert(DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session, string tenant) + { + if (session.Concurrency == Marten.Services.ConcurrencyChecks.Disabled) + { + + return new Marten.Generated.DocumentStorage.OverwriteRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + else + { + + return new Marten.Generated.DocumentStorage.UpsertRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + } + + + public override Marten.Internal.Operations.IStorageOperation Overwrite(DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session, string tenant) + { + + return new Marten.Generated.DocumentStorage.OverwriteRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + + return new Marten.Generated.DocumentStorage.OverwriteRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + + public override System.Guid Identity(DocumentDbTests.Concurrency.RevisionedDoc document) + { + return document.Id; + } + + + public override Marten.Linq.Selectors.ISelector BuildSelector(Marten.Internal.IMartenSession session) + { + return new Marten.Generated.DocumentStorage.IdentityMapRevisionedDocSelector1212098993(session, _document); + } + + + public override object RawIdentityValue(System.Guid id) + { + return id; + } + + + public override Npgsql.NpgsqlParameter BuildManyIdParameter(System.Guid[] ids) + { + return base.BuildManyIdParameter(ids); + } + + } + + // END: IdentityMapRevisionedDocDocumentStorage1212098993 + + + // START: DirtyTrackingRevisionedDocDocumentStorage1212098993 + public class DirtyTrackingRevisionedDocDocumentStorage1212098993 : Marten.Internal.Storage.DirtyCheckedDocumentStorage + { + private readonly Marten.Schema.DocumentMapping _document; + + public DirtyTrackingRevisionedDocDocumentStorage1212098993(Marten.Schema.DocumentMapping document) : base(document) + { + _document = document; + } + + + + public override System.Guid AssignIdentity(DocumentDbTests.Concurrency.RevisionedDoc document, string tenantId, Marten.Storage.IMartenDatabase database) + { + if (document.Id == Guid.Empty) _setter(document, Marten.Schema.Identity.CombGuidIdGeneration.NewGuid()); + return document.Id; + } + + + public override Marten.Internal.Operations.IStorageOperation Update(DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session, string tenant) + { + if (session.Concurrency == Marten.Services.ConcurrencyChecks.Disabled) + { + + return new Marten.Generated.DocumentStorage.OverwriteRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + else + { + + return new Marten.Generated.DocumentStorage.UpdateRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + } + + + public override Marten.Internal.Operations.IStorageOperation Insert(DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session, string tenant) + { + + return new Marten.Generated.DocumentStorage.InsertRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + + public override Marten.Internal.Operations.IStorageOperation Upsert(DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session, string tenant) + { + if (session.Concurrency == Marten.Services.ConcurrencyChecks.Disabled) + { + + return new Marten.Generated.DocumentStorage.OverwriteRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + else + { + + return new Marten.Generated.DocumentStorage.UpsertRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + } + + + public override Marten.Internal.Operations.IStorageOperation Overwrite(DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Internal.IMartenSession session, string tenant) + { + + return new Marten.Generated.DocumentStorage.OverwriteRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + + return new Marten.Generated.DocumentStorage.OverwriteRevisionedDocOperation1212098993 + ( + document, Identity(document), + null, + _document + + ); + } + + + public override System.Guid Identity(DocumentDbTests.Concurrency.RevisionedDoc document) + { + return document.Id; + } + + + public override Marten.Linq.Selectors.ISelector BuildSelector(Marten.Internal.IMartenSession session) + { + return new Marten.Generated.DocumentStorage.DirtyTrackingRevisionedDocSelector1212098993(session, _document); + } + + + public override object RawIdentityValue(System.Guid id) + { + return id; + } + + + public override Npgsql.NpgsqlParameter BuildManyIdParameter(System.Guid[] ids) + { + return base.BuildManyIdParameter(ids); + } + + } + + // END: DirtyTrackingRevisionedDocDocumentStorage1212098993 + + + // START: RevisionedDocBulkLoader1212098993 + public class RevisionedDocBulkLoader1212098993 : Marten.Internal.CodeGeneration.BulkLoader + { + private readonly Marten.Internal.Storage.IDocumentStorage _storage; + + public RevisionedDocBulkLoader1212098993(Marten.Internal.Storage.IDocumentStorage storage) : base(storage) + { + _storage = storage; + } + + + public const string MAIN_LOADER_SQL = "COPY numeric_revisioning.mt_doc_revisioneddoc(\"mt_dotnet_type\", \"id\", \"mt_version\", \"data\") FROM STDIN BINARY"; + + public const string TEMP_LOADER_SQL = "COPY mt_doc_revisioneddoc_temp(\"mt_dotnet_type\", \"id\", \"mt_version\", \"data\") FROM STDIN BINARY"; + + public const string COPY_NEW_DOCUMENTS_SQL = "insert into numeric_revisioning.mt_doc_revisioneddoc (\"id\", \"data\", \"mt_dotnet_type\", \"mt_version\", mt_last_modified) (select mt_doc_revisioneddoc_temp.\"id\", mt_doc_revisioneddoc_temp.\"data\", mt_doc_revisioneddoc_temp.\"mt_dotnet_type\", mt_doc_revisioneddoc_temp.\"mt_version\", transaction_timestamp() from mt_doc_revisioneddoc_temp left join numeric_revisioning.mt_doc_revisioneddoc on mt_doc_revisioneddoc_temp.id = numeric_revisioning.mt_doc_revisioneddoc.id where numeric_revisioning.mt_doc_revisioneddoc.id is null)"; + + public const string OVERWRITE_SQL = "update numeric_revisioning.mt_doc_revisioneddoc target SET data = source.data, mt_dotnet_type = source.mt_dotnet_type, mt_version = source.mt_version, mt_last_modified = transaction_timestamp() FROM mt_doc_revisioneddoc_temp source WHERE source.id = target.id"; + + public const string CREATE_TEMP_TABLE_FOR_COPYING_SQL = "create temporary table mt_doc_revisioneddoc_temp (like numeric_revisioning.mt_doc_revisioneddoc including defaults)"; + + + public override string CreateTempTableForCopying() + { + return CREATE_TEMP_TABLE_FOR_COPYING_SQL; + } + + + public override string CopyNewDocumentsFromTempTable() + { + return COPY_NEW_DOCUMENTS_SQL; + } + + + public override string OverwriteDuplicatesFromTempTable() + { + return OVERWRITE_SQL; + } + + + public override void LoadRow(Npgsql.NpgsqlBinaryImporter writer, DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Storage.Tenant tenant, Marten.ISerializer serializer) + { + writer.Write(document.GetType().FullName, NpgsqlTypes.NpgsqlDbType.Varchar); + writer.Write(document.Id, NpgsqlTypes.NpgsqlDbType.Uuid); + writer.Write(1, NpgsqlTypes.NpgsqlDbType.Integer); + writer.Write(serializer.ToJson(document), NpgsqlTypes.NpgsqlDbType.Jsonb); + } + + + public override async System.Threading.Tasks.Task LoadRowAsync(Npgsql.NpgsqlBinaryImporter writer, DocumentDbTests.Concurrency.RevisionedDoc document, Marten.Storage.Tenant tenant, Marten.ISerializer serializer, System.Threading.CancellationToken cancellation) + { + await writer.WriteAsync(document.GetType().FullName, NpgsqlTypes.NpgsqlDbType.Varchar, cancellation); + await writer.WriteAsync(document.Id, NpgsqlTypes.NpgsqlDbType.Uuid, cancellation); + await writer.WriteAsync(1, NpgsqlTypes.NpgsqlDbType.Integer, cancellation); + await writer.WriteAsync(serializer.ToJson(document), NpgsqlTypes.NpgsqlDbType.Jsonb, cancellation); + } + + + public override string MainLoaderSql() + { + return MAIN_LOADER_SQL; + } + + + public override string TempLoaderSql() + { + return TEMP_LOADER_SQL; + } + + } + + // END: RevisionedDocBulkLoader1212098993 + + + // START: RevisionedDocProvider1212098993 + public class RevisionedDocProvider1212098993 : Marten.Internal.Storage.DocumentProvider + { + private readonly Marten.Schema.DocumentMapping _mapping; + + public RevisionedDocProvider1212098993(Marten.Schema.DocumentMapping mapping) : base(new RevisionedDocBulkLoader1212098993(new QueryOnlyRevisionedDocDocumentStorage1212098993(mapping)), new QueryOnlyRevisionedDocDocumentStorage1212098993(mapping), new LightweightRevisionedDocDocumentStorage1212098993(mapping), new IdentityMapRevisionedDocDocumentStorage1212098993(mapping), new DirtyTrackingRevisionedDocDocumentStorage1212098993(mapping)) + { + _mapping = mapping; + } + + + } + + // END: RevisionedDocProvider1212098993 + + +} + diff --git a/src/EventAppenderPerfTester/Program.cs b/src/EventAppenderPerfTester/Program.cs index 68f5cdd567..a3da553161 100644 --- a/src/EventAppenderPerfTester/Program.cs +++ b/src/EventAppenderPerfTester/Program.cs @@ -18,7 +18,7 @@ opts.DisableNpgsqlLogging = true; opts.Events.AppendMode = EventAppendMode.Quick; - opts.Events.UseIdentityMapForInlineAggregates = true; + opts.Events.UseIdentityMapForAggregates = true; opts.Projections.Add(ProjectionLifecycle.Inline); }); diff --git a/src/EventSourcingTests/AssemblyInfo.cs b/src/EventSourcingTests/AssemblyInfo.cs new file mode 100644 index 0000000000..c24077a113 --- /dev/null +++ b/src/EventSourcingTests/AssemblyInfo.cs @@ -0,0 +1,2 @@ +using Xunit; +[assembly: CollectionBehavior(DisableTestParallelization = true)] \ No newline at end of file diff --git a/src/EventSourcingTests/EventGraphTests.cs b/src/EventSourcingTests/EventGraphTests.cs index d07c2d0b2d..9a9cdff7d7 100644 --- a/src/EventSourcingTests/EventGraphTests.cs +++ b/src/EventSourcingTests/EventGraphTests.cs @@ -134,7 +134,7 @@ public void switch_to_quick_and_back_to_rich() [Fact] public void use_identity_map_for_inline_aggregates_is_false_by_default() { - theGraph.UseIdentityMapForInlineAggregates.ShouldBeFalse(); + theGraph.UseIdentityMapForAggregates.ShouldBeFalse(); } public class HouseRemodeling diff --git a/src/EventSourcingTests/Examples/FetchLatest.cs b/src/EventSourcingTests/Examples/FetchLatest.cs new file mode 100644 index 0000000000..3ec9fd921a --- /dev/null +++ b/src/EventSourcingTests/Examples/FetchLatest.cs @@ -0,0 +1,160 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using EventSourcingTests.Projections; +using Marten; +using Marten.Events.Daemon.Resiliency; +using Marten.Events.Projections; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; + +namespace EventSourcingTests.Examples; + +public static class FetchLatest +{ + public static void configure_live() + { + #region sample_configure_aggregate_as_live + + var builder = Host.CreateApplicationBuilder(); + builder.Services.AddMarten(opts => + { + opts.Connection(builder.Configuration.GetConnectionString("marten")); + + // Just telling Marten upfront that we will use + // live aggregation for the Invoice aggregate + // This would be the default anyway if you didn't explicitly + // register Invoice this way, but doing so let's + // Marten "know" about Invoice for code generation + opts.Projections.LiveStreamAggregation(); + }); + + #endregion + } + + #region sample_read_live_invoice + + public static async Task read_live_invoice( + IQuerySession session, + Guid invoiceId) + { + var invoice = await session + .Events.AggregateStreamAsync(invoiceId); + } + + #endregion + + public static void configure_inline() + { + #region sample_configure_aggregate_as_inline + + var builder = Host.CreateApplicationBuilder(); + builder.Services.AddMarten(opts => + { + opts.Connection(builder.Configuration.GetConnectionString("marten")); + + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }); + + #endregion + } + + #region sample_read_inline_invoice + + public static async Task read_inline_invoice( + IQuerySession session, + Guid invoiceId) + { + var invoice = await session + .LoadAsync(invoiceId); + } + + #endregion + + public static void configure_async() + { + #region sample_configure_aggregate_as_async + + var builder = Host.CreateApplicationBuilder(); + builder.Services.AddMarten(opts => + { + opts.Connection(builder.Configuration.GetConnectionString("marten")); + + opts.Projections.Snapshot(SnapshotLifecycle.Async); + }) + .AddAsyncDaemon(DaemonMode.HotCold); + + #endregion + } + + #region sample_read_latest_invoice + + public static async Task read_latest( + // Watch this, only available on the full IDocumentSession + IDocumentSession session, + Guid invoiceId) + { + var invoice = await session + .Events.FetchLatest(invoiceId); + } + + #endregion + + public static void configure_with_optimizations() + { + #region sample_configure_aggregate_with_optimizations + + var builder = Host.CreateApplicationBuilder(); + builder.Services.AddMarten(opts => + { + opts.Connection(builder.Configuration.GetConnectionString("marten")); + + // This opts Marten into a pretty big optimization + // for how FetchForWriting and/or FetchLatest work internally + opts.Events.UseIdentityMapForAggregates = true; + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }); + + #endregion + } + + #region sample_invoice_approval_workflow_with_mutate + + public static Task Approve(IDocumentSession session, Guid invoiceId) + { + return session.Mutate(invoiceId, invoice => + { + if (invoice.Status != InvoiceStatus.Approved) + { + return new object[] { new InvoiceApproved() }; + } + + return Array.Empty(); + }); + } + + #endregion +} + +#region sample_mutation_extensions + +public static class MutationExtensions +{ + public static async Task Mutate(this IDocumentSession session, Guid id, Func decider, + CancellationToken token = default) where T : class + { + var stream = await session.Events.FetchForWriting(id, token); + + // Decide what new events should be appended based on the current + // state of the aggregate and application logic + var events = decider(stream.Aggregate); + stream.AppendMany(events); + + // Persist any new events + await session.SaveChangesAsync(token); + + return await session.Events.FetchLatest(id, token); + } +} + +#endregion diff --git a/src/EventSourcingTests/Examples/Optimizations.cs b/src/EventSourcingTests/Examples/Optimizations.cs index 48b9ebc30b..bcd5873c82 100644 --- a/src/EventSourcingTests/Examples/Optimizations.cs +++ b/src/EventSourcingTests/Examples/Optimizations.cs @@ -45,10 +45,9 @@ public static async Task use_optimizations() opts.Events.AppendMode = EventAppendMode.Quick; // Little more involved, but this can reduce the number - // of database queries necessary to process inline projections - // during command handling with some significant - // caveats - opts.Events.UseIdentityMapForInlineAggregates = true; + // of database queries necessary to process projections + // during CQRS command handling with certain workflows + opts.Events.UseIdentityMapForAggregates = true; // Opts into a mode where Marten is able to rebuild single // stream projections faster by building one stream at a time diff --git a/src/EventSourcingTests/FetchForWriting/fetch_latest_async_aggregate.cs b/src/EventSourcingTests/FetchForWriting/fetch_latest_async_aggregate.cs index c8b7ec4b03..b38facd013 100644 --- a/src/EventSourcingTests/FetchForWriting/fetch_latest_async_aggregate.cs +++ b/src/EventSourcingTests/FetchForWriting/fetch_latest_async_aggregate.cs @@ -58,10 +58,33 @@ public async Task from_no_current_activity_string_centric() } [Fact] - public async Task from_after_fetch_for_writing_guid_centric_brand_new() + public async Task from_after_fetch_for_writing_guid_centric_brand_new_1() { StoreOptions(opts => { + opts.Events.UseIdentityMapForAggregates = true; + opts.Projections.Snapshot(SnapshotLifecycle.Async); + }); + + var streamId = Guid.NewGuid(); + + var stream = await theSession.Events.FetchForWriting(streamId); + stream.AppendMany(new AEvent(), new BEvent(), new BEvent(), new CEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var aggregate = await theSession.Events.FetchLatest(streamId); + aggregate.ACount.ShouldBe(1); + aggregate.BCount.ShouldBe(2); + aggregate.CCount.ShouldBe(3); + } + + [Fact] + public async Task from_after_fetch_for_writing_guid_centric_brand_new_no_optimization() + { + StoreOptions(opts => + { + //opts.Events.UseIdentityMapForAggregates = true; opts.Projections.Snapshot(SnapshotLifecycle.Async); }); diff --git a/src/EventSourcingTests/FetchForWriting/fetch_latest_inline_aggregate.cs b/src/EventSourcingTests/FetchForWriting/fetch_latest_inline_aggregate.cs index b1deac6c0e..2cf9092dbf 100644 --- a/src/EventSourcingTests/FetchForWriting/fetch_latest_inline_aggregate.cs +++ b/src/EventSourcingTests/FetchForWriting/fetch_latest_inline_aggregate.cs @@ -16,6 +16,7 @@ public async Task from_no_current_activity_guid_centric() { StoreOptions(opts => { + opts.Events.UseIdentityMapForAggregates = true; opts.Projections.Snapshot(SnapshotLifecycle.Inline); }); @@ -62,6 +63,7 @@ public async Task from_after_fetch_for_writing_guid_centric_brand_new() { StoreOptions(opts => { + opts.Events.UseIdentityMapForAggregates = true; opts.Projections.Snapshot(SnapshotLifecycle.Inline); }); @@ -84,6 +86,7 @@ public async Task from_after_fetch_for_writing_guid_centric_brand_existing() { StoreOptions(opts => { + opts.Events.UseIdentityMapForAggregates = true; opts.Projections.Snapshot(SnapshotLifecycle.Inline); }); @@ -98,8 +101,7 @@ public async Task from_after_fetch_for_writing_guid_centric_brand_existing() stream.AppendMany(new DEvent(), new DEvent()); await session.SaveChangesAsync(); - await using var query = theStore.LightweightSession(); - var document = await query.Events.FetchLatest(streamId); + var document = await session.Events.FetchLatest(streamId); document.ACount.ShouldBe(1); document.BCount.ShouldBe(2); @@ -134,6 +136,7 @@ public async Task from_after_fetch_for_writing_string_centric_existing() { StoreOptions(opts => { + opts.Events.UseIdentityMapForAggregates = true; opts.Events.StreamIdentity = StreamIdentity.AsString; opts.Projections.Snapshot(SnapshotLifecycle.Inline); }); diff --git a/src/EventSourcingTests/FetchForWriting/fetch_latest_live_aggregate.cs b/src/EventSourcingTests/FetchForWriting/fetch_latest_live_aggregate.cs index 7e89a08dc4..6c28410b85 100644 --- a/src/EventSourcingTests/FetchForWriting/fetch_latest_live_aggregate.cs +++ b/src/EventSourcingTests/FetchForWriting/fetch_latest_live_aggregate.cs @@ -52,7 +52,7 @@ public async Task from_no_current_activity_string_centric() } [Fact] - public async Task from_after_fetch_for_writing_guid_centric_brand_new() + public async Task from_after_fetch_for_writing_guid_centric_brand_new_no_optimization() { var streamId = Guid.NewGuid(); @@ -67,9 +67,27 @@ public async Task from_after_fetch_for_writing_guid_centric_brand_new() aggregate.CCount.ShouldBe(3); } + [Fact] + public async Task from_after_fetch_for_writing_guid_centric_brand_new_with_optimization() + { + StoreOptions(opts => opts.Events.UseIdentityMapForAggregates = true); + + var streamId = Guid.NewGuid(); + + var stream = await theSession.Events.FetchForWriting(streamId); + stream.AppendMany(new AEvent(), new BEvent(), new BEvent(), new CEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var aggregate = await theSession.Events.FetchLatest(streamId); + aggregate.ACount.ShouldBe(1); + aggregate.BCount.ShouldBe(2); + aggregate.CCount.ShouldBe(3); + } + [Fact] - public async Task from_after_fetch_for_writing_guid_centric_brand_existing() + public async Task from_after_fetch_for_writing_guid_centric_brand_existing_no_optimization() { var streamId = Guid.NewGuid(); theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent(), @@ -91,6 +109,30 @@ public async Task from_after_fetch_for_writing_guid_centric_brand_existing() document.DCount.ShouldBe(2); } + [Fact] + public async Task from_after_fetch_for_writing_guid_centric_brand_existing_with_optimization() + { + StoreOptions(opts => opts.Events.UseIdentityMapForAggregates = true); + + var streamId = Guid.NewGuid(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent(), + new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + using var session = theStore.LightweightSession(); + var stream = await session.Events.FetchForWriting(streamId); + stream.AppendMany(new DEvent(), new DEvent()); + await session.SaveChangesAsync(); + + var document = await session.Events.FetchLatest(streamId); + + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(2); + document.CCount.ShouldBe(3); + document.DCount.ShouldBe(2); + } + [Fact] public async Task from_after_fetch_for_writing_string_centric_brand_new() { diff --git a/src/EventSourcingTests/FetchForWriting/fetching_inline_aggregates_for_writing.cs b/src/EventSourcingTests/FetchForWriting/fetching_inline_aggregates_for_writing.cs index 78c04d3759..d1135569ea 100644 --- a/src/EventSourcingTests/FetchForWriting/fetching_inline_aggregates_for_writing.cs +++ b/src/EventSourcingTests/FetchForWriting/fetching_inline_aggregates_for_writing.cs @@ -55,7 +55,7 @@ public async Task revision_is_updated_after_quick_appending_with_IRevisioned() { opts.Projections.Snapshot(SnapshotLifecycle.Inline); opts.Events.AppendMode = EventAppendMode.Quick; - opts.Events.UseIdentityMapForInlineAggregates = true; + opts.Events.UseIdentityMapForAggregates = true; }); var streamId = Guid.NewGuid(); @@ -85,7 +85,7 @@ public async Task revision_is_updated_after_quick_appending_with_custom_mapped_v }); opts.Events.AppendMode = EventAppendMode.Quick; - opts.Events.UseIdentityMapForInlineAggregates = true; + opts.Events.UseIdentityMapForAggregates = true; }); var streamId = Guid.NewGuid(); @@ -541,7 +541,7 @@ public static void using_identity_map_for_inline_aggregates() // an Inline projection for the "T". Saves on Marten doing an extra // database fetch of the same data you already fetched from FetchForWriting() // when Marten needs to apply the Inline projection as part of SaveChanges() - opts.Events.UseIdentityMapForInlineAggregates = true; + opts.Events.UseIdentityMapForAggregates = true; }) // This is non-trivial performance optimization if you never // need identity map mechanics in your commands or query handlers @@ -555,8 +555,8 @@ public async Task silently_turns_on_identity_map_for_inline_aggregates() { StoreOptions(opts => { + opts.Events.UseIdentityMapForAggregates = true; opts.Projections.Snapshot(SnapshotLifecycle.Inline); - opts.Events.UseIdentityMapForInlineAggregates = true; }); var streamId = Guid.NewGuid(); diff --git a/src/LinqTests/AssemblyInfo.cs b/src/LinqTests/AssemblyInfo.cs new file mode 100644 index 0000000000..c24077a113 --- /dev/null +++ b/src/LinqTests/AssemblyInfo.cs @@ -0,0 +1,2 @@ +using Xunit; +[assembly: CollectionBehavior(DisableTestParallelization = true)] \ No newline at end of file diff --git a/src/Marten.AspNetCore.Testing/AssemblyInfo.cs b/src/Marten.AspNetCore.Testing/AssemblyInfo.cs new file mode 100644 index 0000000000..c24077a113 --- /dev/null +++ b/src/Marten.AspNetCore.Testing/AssemblyInfo.cs @@ -0,0 +1,2 @@ +using Xunit; +[assembly: CollectionBehavior(DisableTestParallelization = true)] \ No newline at end of file diff --git a/src/Marten.PLv8.Testing/AssemblyInfo.cs b/src/Marten.PLv8.Testing/AssemblyInfo.cs new file mode 100644 index 0000000000..c24077a113 --- /dev/null +++ b/src/Marten.PLv8.Testing/AssemblyInfo.cs @@ -0,0 +1,2 @@ +using Xunit; +[assembly: CollectionBehavior(DisableTestParallelization = true)] \ No newline at end of file diff --git a/src/Marten.Testing/AssemblyInfo.cs b/src/Marten.Testing/AssemblyInfo.cs new file mode 100644 index 0000000000..c24077a113 --- /dev/null +++ b/src/Marten.Testing/AssemblyInfo.cs @@ -0,0 +1,2 @@ +using Xunit; +[assembly: CollectionBehavior(DisableTestParallelization = true)] \ No newline at end of file diff --git a/src/Marten/Events/Aggregation/AggregationRuntime.cs b/src/Marten/Events/Aggregation/AggregationRuntime.cs index c68f841643..65449e73bd 100644 --- a/src/Marten/Events/Aggregation/AggregationRuntime.cs +++ b/src/Marten/Events/Aggregation/AggregationRuntime.cs @@ -18,13 +18,12 @@ using Marten.Sessions; using Marten.Storage; using Marten.Util; -using Microsoft.Extensions.Options; using Npgsql; namespace Marten.Events.Aggregation; public abstract class CrossStreamAggregationRuntime: AggregationRuntime - where TDoc : notnull where TId : notnull + where TDoc : class where TId : notnull { public CrossStreamAggregationRuntime(IDocumentStore store, IAggregateProjection projection, IEventSlicer slicer, IDocumentStorage storage): base(store, projection, slicer, storage) @@ -43,7 +42,7 @@ public override bool IsNew(EventSlice slice) /// /// public abstract class AggregationRuntime: IAggregationRuntime - where TDoc : notnull where TId : notnull + where TDoc : class where TId : notnull { private readonly Func _identitySource; @@ -108,7 +107,7 @@ public async ValueTask ApplyChangesAsync(DocumentSessionBase session, // do not load if sliced by stream and the stream does not yet exist if (slice.Aggregate == null && lifecycle == ProjectionLifecycle.Inline && (Slicer is not ISingleStreamSlicer || slice.ActionType != StreamActionType.Start)) { - if (session.Options.Events.UseIdentityMapForInlineAggregates) + if (session.Options.Events.UseIdentityMapForAggregates) { // It's actually important to go in through the front door and use the session so that // the identity map can kick in here @@ -164,6 +163,12 @@ public async ValueTask ApplyChangesAsync(DocumentSessionBase session, return; } + // TODO -- make this method virtual so we don't have to know about what document session is + if (session is not ProjectionDocumentSession && Options.Events.UseIdentityMapForAggregates) + { + session.StoreDocumentInItemMap(slice.Id, aggregate); + } + var storageOperation = Storage.Upsert(aggregate, session, slice.Tenant.TenantId); if (Slicer is ISingleStreamSlicer && lastEvent != null && storageOperation is IRevisionedOperation op) { diff --git a/src/Marten/Events/EventGraph.cs b/src/Marten/Events/EventGraph.cs index 340b6b8666..c5adec2a6c 100644 --- a/src/Marten/Events/EventGraph.cs +++ b/src/Marten/Events/EventGraph.cs @@ -132,7 +132,21 @@ public StreamIdentity StreamIdentity public TenancyStyle TenancyStyle { get; set; } = TenancyStyle.Single; public bool EnableGlobalProjectionsForConjoinedTenancy { get; set; } - public bool UseIdentityMapForInlineAggregates { get; set; } + + [Obsolete("Will be removed in Marten 8")] + public bool UseIdentityMapForInlineAggregates + { + get + { + return UseIdentityMapForAggregates; + } + set + { + UseIdentityMapForAggregates = value; + } + } + + public bool UseIdentityMapForAggregates { get; set; } /// /// Configure the meta data required to be stored for events. By default meta data fields are disabled diff --git a/src/Marten/Events/Fetching/FetchAsyncPlan.cs b/src/Marten/Events/Fetching/FetchAsyncPlan.cs index 847b24b0bb..1e3e6a8847 100644 --- a/src/Marten/Events/Fetching/FetchAsyncPlan.cs +++ b/src/Marten/Events/Fetching/FetchAsyncPlan.cs @@ -143,9 +143,17 @@ public async Task> FetchForWriting(DocumentSessionBase sessio _storage.SetIdentity(document, id); } - return version == 0 + var stream = version == 0 ? _identityStrategy.StartStream(document, session, id, cancellation) : _identityStrategy.AppendToStream(document, session, id, version, cancellation); + + // This is an optimization for calling FetchForWriting, then immediately calling FetchLatest + if (session.Options.Events.UseIdentityMapForAggregates) + { + session.StoreDocumentInItemMap(id, stream); + } + + return stream; } catch (Exception e) { @@ -234,9 +242,17 @@ public async Task> FetchForWriting(DocumentSessionBase sessio _storage.SetIdentity(document, id); } - return version == 0 + var stream = version == 0 ? _identityStrategy.StartStream(document, session, id, cancellation) : _identityStrategy.AppendToStream(document, session, id, version, cancellation); + + // This is an optimization for calling FetchForWriting, then immediately calling FetchLatest + if (session.Options.Events.UseIdentityMapForAggregates) + { + session.StoreDocumentInItemMap(id, stream); + } + + return stream; } catch (Exception e) { @@ -256,6 +272,18 @@ public async Task> FetchForWriting(DocumentSessionBase sessio public async ValueTask FetchForReading(DocumentSessionBase session, TId id, CancellationToken cancellation) { + // Optimization for having called FetchForWriting, then FetchLatest on same session in short order + if (session.Options.Events.UseIdentityMapForAggregates) + { + if (session.TryGetAggregateFromIdentityMap, TId>(id, out var stream)) + { + var starting = stream.Aggregate; + var appendedEvents = stream.Events; + + return await _aggregator.BuildAsync(appendedEvents, session, starting, cancellation).ConfigureAwait(false); + } + } + await _identityStrategy.EnsureEventStorageExists(session, cancellation).ConfigureAwait(false); await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false); diff --git a/src/Marten/Events/Fetching/FetchInlinedPlan.cs b/src/Marten/Events/Fetching/FetchInlinedPlan.cs index f103e4bd8c..bc14811752 100644 --- a/src/Marten/Events/Fetching/FetchInlinedPlan.cs +++ b/src/Marten/Events/Fetching/FetchInlinedPlan.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using JasperFx.Core.Reflection; using Marten.Exceptions; using Marten.Internal.Sessions; using Marten.Internal.Storage; @@ -25,7 +27,7 @@ public async Task> FetchForWriting(DocumentSessionBase sessio CancellationToken cancellation = default) { IDocumentStorage storage = null; - if (session.Options.Events.UseIdentityMapForInlineAggregates) + if (session.Options.Events.UseIdentityMapForAggregates) { storage = session.Options.ResolveCorrectedDocumentStorage(DocumentTracking.IdentityOnly); // Opt into the identity map mechanics for this aggregate type just in case @@ -66,6 +68,12 @@ public async Task> FetchForWriting(DocumentSessionBase sessio await reader.NextResultAsync(cancellation).ConfigureAwait(false); var document = await handler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); + // As an optimization, put the document in the identity map for later + if (document != null && session.Options.Events.UseIdentityMapForAggregates) + { + session.StoreDocumentInItemMap(id, document); + } + return version == 0 ? _identityStrategy.StartStream(document, session, id, cancellation) : _identityStrategy.AppendToStream(document, session, id, version, cancellation); @@ -90,7 +98,7 @@ public async Task> FetchForWriting(DocumentSessionBase sessio long expectedStartingVersion, CancellationToken cancellation = default) { IDocumentStorage storage = null; - if (session.Options.Events.UseIdentityMapForInlineAggregates) + if (session.Options.Events.UseIdentityMapForAggregates) { storage = (IDocumentStorage)session.Options.Providers.StorageFor(); // Opt into the identity map mechanics for this aggregate type just in case @@ -134,6 +142,12 @@ public async Task> FetchForWriting(DocumentSessionBase sessio await reader.NextResultAsync(cancellation).ConfigureAwait(false); var document = await handler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); + // As an optimization, put the document in the identity map for later + if (document != null && session.Options.Events.UseIdentityMapForAggregates) + { + session.StoreDocumentInItemMap(id, document); + } + return version == 0 ? _identityStrategy.StartStream(document, session, id, cancellation) : _identityStrategy.AppendToStream(document, session, id, version, cancellation); @@ -156,12 +170,10 @@ public async Task> FetchForWriting(DocumentSessionBase sessio public async ValueTask FetchForReading(DocumentSessionBase session, TId id, CancellationToken cancellation) { - // TODO -- optimizations coming - IDocumentStorage storage = null; - if (session.Options.Events.UseIdentityMapForInlineAggregates) + if (session.Options.Events.UseIdentityMapForAggregates) { - storage = (IDocumentStorage)session.Options.Providers.StorageFor(); + storage = (IDocumentStorage)session.Options.Providers.StorageFor().IdentityMap; // Opt into the identity map mechanics for this aggregate type just in case // you're using a lightweight session session.UseIdentityMapFor(); @@ -171,6 +183,12 @@ public async ValueTask FetchForReading(DocumentSessionBase session, TId id storage = session.StorageFor(); } + // Opting into optimizations here + if (session.TryGetAggregateFromIdentityMap(id, out var doc)) + { + return doc; + } + await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false); var builder = new BatchBuilder { TenantId = session.TenantId }; diff --git a/src/Marten/Events/Fetching/FetchLivePlan.cs b/src/Marten/Events/Fetching/FetchLivePlan.cs index d0df62b687..8bf7e69ee0 100644 --- a/src/Marten/Events/Fetching/FetchLivePlan.cs +++ b/src/Marten/Events/Fetching/FetchLivePlan.cs @@ -63,9 +63,17 @@ public async Task> FetchForWriting(DocumentSessionBase sessio _documentStorage.SetIdentity(document, id); } - return version == 0 + var stream = version == 0 ? _identityStrategy.StartStream(document, session, id, cancellation) : _identityStrategy.AppendToStream(document, session, id, version, cancellation); + + // This is an optimization for calling FetchForWriting, then immediately calling FetchLatest + if (session.Options.Events.UseIdentityMapForAggregates) + { + session.StoreDocumentInItemMap(id, stream); + } + + return stream; } catch (Exception e) { @@ -120,10 +128,17 @@ public async Task> FetchForWriting(DocumentSessionBase sessio var events = await handler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); var document = await _aggregator.BuildAsync(events, session, default, cancellation).ConfigureAwait(false); - - return version == 0 + var stream = version == 0 ? _identityStrategy.StartStream(document, session, id, cancellation) : _identityStrategy.AppendToStream(document, session, id, version, cancellation); + + // This is an optimization for calling FetchForWriting, then immediately calling FetchLatest + if (session.Options.Events.UseIdentityMapForAggregates) + { + session.StoreDocumentInItemMap(id, stream); + } + + return stream; } catch (Exception e) { @@ -143,7 +158,18 @@ public async Task> FetchForWriting(DocumentSessionBase sessio public async ValueTask FetchForReading(DocumentSessionBase session, TId id, CancellationToken cancellation) { - // TODO -- there will be optimizations later!!! + // Optimization for having called FetchForWriting, then FetchLatest on same session in short order + if (session.Options.Events.UseIdentityMapForAggregates) + { + if (session.TryGetAggregateFromIdentityMap, TId>(id, out var stream)) + { + var starting = stream.Aggregate; + var appendedEvents = stream.Events; + + return await _aggregator.BuildAsync(appendedEvents, session, starting, cancellation).ConfigureAwait(false); + } + } + var selector = await _identityStrategy.EnsureEventStorageExists(session, cancellation) .ConfigureAwait(false); diff --git a/src/Marten/Events/IEventStoreOptions.cs b/src/Marten/Events/IEventStoreOptions.cs index 6546b90935..07440546c3 100644 --- a/src/Marten/Events/IEventStoreOptions.cs +++ b/src/Marten/Events/IEventStoreOptions.cs @@ -36,7 +36,26 @@ public interface IEventStoreOptions /// Inline single stream projection's aggregate type when FetchForWriting() is called. Default is false. /// Do not use this if you manually alter the fetched aggregate from FetchForWriting() outside of Marten /// - bool UseIdentityMapForInlineAggregates { get; set; } + [Obsolete("Use the UseIdentityMapForAggregates property instead")] + bool UseIdentityMapForInlineAggregates + { + get + { + return UseIdentityMapForAggregates; + } + set + { + UseIdentityMapForAggregates = value; + } + } + + /// + /// Opt into a performance optimization that directs Marten to always use the identity map for an + /// as much as possible for FetchForWriting() or FetchLatest(). Note that this optimization is only + /// appropriate if using either immutable aggregations or when you do not mutate the aggregate yourself + /// outside of Marten internals + /// + bool UseIdentityMapForAggregates { get; set; } /// /// Override the database schema name for event related tables. By default this diff --git a/src/Marten/Internal/Sessions/DocumentSessionBase.cs b/src/Marten/Internal/Sessions/DocumentSessionBase.cs index ae03d3eb8d..597ffdbd6a 100644 --- a/src/Marten/Internal/Sessions/DocumentSessionBase.cs +++ b/src/Marten/Internal/Sessions/DocumentSessionBase.cs @@ -8,6 +8,7 @@ using Marten.Events; using Marten.Events.Aggregation; using Marten.Events.Daemon.Internals; +using Marten.Events.Daemon.Resiliency; using Marten.Exceptions; using Marten.Internal.Operations; using Marten.Internal.Storage; @@ -338,7 +339,6 @@ protected internal virtual void resetDirtyChecking() // Nothing } - private void store(IEnumerable entities) where T : notnull { assertNotDisposed(); @@ -430,4 +430,39 @@ public void Execute(IDocumentSession session, IEnumerable objects) foreach (var document in objects.OfType()) session.Delete(document); } } + + internal void StoreDocumentInItemMap(TId id, TDoc document) where TDoc : class + { + if (ItemMap.ContainsKey(typeof(TDoc))) + { + ItemMap[typeof(TDoc)].As>()[id] = document; + } + else + { + var dict = new Dictionary(); + dict[id] = document; + ItemMap[typeof(TDoc)] = dict; + } + } + + internal bool TryGetAggregateFromIdentityMap(TId id, out TDoc document) + { + if (Options.EventGraph.UseIdentityMapForAggregates) + { + if (ItemMap.TryGetValue(typeof(TDoc), out var raw)) + { + if (raw is Dictionary dict) + { + if (dict.TryGetValue(id, out var doc)) + { + document = doc; + return true; + } + } + } + } + + document = default; + return false; + } } diff --git a/src/ValueTypeTests/AssemblyInfo.cs b/src/ValueTypeTests/AssemblyInfo.cs new file mode 100644 index 0000000000..c24077a113 --- /dev/null +++ b/src/ValueTypeTests/AssemblyInfo.cs @@ -0,0 +1,2 @@ +using Xunit; +[assembly: CollectionBehavior(DisableTestParallelization = true)] \ No newline at end of file