AggregateId disconnection between aggregate and state #75

Closed
thomaseyde opened this issue Apr 2, 2022 · 25 comments

Comments

@thomaseyde

This trips me up regularly, and it smells of duplication:

  1. I have to compute the aggregate id in application service handlers,
  2. Then again in the When method in aggregate state

Also, if I don't set the Id explicitly on my state object, at least on the very first event, then my aggregate has no id.

I can see why the service needs me to compute the id from the incoming command, but the id is known by the time When runs, which means it's known to the aggregate. Is it really necessary to keep the id in the aggregate state?
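
The duplication described here can be sketched as follows. This is a minimal illustration under stated assumptions: `RoomBooked`, `BookingCancelled`, `BookingState`, and `BookingStreams` are hypothetical names chosen for the example, not Eventuous types.

```csharp
using System;

// Hypothetical events; the names are illustrative only.
public record RoomBooked(string BookingId, string RoomId);
public record BookingCancelled(string Reason);

public record BookingState {
    public string Id { get; init; } = "";
    public bool Cancelled { get; init; }

    // Place 2: the state copies the id out of the first event by hand.
    // Leave out the `Id = ...` assignment and the aggregate has no id.
    public BookingState When(object evt) => evt switch {
        RoomBooked e     => this with { Id = e.BookingId },
        BookingCancelled => this with { Cancelled = true },
        _                => this
    };
}

public static class BookingStreams {
    // Place 1: the application service derives the same id from the
    // command in order to locate the stream (assumed naming scheme).
    public static string ForCommand(string bookingIdFromCommand) =>
        $"Booking-{bookingIdFromCommand}";
}
```

Folding only a `BookingCancelled` event into a fresh `BookingState` leaves `Id` empty, which is the silent failure mode being reported.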

@alexeyzimarev
Contributor

It's a good idea to do it so you can create behaviour tests based only on events, without involving the command service.

@thomaseyde
Author

But is it really necessary to keep the aggregate id in the state object?

If it is, why can’t the aggregate set it for us?

If that’s not possible, the library should throw a very obvious exception which tells us explicitly what we forgot to do.

@alexeyzimarev
Contributor

When we persist an object somewhere, we always have the id. Say, I want to store the state using Entity Framework, where does the identity come from? In my world, it's a part of the state.

And, if it's a part of the state, why should it be treated differently than any other property of the state? I'd say it should be possible to set the state id on instantiating/loading, the question is if it is consistent behaviour.

I need to dig a bit more to see how the issue can be prevented.

@bartelink
Contributor

bartelink commented Apr 3, 2022

> Say, I want to store the state using Entity Framework

Careful on those analogies!

As you've seen me regularly mention in other places, I'm on the side of explicit mapping from inputs to a stream identifier. i.e. there might be a tenantid, an explicit aggregate id and an extra sharding context key that all feed into where it's going to live. Not all of those are even relevant to the aggregate.

If you're going to look stuff up in a cache, this routing info/context might feed into that too.

Within the store you might not keep all of those things as it does not make sense.

Within the aggregate state, it also rarely makes sense either.

I find the primary reason people end up embedding this envelope/routing context into the state is to enable code within the logic to use those values in generating events. i.e. Created bears the id, which goes into the state, which is then used to include the Id in the Updated. (I'm not a huge fan of the redundant inclusion of id information in events either, but that should be a separate matter)

So I would say that

  • you have a set of inputs which might be a single thing, but can also be tuples or records etc.
  • a selection of those feed into how you generate a streamname (e.g. I might redundantly include a Fulfilment Center Id even if I know that an OrderId is globally unique, but the tenantid has driven which database I am working in)
  • I do not want to have to track the orderid or the FC in my fold state
  • when I generate events I might need some context like the tenantid (in addition to the orderid and the fulfilment center id which I might choose to keep)

For me, the implications are:

  1. having a canonical id management mechanism coupled into things is not possible in the general case
  2. letting it leak into the state is also not helpful

My conclusion was to:

  1. as a first step, resolve an aggregate based on those id ingredients - that can decide the db, shard and streamname
  2. have the state/folding/caching not be aware of ids or stream names
  3. if you need to include context from step 1 when generating new events, provide that some other way (e.g. you could include it in the command, or as an extra arg)
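
The three steps above can be sketched roughly as follows. This is an illustration in C#, not Equinox or Eventuous code: `OrderRoute`, `OrderFold`, `ItemAdded`, and `OrderDecisions` are hypothetical names, the per-tenant database name and the `MaxItems` rule are assumptions made up for the example, and the stream name layout borrows the Order-FC001_{orderId} shape used elsewhere in this thread.

```csharp
using System;
using System.Collections.Generic;

// Step 1: the id ingredients decide where the stream lives.
public record OrderRoute(string TenantId, string FcId, string OrderId) {
    public string Database   => $"orders-{TenantId}";      // assumed naming
    public string StreamName => $"Order-{FcId}_{OrderId}";
}

public record ItemAdded(string OrderId, string Sku);

// Step 2: the folded state knows nothing about ids or stream names.
public record OrderFold(int ItemCount) {
    public static readonly OrderFold Initial = new(0);

    public OrderFold Apply(object evt) => evt switch {
        ItemAdded => this with { ItemCount = ItemCount + 1 },
        _         => this
    };
}

public static class OrderDecisions {
    public const int MaxItems = 3; // arbitrary limit for the sketch

    // Step 3: context needed to build events arrives as an extra argument
    // instead of being read back out of the state.
    public static IReadOnlyList<object> AddItem(OrderFold state, OrderRoute route, string sku) =>
        state.ItemCount >= MaxItems
            ? Array.Empty<object>()
            : new object[] { new ItemAdded(route.OrderId, sku) };
}
```

The point of the shape is that `OrderFold` can be tested and cached without any identity in it, while the event still carries the order id because the caller supplied the route.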

ASIDE, some more things that this overlaps with.... In my world, the loading phase:

and the sync phase:

  • requires the streamname, the token and the proposed events
  • might end up saying there is a conflict (which would mean resyncing with some of the initial loaded context in the token being relevant)

Feeding contextual things like the etag into the state does not help matters.

@alexeyzimarev
Contributor

> I'm not a huge fan of the redundant inclusion of id information in events either, but that should be a separate matter

How would you route events in projections if you don't have the id? It could be possible to use the stream name for that purpose, but if you use some middleware between the event store and the projector, this information is lost, unless it's implicitly placed in the meta.

@bartelink
Contributor

> How would you route events in projections if you don't have the id? It could be possible to use the stream name for that purpose, but if you use some middleware between the event store and the projector, this information is lost, unless it's implicitly placed in the meta.

That's a separate concern; I agree it needs to pass somewhere, and there is no single perfect solution (I tend to use stream name parsing a lot for routing of internal reactions processing, but that may or may not work and/or be a clean solution in a given context).

If we imagine that we have a tenantid, an FC id and the actual strictly absolutely necessary id they might come from any mix of:

  • reader metadata (tenantid might be inferred based on the container name in my doc store)
  • the event body itself (if I want to get into parsing it to achieve that)
  • the metadata of the event
  • the stream name, as you mention.

In other words, I'd say that mapping from an event being projected to an aggregate is in a similar way a thing that should not be baked in and/or coupled to any one specific thing.

In idiomatic Equinox+Propulsion code, this is represented by having a streamName function which maps from e.g. (tenantId, fcId, orderId) -> StreamName (string), and a parse function that can take Order-FC001_{orderId} and tell you that this event pertains to an Order (and/or you might be taking out the FC001 to map it to a database for that 'tenant'). There is no intrinsic big win to it being in the event body (even if that's not a reason to include it from a modelling perspective or a pedagogical perspective).
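
As a rough C# analogue of that streamName/parse pair (a sketch only; `OrderStreamName` and its members are hypothetical and are not the FsCodec, Equinox, or Eventuous API), assuming the fulfilment centre id never contains an underscore:

```csharp
using System;

public static class OrderStreamName {
    const string Category = "Order";

    // (fcId, orderId) -> "Order-{fcId}_{orderId}"
    public static string Create(string fcId, string orderId) {
        if (fcId.IndexOf('_') >= 0)
            throw new ArgumentException("fcId must not contain '_'", nameof(fcId));
        return $"{Category}-{fcId}_{orderId}";
    }

    // Returns false when the stream does not belong to the Order category
    // or does not have the expected "{fcId}_{orderId}" shape.
    public static bool TryParse(string streamName, out string fcId, out string orderId) {
        fcId = "";
        orderId = "";
        var prefix = Category + "-";
        if (!streamName.StartsWith(prefix, StringComparison.Ordinal)) return false;

        var rest = streamName.Substring(prefix.Length);
        var separator = rest.IndexOf('_');
        if (separator <= 0 || separator == rest.Length - 1) return false;

        fcId = rest.Substring(0, separator);
        orderId = rest.Substring(separator + 1);
        return true;
    }
}
```

A projection or reactor could call `TryParse` on the stream name of each incoming event and route on the result, without the id having to appear in the event body.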

The important thing is that neither Equinox nor Propulsion demands that you provide such a function and/or expects there to be some magic id, marker interface or other conventions - the mapping lives in your domain and/or composition root.

Ultimately you can often do stuff like:

  1. start off without a tenantid
  2. define all event schemas to have only the order id
  3. realize you want/need to introduce a tenantid

Now you have a decision as to whether to rewrite all events to put in the default tenantid as you said all events must be able to provide all context (and have to touch lots of code to thread through and/or default the tenantid), or you can not play that game and consider it to be part of the routing.

(Can go on even more tangents about how for me routing or reactions/projections should be based on envelope and/or stream name or the event metadata more than stuff inside the core event data, but my main goal was to convey that I think baking an id into the state is an antipattern as it does not generalize well in more complex cases, regardless of the fact that managing it is clearly a central concern that needs to be catered for in some manner.)

@alexeyzimarev
Contributor

> realize you want/need to introduce a tenantid

That's what I mentioned in another issue; I had the same thought when discussing the shared store scenario. I'm not sure where to find it now, but apart from the tenant id, I can see the following.

Event type conflict:

  • one context
  • one store
  • two aggregates
  • same event type

It's not an issue when loading the aggregate, but it will be a problem for deserializing events in subscriptions.
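
A small sketch of why this bites on the subscription side, assuming a name-to-type lookup is used for deserialization. `EventTypeRegistry`, `Bookings`, and `Payments` are hypothetical and do not reflect the Eventuous type map; the qualified names in the usage note are one possible workaround, not a recommendation from the thread.

```csharp
using System;
using System.Collections.Generic;

// Maps the event type name stored with each event to a CLR type.
public sealed class EventTypeRegistry {
    readonly Dictionary<string, Type> _byName = new();

    public void Register<TEvent>(string typeName) {
        // Two different CLR types under one name cannot both be resolved
        // when a subscription sees only the name.
        if (_byName.TryGetValue(typeName, out var existing) && existing != typeof(TEvent))
            throw new InvalidOperationException(
                $"'{typeName}' is already mapped to {existing.FullName}");
        _byName[typeName] = typeof(TEvent);
    }

    public Type Resolve(string typeName) => _byName[typeName];
}

// Two aggregates in one context that both emit a "Cancelled" event.
public static class Bookings { public record Cancelled(string Reason); }
public static class Payments { public record Cancelled(string Reason); }
```

Registering both types as "Cancelled" fails in this sketch, whereas registering them as "Booking.Cancelled" and "Payment.Cancelled" lets each name resolve to its own type.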

Another scenario:

  • Two contexts
  • One store
  • Two aggregate types with the same name, one per context

Here comes the stream name conflict.

Of course, if we say "context is tenant" - we get to the same tenant id discussion. Still, the tenant id might be something else (customer id, deployment id, etc), so these can be even composed.

In EventStoreDB we discuss introducing namespace, tenant, and category as first-class discriminators. But, I'd like to keep those things abstracted out on the library level.

So, my current understanding is that both event type mapping and StreamName<T> aren't good enough.

@bartelink
Contributor

> So, my current understanding that both event type mapping and StreamName aren't good enough.

Yeah, the main thing is obviously not to bake in / couple to any specific approach. Sometimes the way to achieve that is to abstract it and provide an extension point, other times it's a matter of finding a way to push a given aspect of the routing just far enough out that it becomes a non-issue from the point of view of your infra / library.

And so it begins...

(I find lots of uses for multiple aggregations/deciders per stream. While they inevitably share some event body contracts, the sets involved can vary. I don't have any great clean examples I can convey here, but that only matters if a lib is going to paint me into a corner by assuming a stream name or an aggregate is a single thing. Having event types not be contextual to an aggregate type makes debugging/versioning and reasoning way harder too (beyond very simple demo systems, that is).)

@alexeyzimarev
Contributor

> but that only matters if a lib is going to paint me into a corner by assuming a stream name or an aggregate is a single thing.

Well, there will never be a possibility to handle every possible scenario.

The way Eventuous develops is demand-based and mostly based on the demand of the sponsors and paid users (in any form, like working hours). As I am personally not convinced about things like multi-stream aggregates, or multi-aggregate streams (would be nice to have an example, as I struggle to understand this concept), I won't be doing that, so, someone else should :) Again, if it's a thing.

For this particular issue, I plan to make some extensions for configuring the stream name resolution. Maybe, also add some configurable defaults (i.e. static namespace). I also think about adding some default meta propagation for the aggregate store, based on such configuration that would be bound to the aggregate type. It will allow resolving things on the other side, and it will be quite generic.

@bartelink
Contributor

> Well, there will never be a possibility to handle every possible scenario.

My point here is that IMO the lib should not get involved; then it never constrains anything. Of course, abstracting around it is not without value, so doing that does have negatives too.

Can't think of a multiple streams scenario myself.

Most multi aggregate scenarios I have implemented are things like using a bespoke state format to allow efficient concurrent ingestion, while letting the bulk of other decisions use a different fold without having to merge the two. It is critical to reduce complexity in some cases, but obviously having two of anything is never without its costs either (they can be especially relevant where caching and/or snapshots are involved, which is another hint that it's definitely on the rare side).

What you are describing sounds good - my intention in putting my thoughts here is definitely only to suggest ways of looking at things - you're balancing a different set of forces and I definitely don't understand your overall choices enough to be concrete about any suggestion.

As you're well aware, of course the best way to design it for real is when a real use case in a system that's already employing the library makes it clear that it's worth expanding the surface area in a given direction...

@alexeyzimarev
Contributor

Looking at the code now, it is easy to add a custom mapping, or even a custom stream name resolve function.

But the only things that the store knows when reading the stream are the aggregate type and its id. Let's say we want something dynamic in the stream name, such as a tenant, and the tenant id is context-specific. So, we know the tenant id from the command execution context, but passing it over to the store Load function seems like overkill. Any thoughts?

@bartelink
Contributor

bartelink commented Apr 4, 2022

The Tenant may decide aspects of:

  1. loading (might be in a different db)
  2. rendering generated events for storage (including metadata about it)

I have the application layer map from its inputs to a composed StreamName

The Decider is passed the Stream and does not have knowledge of the actual stream name - the Stream holds it

Any Load or Reload from a StoreCategory is supplied the StreamName.

From a user API perspective, it is only generated and stashed once - when Resolving the Stream instance on the StoreCategory.


Some more detail:

In Equinox, there is a StoreCategory singleton, a Decider instance per call, and (under the covers) a Stream instance per call.

Command Handlers etc get passed a Func that resolves to a Decider:

type Service(resolve : TenantId * OrderId -> Decider<Events.Event, Fold.State>) =

    member _.Query(tenant, order, f : Fold.State -> T) : Async<T> =
        let decider = resolve (tenant, order)
        decider.Query(f)

    member _.Execute(tenant, order, command) : Async<T> =
        let decider = resolve (tenant, order)
        decider.Transact(decide command)

And there is some wiring over the top:

module Config =

    let private resolveStream = function
        | Config.Store.Memory store ->
            let cat = Config.Memory.create Events.codec Fold.initial Fold.fold store
            fun streamName -> cat.Resolve streamName
        | Config.Store.Cosmos (context, cache) ->
            let cat = Config.Cosmos.createSnapshotted Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
            fun streamName -> cat.Resolve streamName
    let private resolveDecider store = streamName >> resolveStream store >> Config.createDecider
    let create = resolveDecider >> Service

The Decider:

  • talks to a Stream (which knows the composed stream name)
  • can ask it to Load (it does the rest, passing the stream name)
  • can ask it to Sync events to the store (including doing a reload/resync if there was a conflict)

The StoreCategory:

  • can make a Stream instance via Resolve (passing the StreamName, and an optional per-call Context)
  • knows the codec we want to use (and passes it the Context, e.g. if you want to put some tenant or auth info into the event metadata)

The streamName function lives up at the top of the aggregate impl:-

let [<Literal>] private Category = "Order"
let streamName = OrderId.toString >> FsCodec.StreamName.create Category

where OrderId is a lightweight wrapper over a string or a guid, and toString renders that in canonical form

@alexeyzimarev
Contributor

I don't see how I can fit something like that into the current set of abstractions without breaking everything, tbh.

@bartelink
Contributor

bartelink commented Apr 4, 2022

You're probably right. I'd say the main point is to bear these sorts of things in mind as factors to use in deciding 50/50 calls as to where things should live. My high level heuristic for all these things is, TL;DR, not in the state ;)

Off the top of my head, things that should not leak into the state unless they are relevant to actual decision making:

  • assumptions re count of events applied to the state implying a version (if you upconvert events and/or ignore legacy events)
  • versions (that can become etags and/or any other form of context for storage roundtrips) should be managed by the infra (and optionally made available so you can render the final version and/or session key when producing the final response after any WrongExpectedVersion conflicts have been resolved)
  • stuff that's used to generate/derive the stream name (pass that when resolving a stream)
  • stuff that you need to generate event metadata (pass that contextual stuff when getting the stream)

Obviously you'll have met the bulk of these and more in various guises. The thing that makes them not just side notes in Equinox is that any given operation against the store (Query / Transact) has all of these things resolvable, as you are never doing individual explicit Load vs Save vs Render events vs Render result phases, i.e.

  • StoreCategory has the codec
  • StoreCategory has Batch sizes for event queries when loading
  • StoreCategory has an isOrigin function to short circuit loading if a Reset/Clear event is encountered when loading events
  • StreamName was calculated at the start and is held on the Stream instance (and passed to the StoreCategory)
  • Context for rendering events is held on the Stream instance (to supply to the codec)
  • Individual calls can provide a loader hint (AllowState to permit using the cached value without validating no fresh events in the store since then on the initial decide loop, AssumeEmpty to assume the stream is empty)

Translating that directly to C# without a lot of juggling and refactoring just results in Func soup that nobody would wish on anyone!

@alexeyzimarev
Contributor

I want to keep the id and version in the state. Everything else, as you said, should be elsewhere (meta, code).

Btw I like the idea of caching the state, but it would increase the complexity too much, considering by-default load-balanced applications (our prod). I think I have a better solution for it. I haven't started working on it, but we have a similar thing in prod (it is related to #1).

@bartelink
Contributor

In Equinox, if you have a Cache, the Token is stashed alongside the state (as a tuple).

So for Cosmos it holds an etag; for ESDB it's the version.

The StreamName is used as the cache key
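
A minimal C# sketch of that arrangement, to make the shape concrete. `StateCache` is hypothetical and is not how Equinox implements its cache; the token is modelled as an opaque string that could hold an etag or a version depending on the store.

```csharp
using System;
using System.Collections.Concurrent;

// Cache keyed by stream name. Each entry pairs an opaque store token
// with the folded state, so the state itself carries neither.
public sealed class StateCache<TState> {
    readonly ConcurrentDictionary<string, (string Token, TState State)> _entries = new();

    public void Store(string streamName, string token, TState state) =>
        _entries[streamName] = (token, state);

    public bool TryGet(string streamName, out string token, out TState state) {
        if (_entries.TryGetValue(streamName, out var entry)) {
            token = entry.Token;
            state = entry.State;
            return true;
        }
        token = "";
        state = default!;
        return false;
    }
}
```

On a hit, the store adapter would use the token to check whether the stream has moved on, and only read new events if it has.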

In Cosmos, the cache is pretty much a no-brainer as there's a big difference between paying 1RU to validate your cache has the latest, vs paying 1 RU/K to re-read events or a snapshot (aside from the latency induced by redundant network traffic and deserialization time).

On Dynamo I also use an etag instead of a version to implement RollingState mode so again the Version is not relevant.

Another reason not to put the token in the state and/or require it to be a Version is that in Cosmos and Dynamo, instead of and/or in addition to emitting a Version number, one may wish to emit the Session Token in a response, which lets you manage reading your writes with session consistency (without having to talk to the write leader all the time). Having a Version alongside the state would not help with that (some notes in jet/equinox#192).

Exposing the Version (which needs to stay in sync with the state) alongside the SessionKey (which can change even if the state for a given stream has not) makes sense in that scenario.

I realize you and the bulk of the planet don't care about this (I have also not fully implemented it, see jet/equinox#195), but the point here is that for me there are multiple reasons not to put the Version in/alongside the state in the aggregate:

  • It's not always a Version (and it can vary by store)
  • The domain should rarely need to understand/depend on version numbers as a general principle

I've road tested this a lot and am confident it works and is a better solution in practical terms in the context of Equinox and F# - definitely not saying that it's the best choice in all contexts.

@alexeyzimarev
Contributor

I merged #78 and realised that when the stream name is provided by the app service, there's no knowledge about the aggregate id in the app service.

Here is the service from a test:

        OnNew<Commands.BookRoom>(
            cmd => GetStreamName(cmd.BookingId),
            (booking, cmd)
                => booking.BookRoom(
                    new BookingId(cmd.BookingId),
                    cmd.RoomId,
                    new StayPeriod(cmd.CheckIn, cmd.CheckOut),
                    cmd.Price
                )
        );

Here, the app service has no id value to use for setting it implicitly in the aggregate state. I would suggest leaving this as it is now. I can add a check of the state Id property when the aggregate instance is available for storing the changes in the app service, and return an error when the Id is not set. It would expose the issue when the id is not explicitly set in the When.
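
The proposed check could look roughly like this. It is a sketch under assumptions, not the code that landed in Eventuous: `BookingSnapshot`, `SaveResult`, and `GuardedStore` are hypothetical names, and the `persist` callback stands in for whatever actually writes to the store.

```csharp
using System;

// Hypothetical state and result types for the sketch.
public record BookingSnapshot(string Id, string RoomId);
public record SaveResult(bool Success, string Error);

public static class GuardedStore {
    // Refuses to persist when the state has no id, so a missing
    // assignment in When surfaces as an explicit error.
    public static SaveResult Save(BookingSnapshot state, Action<string, BookingSnapshot> persist) {
        if (string.IsNullOrWhiteSpace(state.Id))
            return new SaveResult(false,
                "Aggregate state has no Id. Set it in When for the first event.");

        persist($"Booking-{state.Id}", state);
        return new SaveResult(true, "");
    }
}
```

Because the guard runs before `persist` is invoked, nothing is written when the id is missing.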

@alexeyzimarev
Contributor

Another option is to refactor the app service again. Right now, all the On overloads have two versions - one with the stream name function, and the other one with the id function. The alternative would be something like:

    protected void OnExisting<TCommand>(
        GetIdFromCommand<TId, TCommand>      getId,
        ActOnAggregate<TAggregate, TCommand> action,
        GetStreamFromId<TId>                 getStream = null  // takes the id, not the command
    ) where TCommand : class

In this case, the app service can call SetId on the state object when it's instantiated. The getStream would not be using the command data, but the id that is created by the getId function. The drawback here is that you lose access to all the other command properties. However, it can be mitigated by including more properties in the identity record itself.

record PaymentId : AggregateId {
    public PaymentId(string id, string tenant) : base(id) => Tenant = tenant;

    public string Tenant { get; }
}

OnNew<RegisterPayment>(
    cmd => new PaymentId(cmd.PaymentId, cmd.TenantId),
    (payment, cmd)
        => payment.RegisterPayment(
            cmd.BookingId,
            new Money(cmd.AmountPaid, cmd.Currency)
        ),
    id => GetStreamName(id.Value, id.Tenant)
);

@alexeyzimarev
Contributor

Ha, it makes me think that I did too much. If there were a way to define the conversion from the id to the stream name, it would work for all the commands. So, there would be no need to specify the stream name function for each command handler.

@alexeyzimarev
Contributor

    public StoringEventsWithCustomStream() {
        var streamNameMap = new StreamNameMap();
        streamNameMap.Register<Booking, BookingState, BookingId>(GetStreamName);
        Service = new BookingService(AggregateStore, streamNameMap);
        TypeMap.RegisterKnownEventTypes();
    }
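
To illustrate the idea of registering the id-to-stream-name conversion once, here is a minimal sketch of a registry keyed by id type. `IdToStreamMap` and `TenantPaymentId` are hypothetical; this is not the implementation of the StreamNameMap shown above, and throwing on an unregistered id type is a choice made for the example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical id carrying extra routing data.
public record TenantPaymentId(string Value, string Tenant);

public sealed class IdToStreamMap {
    readonly Dictionary<Type, Func<object, string>> _maps = new();

    // Register the conversion once per id type.
    public void Register<TId>(Func<TId, string> toStreamName) where TId : notnull =>
        _maps[typeof(TId)] = id => toStreamName((TId)id);

    // Every command handler for that id type can then reuse it.
    public string GetStreamName<TId>(TId id) where TId : notnull =>
        _maps.TryGetValue(typeof(TId), out var map)
            ? map(id)
            : throw new InvalidOperationException(
                $"No stream name mapping registered for {typeof(TId).Name}");
}
```

With a mapping such as `id => $"Payment-{id.Tenant}-{id.Value}"` registered at startup, individual handlers only need to produce the id.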

@alexeyzimarev
Contributor

> In Cosmos, the cache is pretty much a no-brainer as there's a big difference between paying 1RU to validate your cache has the latest, vs paying 1 RU/K to re-read events or a snapshot (aside from the latency induced by redundant network traffic and deserialization time).

We must clearly separate systems with intensive load, where you run your services continuously, and can use the cache at the cost of the cache being out of sync, and managing, for example, LB session stickiness to increase the cache hit rate. And the other type of system is where you can run in a serverless workload and must give up the cache idea entirely as it just increases complexity without providing any value whatsoever due to low load.

It's also my reservation against pay per use databases in some scenarios as you start getting punished if you do simple things, so you have to make your code more complex just to reduce the database costs. It's really interesting that usage-based pricing is very attractive on low load, where optimisation is unnecessary. When you get to a higher load, you'd benefit from instance-based pricing since it's more predictable, and then you start optimising for performance.

But, getting back to the unnecessary state restore, I have a much simpler way to deal with it without any caching and nodes getting out of sync on the cache, and all that complexity. And, I want to do it outside the basic library scope, as I don't want to introduce these extra bits to people that don't really need it.

@alexeyzimarev
Contributor

So, here's what I made, and some explanation:

> I have to compute the aggregate id in application service handlers,

Yes, but only for OnExisting as we need to load events from the stream

> Then again in the When method in aggregate state

Yes, but only for events that come first in the stream (after OnNew). Often, it's just one event

So, it is not really a repetitive thing. It's one or another, depending on what state you are in (new or existing).

> Also, if I don't set the Id explicit on my state object, at least on the very first event, then my aggregate has no id.

After #79 it won't be able to calculate the stream name from the id if the id is not set. So, it will return ErrorResult. Therefore, you'll get it failing fast, nothing gets stored.

I hope it's good enough.

@bartelink
Contributor

Re the caching aspect, sounds like we are on the same page. Absolutely agree a cache needs to earn its keep in real latency reductions (which are not that easy to achieve against ESDB and/or may not be relevant with a SQL store), and that's not a given (with a lot of nodes, large states and/or a large active dataset, a good hit rate is also harder to attain, esp. considering the cost of adding RAM in various hosting contexts).

Being able to opt in or out of caching as a config thing that does not impact app logic is definitely what you want. There's also a big benefit from not having to traverse optimisation discussions when learning. But for certain workloads, it's critical. They're definitely rare, but the design validation/nudges from seeing where it fits in your design is definitely valuable IME.

alexeyzimarev added a commit that referenced this issue Apr 8, 2022
* Refactor stream name mapping

* Ensure that the new aggregate has an id. Proposed fix for #75
@alexeyzimarev
Contributor

Ok, I want to close this one. I feel that I explained enough here #75 (comment)

Now if you try to persist an aggregate without an id, it will throw. It's a good enough guard rail.

Any improvements in terms of code are of course welcome, but I have to focus on other things.

alexeyzimarev added a commit that referenced this issue Apr 13, 2022
* Refactor stream name mapping
* Ensure that the new aggregate has an id. Proposed fix for #75
* Basic elastic store
* Got the store with archive to work
* Added id to StreamEvent
* Removed version parser as it's too complex
@alexeyzimarev
Contributor

Seems like I am solving it here #111 https://github.com/Eventuous/eventuous/discussions/114
