Support async? #6
This is very much in our plans, and we have designed our traits so that it will not be too painful to make that transition. It will be a breaking change, but we are prepared for that. I generally am aiming to begin that work once/as std::future is stabilized (rust-lang/rust#59725), as I'd like to make the change only once, and use the new APIs internally (This may require using the compatibility layer to use with the
This stabilization is targeted for 1.35, planned for May 23. We should be able to do some preliminary work against nightly, and then move to beta to validate. I'd love to have a new release with async-first support when that drops on stable.
As for collaboration, I am very happy to have additional input/code contributions. Additional backends, or potential improvements are happily accepted. We are still trying to figure out how best to model and support "reactors", so that part is still a bit experimental, but the core of the event/command work has worked well for us so far. |
@neoeinstein thank you for the feedback. So, if you don't mind collaborating, I'll dig deeper into your design and primitives over the next few days to see how they apply to my cases/needs. I'll share my thoughts and comparisons along with my design/code pieces to discuss possible design improvements (if there is a need). Once all the design questions are resolved and I have a clear vision of how to fit the framework to my projects, I can start working on a nightly async implementation, to deliver it on stable when 1.35 lands in May. Is that OK with you? |
Works for me. |
That sounds great people, thanks for the effort! |
@neoeinstein I'm sorry for the huge delay. Right after my post above I got pulled into some serious issues with my company's legacy projects, so for the last 5-6 weeks I had no opportunity to even touch our Rust-based projects. Now I'm back in the game and able to continue. First, I'll try to give a sense of my solution's design goals and what it looks like. In our project this sub-crate is called
Designing
Now, let's share
Aggregate
Let's start with the basis:
cqrusty::Aggregate
/// [DDD aggregate] that represents an isolated tree of entities, is
/// capable of handling [`Command`]s and is always kept in a consistent state.
///
/// [DDD aggregate]: https://martinfowler.com/bliki/DDD_Aggregate.html
pub trait Aggregate {
/// Type of [`Aggregate`]'s unique identifier (ID).
type Id: Clone; // can be even Copy, but for generic code Clone is more than enough
/// Bootstraps new [`Aggregate`] with clear initial state.
///
/// It's unlikely to store or to operate with an initial state
/// of an [`Aggregate`], as it may not satisfy uniqueness constraint.
/// In case [`Aggregate`] is [`VersionedAggregate`] and [`EventSourced`]
/// we assume that [`Aggregate`] exists if at least one [`Event`] exists
/// for it, so [`Aggregate`] is usable and distinguishable only after
/// at least one [`Event`] is applied to its initial state.
fn initial_state() -> Self;
/// Returns unique ID of this [`Aggregate`].
fn id(&self) -> &Self::Id;
}
cqrusty::VersionedAggregate
/// [`Aggregate`] whose state is versioned.
///
/// Usually, [`VersionedAggregate`] implements [`EventSourced`], so that its
/// actual state can be calculated by applying its [`Event`]s (united into
/// an [`AggregateEvent`] type).
pub trait VersionedAggregate: Aggregate {
/// Type of [`Aggregate`]'s version.
///
/// `Version` reflects the version of [`Aggregate`]'s current state.
/// It's recommended for `Version` to be [`Ord`] for the ability
/// to distinguish between different state versions correctly and
/// to apply [`Event`]s in the correct order as they happened.
type Version: Clone;
/// Returns the current version of this [`Aggregate`].
fn version(&self) -> &Self::Version;
/// Sets the current version of this [`Aggregate`].
fn set_version(&mut self, version: &Self::Version);
}
cqrusty::EventSourced
/// State that can be calculated by applying specified [`Event`].
pub trait EventSourced<E: Event> {
/// Applies given [`Event`] to the current state.
fn apply_event(&mut self, event: &E);
}
This separation is introduced to let the user choose the desired guarantees (not every CQRS-based application is event-sourced, so a framework user should have an option to not use
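A minimal sketch of how the three traits above might be implemented for one type. The traits are copied here in simplified form so the snippet compiles on its own; `Event` is reduced to a marker trait, and `Account`, `Deposited` and `replay` are hypothetical names that do not come from the thread.

```rust
// Simplified copies of the traits quoted above (assumption: `Event` is
// reduced to a marker trait so the sketch stays self-contained).
pub trait Aggregate {
    type Id: Clone;
    fn initial_state() -> Self;
    fn id(&self) -> &Self::Id;
}

pub trait VersionedAggregate: Aggregate {
    type Version: Clone;
    fn version(&self) -> &Self::Version;
    fn set_version(&mut self, version: &Self::Version);
}

pub trait Event {}

pub trait EventSourced<E: Event> {
    fn apply_event(&mut self, event: &E);
}

// Hypothetical domain types used only for illustration.
#[derive(Debug, Clone, PartialEq)]
pub struct Account {
    pub id: u64,
    pub version: u32,
    pub balance: i64,
}

pub struct Deposited {
    pub account_id: u64,
    pub amount: i64,
}

impl Event for Deposited {}

impl Aggregate for Account {
    type Id = u64;

    fn initial_state() -> Self {
        Account { id: 0, version: 0, balance: 0 }
    }

    fn id(&self) -> &u64 {
        &self.id
    }
}

impl VersionedAggregate for Account {
    type Version = u32;

    fn version(&self) -> &u32 {
        &self.version
    }

    fn set_version(&mut self, version: &u32) {
        self.version = *version;
    }
}

impl EventSourced<Deposited> for Account {
    fn apply_event(&mut self, event: &Deposited) {
        self.id = event.account_id;
        self.balance += event.amount;
    }
}

/// Rebuilds an account by folding events over the initial state,
/// bumping the version once per applied event.
pub fn replay(events: &[Deposited]) -> Account {
    let mut account = Account::initial_state();
    for event in events {
        account.apply_event(event);
        let next = *account.version() + 1;
        account.set_version(&next);
    }
    account
}
```

A type that is not event-sourced could implement only `Aggregate` (and optionally `VersionedAggregate`) and skip `EventSourced` entirely, which is the kind of choice the separation is meant to allow.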
Event
Again, the design of events is similar in many parts. However, I don't really like the current
cqrusty::Event
/// [Event Sourcing] event that describes something that has occurred in
/// the application (happened fact).
///
/// The sequence of [`Event`]s may represent a concrete versioned state
/// of an [`Aggregate`]. The state is calculated by implementing
/// [`EventSourced`] for the desired [`VersionedAggregate`] (or any other
/// stateful entity).
///
/// [Event Sourcing]: https://martinfowler.com/eaaDev/EventSourcing.html
pub trait Event {
/// Type of [`Event`]'s unique identifier (ID).
type Id: EventId;
/// Returns string representation of [`Event`]'s type.
fn event_type(&self) -> &'static str;
/// Returns [`Event`]'s version.
///
/// A single type of [`Event`] may have different versions, which allows
/// evolving an [`Event`] within its type. To avoid dealing with multiple
/// types of the same [`Event`], it's recommended that the latest version
/// of the [`Event`] implement [`From`] for its previous versions, so they
/// can be automatically transformed into the latest version of
/// the [`Event`].
fn event_version(&self) -> &'static EventVersion;
}
cqrusty::EventId
/// Types allowed to be used as [`Event`] unique identifier.
pub trait EventId {
/// Generates new unique ID of [`Event`].
fn new_event_id() -> Self;
}
cqrusty::AggregateEvent
/// [Sum type][1] of all [`Event`]s belonging to some [`Aggregate`].
///
/// As the set of [`Event`] types in an application is finite, this allows
/// avoiding dynamic dispatch costs when dealing with different types of
/// [`Event`] (if implemented as a dispatchable `enum`).
///
/// For convenience, this type should also implement the [`From`] trait
/// for all underlying [`Event`]s.
///
/// [1]: https://en.wikipedia.org/wiki/Algebraic_data_type
pub trait AggregateEvent: Event {}
cqrusty::EventMessage
/// Message that wraps an [`Event`] and makes it sendable and storable.
#[derive(Clone, Debug, PartialEq)]
pub struct EventMessage<E: Event, M> {
/// Unique ID of this [`Event`].
pub id: E::Id,
/// Data of this [`Event`].
///
/// To deal with heterogeneous [`EventMessage`]s use an [`AggregateEvent`]
/// implementation here.
pub data: E,
/// Metadata of this [`Event`].
///
/// Note, that to make `meta` look like `"meta":{}` in JSON,
/// consider the [`serde_json` data formats](https://serde.rs/json.html)
/// and use empty struct _with braces_, as anything other (using empty
/// struct without braces or just `()`) will result in `"meta":null`.
pub meta: M,
}
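As an illustration of the sum-type idea, here is a small sketch of an `AggregateEvent` implemented as an `enum` with `From` conversions, wrapped into an `EventMessage`. The traits are trimmed copies of the ones above (`event_version` is left out), and `SeqId`, `UserCreated`, `UserRenamed`, `UserEvent` and `wrap` are hypothetical names; the counter-based ID is only an assumption made to avoid external crates.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Trimmed copies of the traits quoted above (assumption: `event_version`
// is omitted to keep the sketch short).
pub trait EventId {
    fn new_event_id() -> Self;
}

pub trait Event {
    type Id: EventId;
    fn event_type(&self) -> &'static str;
}

pub trait AggregateEvent: Event {}

pub struct EventMessage<E: Event, M> {
    pub id: E::Id,
    pub data: E,
    pub meta: M,
}

// Hypothetical ID type backed by a process-wide counter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SeqId(pub u64);

static NEXT_ID: AtomicU64 = AtomicU64::new(1);

impl EventId for SeqId {
    fn new_event_id() -> Self {
        SeqId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
    }
}

// Hypothetical concrete events of one aggregate.
#[derive(Debug, Clone, PartialEq)]
pub struct UserCreated {
    pub name: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct UserRenamed {
    pub new_name: String,
}

impl Event for UserCreated {
    type Id = SeqId;
    fn event_type(&self) -> &'static str {
        "user.created"
    }
}

impl Event for UserRenamed {
    type Id = SeqId;
    fn event_type(&self) -> &'static str {
        "user.renamed"
    }
}

/// Sum type of all events of the aggregate; matching on it is static dispatch.
#[derive(Debug, Clone, PartialEq)]
pub enum UserEvent {
    Created(UserCreated),
    Renamed(UserRenamed),
}

impl Event for UserEvent {
    type Id = SeqId;
    fn event_type(&self) -> &'static str {
        match self {
            UserEvent::Created(e) => e.event_type(),
            UserEvent::Renamed(e) => e.event_type(),
        }
    }
}

impl AggregateEvent for UserEvent {}

impl From<UserCreated> for UserEvent {
    fn from(e: UserCreated) -> Self {
        UserEvent::Created(e)
    }
}

impl From<UserRenamed> for UserEvent {
    fn from(e: UserRenamed) -> Self {
        UserEvent::Renamed(e)
    }
}

/// Wraps any concrete event into a message carrying the sum type.
pub fn wrap<E: Into<UserEvent>>(event: E) -> EventMessage<UserEvent, ()> {
    EventMessage {
        id: SeqId::new_event_id(),
        data: event.into(),
        meta: (),
    }
}
```

With the `From` impls in place, code that produces a concrete event can hand it to `wrap` without naming the enum variant.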
Command
cqrusty::Command
/// [CQRS] command that describes an intent to change the [`Aggregate`]'s state.
///
/// A state change within an application starts with a [`Command`].
/// A [`Command`] is a combination of expressed intent (which describes what
/// you want to do) as well as the information required to undertake action
/// based on that intent. The [`CommandHandler`] is used to process the incoming
/// [`Command`] for some [`Aggregate`], to validate it and to define the outcome
/// (an [`Event`], usually).
///
/// [CQRS]: https://martinfowler.com/bliki/CQRS.html
pub trait Command {
/// Type of [`Aggregate`] that this [`Command`] should be handled for.
type Aggregate: Aggregate;
/// Returns ID of the [`Aggregate`] that this [`Command`] is issued for.
/// If `None` is returned then this [`Command`] is handled by newly
/// created [`Aggregate`] instantiated with [`Aggregate::initial_state`].
fn aggregate_id(&self) -> Option<&<Self::Aggregate as Aggregate>::Id>;
}
cqrusty::CommandHandler
/// Handler of a specific [`Command`] that processes it for its [`Aggregate`].
pub trait CommandHandler<C: Command> {
/// Type of context required by this [`CommandHandler`] for performing
/// operation.
type Context: ?Sized;
/// Type of the value that this [`CommandHandler`] will return.
type Result;
/// Handles and processes given [`Command`] for its [`Aggregate`].
fn handle_command(&self, cmd: &C, ctx: Box<Self::Context>) -> Self::Result;
// TODO: Pass context as &mut Pin<Self::Context> when Futures will support
// passing references via Pin API.
}
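A rough sketch of how a command and its handler could look with these traits, using a plain synchronous `Result` as the handler's result type. The traits are copied from above; `Cart`, `AddItem`, `ItemAdded`, `CartError` and `Limits` are hypothetical and exist only for this example.

```rust
// Copies of the traits quoted above, so the sketch compiles on its own.
pub trait Aggregate {
    type Id: Clone;
    fn initial_state() -> Self;
    fn id(&self) -> &Self::Id;
}

pub trait Command {
    type Aggregate: Aggregate;
    fn aggregate_id(&self) -> Option<&<Self::Aggregate as Aggregate>::Id>;
}

pub trait CommandHandler<C: Command> {
    type Context: ?Sized;
    type Result;
    fn handle_command(&self, cmd: &C, ctx: Box<Self::Context>) -> Self::Result;
}

// Hypothetical aggregate.
pub struct Cart {
    pub id: u32,
    pub items: Vec<String>,
}

impl Aggregate for Cart {
    type Id = u32;

    fn initial_state() -> Self {
        Cart { id: 0, items: Vec::new() }
    }

    fn id(&self) -> &u32 {
        &self.id
    }
}

// Hypothetical command: the intent plus the data needed to act on it.
pub struct AddItem {
    pub cart_id: u32,
    pub item: String,
}

impl Command for AddItem {
    type Aggregate = Cart;

    fn aggregate_id(&self) -> Option<&u32> {
        Some(&self.cart_id)
    }
}

// Hypothetical outcome and error types.
#[derive(Debug, PartialEq)]
pub struct ItemAdded {
    pub item: String,
}

#[derive(Debug, PartialEq)]
pub enum CartError {
    EmptyItemName,
    CartFull,
}

// Hypothetical context handed to the handler by the caller.
pub struct Limits {
    pub max_items: usize,
}

impl CommandHandler<AddItem> for Cart {
    type Context = Limits;
    type Result = Result<ItemAdded, CartError>;

    // Validates the command against the current state and returns the
    // resulting event; the state itself is not mutated here.
    fn handle_command(&self, cmd: &AddItem, ctx: Box<Limits>) -> Self::Result {
        if cmd.item.is_empty() {
            return Err(CartError::EmptyItemName);
        }
        if self.items.len() >= ctx.max_items {
            return Err(CartError::CartFull);
        }
        Ok(ItemAdded { item: cmd.item.clone() })
    }
}
```

Because `Result` is an associated type, the same trait could in principle return a future instead of a `Result`; this sketch only shows the synchronous case.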
Store
cqrusty::AggregateRepository
/// Generic repository of [`Aggregate`] as an abstraction of its storage.
///
/// While the specified trait definition is very limited for [CRUD] operations,
/// it can be easily extended for the purposes of specific domain.
///
/// [CRUD]: https://en.wikipedia.org/wiki/Create,_read,_update_and_delete
pub trait AggregateRepository<A: Aggregate> {
/// Type of errors returned by [`AggregateRepository`].
type Error;
/// Loads and returns the [`Aggregate`] with the given unique `id`.
///
/// No version checks are done when loading from [`AggregateRepository`]
/// and the latest stored version of [`Aggregate`] will be returned.
fn load(&self, id: &A::Id) -> DynFuture<Option<A>, Self::Error>;
/// Stores the [`Aggregate`] of given version.
///
/// It's no-op but succeeds, if [`AggregateRepository`] already contains
/// this [`Aggregate`] of higher-or-equal version.
fn store(&self, aggregate: &A) -> DynFuture<(), Self::Error>;
}
cqrusty::EventStore
/// Storage of all [`Event`]s belonging to specified [`Aggregate`].
pub trait EventStore<A: VersionedAggregate> {
/// Sum type of all [`Event`]s belonging to the [`Aggregate`].
type Event: AggregateEvent;
/// Type of [`EventMessage::meta`] that this [`EventStore`] operates with.
type EventMeta;
/// Type of errors returned by [`EventStore`].
type Error;
/// Stores given [`Event`] in [`EventStore`].
fn store_event(
&self,
event: &EventMessage<Self::Event, Self::EventMeta>,
) -> DynFuture<(), Self::Error>;
// TODO: store multiple events?
/// Reads all stored [`Event`]s of specified [`Aggregate`].
///
/// The returned `DynStream` is finite and ends with the last stored
/// [`Event`] of the [`Aggregate`].
fn read_events(
&self,
aggregate_id: &A::Id,
from_version: Option<&A::Version>,
) -> DynStream<EventMessage<Self::Event, Self::EventMeta>, Self::Error>;
}
Here is the part where I'm quite impressed with your simple and elegant design choices. All the
Additional aspect worth mentioning:
cqrusty::UnitOfWork
/// [Unit of Work] which encompasses multiple operations in a single unit.
/// The purpose of [`UnitOfWork`] is to coordinate actions performed during
/// the processing of a message ([`Command`], [`Event`] or query).
///
/// It's unlikely to need direct access to [`UnitOfWork`]. It is mainly used
/// by CQRS framework building blocks. However, it's vital and required for
/// custom implementations of [`CommandGateway`].
///
/// # Ideology note
///
/// > Note, that in its idea the [`UnitOfWork`] is merely a buffer of changes,
/// > not a replacement for database transactions. Although all staged changes
/// > are only committed when the [`UnitOfWork`] is committed, its commit is not
/// > atomic. That means that when a commit fails, some changes might have been
/// > persisted, while others are not. Best practices dictate that a command
/// > should never contain more than one action. If you stick to that practice,
/// > a [`UnitOfWork`] will contain a single action, making it safe to use
/// > as-is. If you have more actions in your [`UnitOfWork`], then you could
/// > consider attaching a database transaction to the [`UnitOfWork`]'s commit.
///
/// While the above is ideologically correct, the current version of CQRS
/// framework does not support that. At the moment the [`UnitOfWork`] is always
/// backed by database transaction, which is required for ensuring strong
/// consistency. This is likely to be changed in future.
///
/// [Unit of Work]: https://martinfowler.com/eaaCatalog/unitOfWork.html
pub trait UnitOfWork {
/// Context that is passed to inner closure wrapped into the [`UnitOfWork`].
type Context;
/// Error of [`UnitOfWork`] creation or committing.
type Error;
/// Wraps provided closure into a [Unit of Work] which commits once the
/// returned `Future` resolves successfully or rolls back otherwise.
fn unit_of_work<W, R>(
&mut self,
work: W,
) -> DynFuture<R::Item, UnitOfWorkError<Self::Error, R::Error>>
where
W: FnOnce(Self::Context) -> R + 'static,
// TODO: Use &mut Self::Context here when Futures will support
// reference passing, or GATs will be landed.
// Currently, due to futures 0.1 we're blocked with FnOnce,
// which disallows referencing out of the closure:
// https://stackoverflow.com/questions/31403723
// /how-to-declare-a-lifetime-for-a-closure-argument
R: IntoFuture + 'static; // TODO: remove 'static when futures allow
}
Flow
Here is the actual and interesting part: how all the things above interact with each other. What I've understood from
So, the actual "framework" logic happens in
In
Docs cite
//! 1. To change state of application a `Command` should be issued, which would
//! be routed to its `Aggregate` automatically by framework.
//! - `Command` is handled by its `Aggregate` which mutates its state
//! respectively, or just produces `Event`s which are later automatically
//! applied to the `Aggregate`'s state.
//! - `Event` may trigger another `Command`s issuing, which will be
//! processed in the same way.
//! - Each `Command` handling is processed inside single `UnitOfWork` to
//! ensure atomicity of applied changes. Handling multiple `Command`s in a
//! consistent way (if required) may be done via `Saga` usage.
//! - Issued `Command` may be validated and intercepted/processed by other
//! `Aggregate`s via `CommandValidators` and `CommandInterceptor`s.
//! 2. To retrieve application's data a `Query` should be used. `Query`ing data
//! is always a stateless act and cannot mutate application state in any way.
//! 3. Every part of framework is highly abstracted and can be re-implemented
//! in the desired manner (for example, `CommandBus` may be in-memory static
//! structure or remote [Apache Kafka] server).
cqrusty::CommandGateway
/// Dispatcher of a specific [`Command`] to the [`CommandHandler`] of a concrete
/// instance of its [`Aggregate`].
///
/// This trait is intended to be an entry point for a [`Command`] in a domain
/// layer of application. Other application layers should send a [`Command`] via
/// [`CommandGateway`].
pub trait CommandGateway<C: Command> {
/// Type of this [`Command`]'s processing result.
type Result;
/// Dispatches the given [`Command`] to the [`CommandHandler`] of a concrete
/// instance of its [`Aggregate`] and returns the processing result.
fn command(&self, cmd: C) -> Self::Result;
}
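The flow described in the docs above (a command enters through the gateway, is handled against the current aggregate state, and the resulting event is stored and applied) can be sketched roughly as follows. This is a synchronous, in-memory approximation under several assumptions: the `C: Command` bound is dropped from the gateway trait, there is no `UnitOfWork`, and `TodoList`, `TodoEvent`, `Action`, `TodoCommand` and `InMemoryGateway` are hypothetical names.

```rust
use std::cell::RefCell;
use std::collections::HashMap;

/// Simplified gateway trait (assumption: the `C: Command` bound is dropped).
pub trait CommandGateway<C> {
    type Result;
    fn command(&self, cmd: C) -> Self::Result;
}

#[derive(Debug, Clone, PartialEq)]
pub enum TodoEvent {
    Added(String),
    Completed(usize),
}

#[derive(Debug, Default, PartialEq)]
pub struct TodoList {
    pub open: Vec<String>,
    pub done: Vec<String>,
}

pub enum Action {
    Add(String),
    Complete(usize),
}

pub struct TodoCommand {
    pub list_id: u32,
    pub action: Action,
}

impl TodoList {
    /// Applies an already validated event to the state.
    pub fn apply_event(&mut self, event: &TodoEvent) {
        match event {
            TodoEvent::Added(title) => self.open.push(title.clone()),
            TodoEvent::Completed(index) => {
                let title = self.open.remove(*index);
                self.done.push(title);
            }
        }
    }

    /// Validates an action against the current state and produces an event.
    pub fn handle(&self, action: &Action) -> Result<TodoEvent, String> {
        match action {
            Action::Add(title) if title.is_empty() => Err("empty title".to_string()),
            Action::Add(title) => Ok(TodoEvent::Added(title.clone())),
            Action::Complete(i) if *i >= self.open.len() => Err("no such item".to_string()),
            Action::Complete(i) => Ok(TodoEvent::Completed(*i)),
        }
    }
}

/// Gateway that keeps one event log per aggregate ID in memory.
pub struct InMemoryGateway {
    log: RefCell<HashMap<u32, Vec<TodoEvent>>>,
}

impl InMemoryGateway {
    pub fn new() -> Self {
        InMemoryGateway { log: RefCell::new(HashMap::new()) }
    }

    /// Rebuilds the aggregate state by replaying its stored events.
    pub fn load(&self, list_id: u32) -> TodoList {
        let log = self.log.borrow();
        let mut state = TodoList::default();
        for event in log.get(&list_id).into_iter().flatten() {
            state.apply_event(event);
        }
        state
    }
}

impl CommandGateway<TodoCommand> for InMemoryGateway {
    type Result = Result<TodoList, String>;

    fn command(&self, cmd: TodoCommand) -> Self::Result {
        let mut state = self.load(cmd.list_id); // 1. load current state
        let event = state.handle(&cmd.action)?; // 2. handle the command
        state.apply_event(&event); // 3. apply the produced event
        self.log.borrow_mut().entry(cmd.list_id).or_default().push(event); // 4. store it
        Ok(state)
    }
}
```

A rejected command leaves the log untouched, so reloading the aggregate afterwards yields the same state as before the attempt.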
Future plans
Future plans for
Phew... this took quite a while 🙃 @neoeinstein sorry for the bad English (not my native language). I'm looking for some constructive feedback from you. What do you think about all of the above? In the next post I'll be more specific and describe the concrete requirements our projects have for switching to |
@neoeinstein ping |
First off, sorry for taking such a long time to get back to this. On to the merits. Note, your English is perfectly fine, nothing to be self-conscious of there.
Aggregate
I'm open to separating these two. Having an
That was also my thought on seeing your
I would like to change
Command
I like this, but see below.
Our general philosophy with Commands is that to be a true command, it needs to be self-contained, without side effects. If there is a need to access a database or external resource (or even collect the current time), that should be resolved at the layer above, and then the result of those operations pulled into the
Store
As designed, this set of libraries has the concept of optimistic concurrency in mind. Thinking about how to frame a
Flow
Yeah, we were trying something experimental here, but that will probably get ripped out and replaced. The main thing here is providing a way for some process to follow an event stream, reacting to the events, keeping quick-to-read snapshots up to date offline, or building new projections. This area needs some iteration.
Summary
Again, I apologize that I haven't been present to handle this as I'd have liked. I'll start taking some of the steps mentioned above as PRs, and I'll include you if you'd like to be kept up to date. |
@neoeinstein thanks for the feedback! It's OK about delay. I've started to experiment with all this in
Actually, with recent use we've found it OK not being generic. The only change I've made is extending its to
Hmmm... from what I've seen out there in examples and CQRS frameworks the
For example, we have two situations:
Yup, generally CQRS/ES is used with optimistic concurrency and eventual consistency in mind. Our case is a bit simplified, as we're using CQRS/ES with strong consistency in the manner described here, but with the aim to "jump onto" eventual consistency if/when a performance bottleneck appears there. So, my vision of a CQRS/ES framework is to provide a set of well-designed core primitives and some ready-made "lego" parts of a framework implementation. That way, a library user can either take the high-level abstractions and just go with them, or reuse the core primitives and some parts to build their own flow model, without being restricted to only strong consistency or only eventual consistency, to using event sourcing at all, or to dispatching commands inside a single process only. |
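For readers unfamiliar with the term, optimistic concurrency on an event store usually means that an append carries the version the writer expects and is rejected if the stream has moved on in the meantime. The sketch below shows that check with an in-memory store; it is not the API of either library discussed in this thread, and `InMemoryEventStore`, `AppendError` and the string-typed events are assumptions made for brevity.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum AppendError {
    /// The stream was at `actual` while the writer expected `expected`.
    VersionConflict { expected: u64, actual: u64 },
}

/// Hypothetical in-memory store: one ordered list of events per aggregate ID.
#[derive(Default)]
pub struct InMemoryEventStore {
    streams: HashMap<String, Vec<String>>,
}

impl InMemoryEventStore {
    /// The version of a stream is the number of events stored in it.
    pub fn current_version(&self, aggregate_id: &str) -> u64 {
        self.streams
            .get(aggregate_id)
            .map_or(0, |events| events.len() as u64)
    }

    /// Appends an event only if the stream is still at the expected version.
    pub fn append(
        &mut self,
        aggregate_id: &str,
        expected: u64,
        event: &str,
    ) -> Result<u64, AppendError> {
        let actual = self.current_version(aggregate_id);
        if actual != expected {
            return Err(AppendError::VersionConflict { expected, actual });
        }
        let stream = self.streams.entry(aggregate_id.to_string()).or_default();
        stream.push(event.to_string());
        Ok(stream.len() as u64)
    }

    /// Reads the events stored after `from_version`.
    pub fn read_events(&self, aggregate_id: &str, from_version: u64) -> Vec<String> {
        self.streams
            .get(aggregate_id)
            .map(|events| events.iter().skip(from_version as usize).cloned().collect())
            .unwrap_or_default()
    }
}
```

A writer that receives `VersionConflict` would typically reload the aggregate, re-run the command against the fresh state and retry, whereas the strongly consistent setup described above relies on a database transaction instead.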
Hello there! Thank you for your efforts and sharing this ❤️
I'm doing very similar work at the moment, but planning to mature it in a closed-source code base before releasing/sharing anything. Your work is quite interesting to me, as it decomposes things in a different way. The main difference is that my solution supports both sync and async by abstracting over result types. However, in your solution there is a strict Result everywhere, and the example apps use sync IO (r2d2, iron, postgres).
Actually, I'd like to collaborate with you to focus all the effort on a single The Ecosystem Framework, rather than spreading it across half-solutions. However, my projects are heavily async-based, so I cannot "just switch" to your framework at the moment.
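One way to read "abstracting over result types" is a handler trait whose return type is an associated type, so one implementation can return a plain `Result` and another a boxed future. The sketch below illustrates that idea using today's `std::future` and `async` syntax, which was not yet stable when this thread was written; `Handler`, `SyncDoubler`, `AsyncDoubler`, `BoxFuture` and `poll_once` are hypothetical names, not the poster's actual design.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Handler whose result type is chosen by each implementation.
pub trait Handler<C> {
    type Result;
    fn handle(&self, cmd: C) -> Self::Result;
}

/// Synchronous implementation: the result is available immediately.
pub struct SyncDoubler;

impl Handler<u32> for SyncDoubler {
    type Result = Result<u32, String>;

    fn handle(&self, cmd: u32) -> Self::Result {
        cmd.checked_mul(2).ok_or_else(|| "overflow".to_string())
    }
}

pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T>>>;

/// Asynchronous implementation: the same trait, but the result is a future.
pub struct AsyncDoubler;

impl Handler<u32> for AsyncDoubler {
    type Result = BoxFuture<Result<u32, String>>;

    fn handle(&self, cmd: u32) -> Self::Result {
        Box::pin(async move { SyncDoubler.handle(cmd) })
    }
}

// Waker that does nothing; sufficient for polling futures that never
// actually wait on anything.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once and returns its output if it is already ready.
/// A real application would hand the future to an executor instead.
pub fn poll_once<T>(mut fut: BoxFuture<T>) -> Option<T> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}
```

The `CommandHandler` and `CommandGateway` traits quoted earlier in the thread follow the same pattern with their `type Result`, which is what leaves room for both sync and async backends.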
Do you have any plans to support async? Are there any designs figured out for that? Can I help somehow with this?