-
Notifications
You must be signed in to change notification settings - Fork 44
Projecting Events
The events raised by entities are received on clients to update databases. They are delivered via the NServiceBus pipeline so your event handlers will look the exact same as if you were publishing your events to a transport like RabbitMq. This also frees your consumers to write to whichever back end database you want - as Aggregates.NET doesn't care about your storage only delivery.
Here in Handler.cs we are projecting buyer-related events into an elastic index.
public class Handler :
IHandleQueries<Queries.Buyers>,
IHandleMessages<Events.Initiated>,
IHandleMessages<Events.InGoodStanding>,
IHandleMessages<Events.PreferredAddressSet>,
IHandleMessages<Events.PreferredPaymentSet>,
IHandleMessages<Events.Suspended>,
IHandleMessages<Order.Events.Drafted>,
IHandleMessages<Order.Events.Paid>,
IHandleMessages<Order.Events.Canceled>
{
public async Task Handle(Queries.Buyers query, IMessageHandlerContext ctx)
{
...
}
public Task Handle(Events.Initiated e, IMessageHandlerContext ctx)
{
var model = new Models.OrderingBuyerIndex
{
Id = e.UserName,
GivenName = e.GivenName,
GoodStanding = true
};
return ctx.App<Infrastructure.IUnitOfWork>().Add(e.UserName, model);
}
public async Task Handle(Events.InGoodStanding e, IMessageHandlerContext ctx)
{
var buyer = await ctx.App<Infrastructure.IUnitOfWork>().Get<Models.OrderingBuyerIndex>(e.UserName).ConfigureAwait(false);
buyer.GoodStanding = true;
await ctx.App<Infrastructure.IUnitOfWork>().Update(e.UserName, buyer).ConfigureAwait(false);
}
public async Task Handle(Events.PreferredAddressSet e, IMessageHandlerContext ctx)
{
var buyer = await ctx.App<Infrastructure.IUnitOfWork>().Get<Models.OrderingBuyerIndex>(e.UserName).ConfigureAwait(false);
var address = await ctx.App<Infrastructure.IUnitOfWork>().Get<Entities.Address.Models.Address>(e.AddressId)
.ConfigureAwait(false);
buyer.PreferredCity = address.City;
buyer.PreferredState = address.State;
buyer.PreferredZipCode = address.ZipCode;
buyer.PreferredCountry = address.Country;
await ctx.App<Infrastructure.IUnitOfWork>().Update(e.UserName, buyer).ConfigureAwait(false);
}
public async Task Handle(Events.PreferredPaymentSet e, IMessageHandlerContext ctx)
{
var buyer = await ctx.App<Infrastructure.IUnitOfWork>().Get<Models.OrderingBuyerIndex>(e.UserName).ConfigureAwait(false);
var method = await ctx.App<Infrastructure.IUnitOfWork>()
.Get<Entities.PaymentMethod.Models.PaymentMethod>(e.PaymentMethodId).ConfigureAwait(false);
buyer.PreferredPaymentCardholder = method.CardholderName;
buyer.PreferredPaymentMethod = method.CardType;
buyer.PreferredPaymentExpiration = method.Expiration.ToString("MM/yy");
await ctx.App<Infrastructure.IUnitOfWork>().Update(e.UserName, buyer).ConfigureAwait(false);
}
}
Notice how we define our subscription to an event by simply IHandleMessages<Events.Initiated>
. Aggregates.NET will scan for these and automatically create a projection in EventStore to receive these events as part of a subscriber group. We'll receive the events here and as you can see we utilize a generic unit of work to create and update an object based on the events.
We can even load other entities to project their data into our model like so:
public async Task Handle(Events.PreferredAddressSet e, IMessageHandlerContext ctx)
{
var buyer = await ctx.App<Infrastructure.IUnitOfWork>().Get<Models.OrderingBuyerIndex>(e.UserName).ConfigureAwait(false);
var address = await ctx.App<Infrastructure.IUnitOfWork>().Get<Entities.Address.Models.Address>(e.AddressId)
.ConfigureAwait(false);
buyer.PreferredCity = address.City;
buyer.PreferredState = address.State;
buyer.PreferredZipCode = address.ZipCode;
buyer.PreferredCountry = address.Country;
await ctx.App<Infrastructure.IUnitOfWork>().Update(e.UserName, buyer).ConfigureAwait(false);
}
The ability to load Address
above is dependent on the fact that the model exists in elastic
already of course.