diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/core/aggregates/AggregateStore.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/core/aggregates/AggregateStore.java index 4b7e6c39..5dcbe285 100644 --- a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/core/aggregates/AggregateStore.java +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/core/aggregates/AggregateStore.java @@ -28,7 +28,7 @@ public AggregateStore( this.getEmpty = getEmpty; } - Optional get(Id id) { + public Optional get(Id id) { var streamId = mapToStreamId.apply(id); var events = getEvents(streamId); diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/core/events/EventBus.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/core/events/EventBus.java index b8b3034f..d23cb3de 100644 --- a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/core/events/EventBus.java +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/core/events/EventBus.java @@ -5,7 +5,7 @@ import java.util.function.Consumer; public interface EventBus { - EventStore.AppendResult publish(Event command); + EventStore.AppendResult publish(Event event); void subscribe(Consumer>... handlers); } diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/Order.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/Order.java new file mode 100644 index 00000000..a7bd1405 --- /dev/null +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/Order.java @@ -0,0 +1,98 @@ +package io.eventdriven.distributedprocesses.ecommerce.orders; + +import io.eventdriven.distributedprocesses.core.aggregates.AbstractAggregate; +import io.eventdriven.distributedprocesses.ecommerce.orders.OrderEvent.OrderCancelled; +import io.eventdriven.distributedprocesses.ecommerce.orders.OrderEvent.OrderCompleted; +import io.eventdriven.distributedprocesses.ecommerce.orders.OrderEvent.OrderInitialized; +import io.eventdriven.distributedprocesses.ecommerce.orders.OrderEvent.OrderPaymentRecorded; +import io.eventdriven.distributedprocesses.ecommerce.orders.products.PricedProductItem; + +import java.time.OffsetDateTime; +import java.util.UUID; + +public class Order extends AbstractAggregate { + public enum Status { + Opened, + Paid, + Completed, + Cancelled + } + + private UUID clientId; + + private PricedProductItem[] productItems; + + private double totalPrice; + + private Status status; + + private UUID paymentId; + + public static Order initialize( + UUID orderId, + UUID clientId, + PricedProductItem[] productItems, + double totalPrice, + OffsetDateTime now + ) { + return new Order( + orderId, + clientId, + productItems, + totalPrice, + now + ); + } + + private Order(UUID id, UUID clientId, PricedProductItem[] productItems, double totalPrice, OffsetDateTime now) { + enqueue(new OrderInitialized( + id, + clientId, + productItems, + totalPrice, + now + )); + } + + public void recordPayment(UUID paymentId, OffsetDateTime recordedAt) { + enqueue(new OrderPaymentRecorded( + id, + paymentId, + productItems, + totalPrice, + recordedAt + )); + } + + public void complete(OffsetDateTime now) { + if (status != Status.Paid) + throw new IllegalStateException("Cannot complete a not paid order."); + + enqueue(new OrderCompleted(id, now)); + } + + public void cancel(OrderCancellationReason cancellationReason, OffsetDateTime now) { + if (status == Status.Opened || status == Status.Cancelled) + throw new IllegalStateException("Cannot cancel a closed order."); + + enqueue(new OrderCancelled(id, paymentId, cancellationReason, now)); + } + + @Override + public void when(OrderEvent event) { + switch (event) { + case OrderInitialized orderInitialized -> { + id = orderInitialized.orderId(); + clientId = orderInitialized.clientId(); + productItems = orderInitialized.productItems(); + status = Status.Opened; + } + case OrderPaymentRecorded paymentRecorded -> { + paymentId = paymentRecorded.paymentId(); + status = Status.Paid; + } + case OrderCompleted completed -> status = Status.Completed; + case OrderCancelled cancelled -> status = Status.Cancelled; + } + } +} diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/OrderCommand.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/OrderCommand.java index 453e2839..a6078551 100644 --- a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/OrderCommand.java +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/OrderCommand.java @@ -2,6 +2,7 @@ import io.eventdriven.distributedprocesses.ecommerce.shoppingcarts.productitems.PricedProductItem; +import java.time.OffsetDateTime; import java.util.UUID; public sealed interface OrderCommand { @@ -12,6 +13,12 @@ record InitializeOrder( double TotalPrice) implements OrderCommand { } + record RecordOrderPayment( + UUID OrderId, + UUID PaymentId, + OffsetDateTime PaymentRecordedAt) implements OrderCommand { + } + record CompleteOrder( UUID OrderId) implements OrderCommand { } diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/OrderEvent.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/OrderEvent.java index d9ba77a7..61d53683 100644 --- a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/OrderEvent.java +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/OrderEvent.java @@ -1,6 +1,7 @@ package io.eventdriven.distributedprocesses.ecommerce.orders; -import io.eventdriven.distributedprocesses.ecommerce.shoppingcarts.productitems.PricedProductItem; +import io.eventdriven.distributedprocesses.ecommerce.orders.products.PricedProductItem; +import org.springframework.lang.Nullable; import java.time.OffsetDateTime; import java.util.UUID; @@ -10,11 +11,20 @@ record OrderInitialized( UUID orderId, UUID clientId, PricedProductItem[] productItems, - Double totalPrice, + double totalPrice, OffsetDateTime initializedAt ) implements OrderEvent { } + record OrderPaymentRecorded( + UUID orderId, + UUID paymentId, + PricedProductItem[] productItems, + double amount, + OffsetDateTime paymentRecordedAt + ) implements OrderEvent { + } + record OrderCompleted( UUID orderId, OffsetDateTime completedAt @@ -23,6 +33,7 @@ record OrderCompleted( record OrderCancelled( UUID OrderId, + @Nullable UUID paymentId, OrderCancellationReason Reason, OffsetDateTime cancelledAt diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/OrderSaga.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/OrderSaga.java new file mode 100644 index 00000000..16e2330e --- /dev/null +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/OrderSaga.java @@ -0,0 +1,96 @@ +package io.eventdriven.distributedprocesses.ecommerce.orders; + +import io.eventdriven.distributedprocesses.core.commands.CommandBus; +import io.eventdriven.distributedprocesses.ecommerce.payments.DiscardReason; +import io.eventdriven.distributedprocesses.ecommerce.payments.PaymentCommand; +import io.eventdriven.distributedprocesses.ecommerce.payments.external.PaymentExternalEvent; +import io.eventdriven.distributedprocesses.ecommerce.shipments.ProductItem; +import io.eventdriven.distributedprocesses.ecommerce.shipments.ShipmentCommand; +import io.eventdriven.distributedprocesses.ecommerce.shipments.ShipmentEvent; +import io.eventdriven.distributedprocesses.ecommerce.shoppingcarts.external.ShoppingCartFinalized; + +import java.util.Arrays; +import java.util.UUID; + +import static io.eventdriven.distributedprocesses.ecommerce.orders.OrderCommand.*; +import static io.eventdriven.distributedprocesses.ecommerce.orders.OrderEvent.*; + +public class OrderSaga { + private final CommandBus commandBus; + + public OrderSaga(CommandBus commandBus) { + this.commandBus = commandBus; + } + + // Happy path + public void on(ShoppingCartFinalized event) { + commandBus.send( + new OrderCommand.InitializeOrder( + event.cartId(), + event.clientId(), + event.productItems(), + event.totalPrice() + ) + ); + } + + public void on(OrderInitialized event) { + commandBus.send( + new PaymentCommand.RequestPayment( + UUID.randomUUID(), + event.orderId(), event.totalPrice() + ) + ); + } + + public void on(PaymentExternalEvent.PaymentFinalized event) { + commandBus.send( + new RecordOrderPayment( + event.orderId(), + event.paymentId(), + event.finalizedAt() + ) + ); + } + + public void on(OrderPaymentRecorded event) { + commandBus.send( + new ShipmentCommand.SendPackage( + event.orderId(), + Arrays.stream(event.productItems()) + .map(pi -> new ProductItem(pi.productId(), pi.quantity())) + .toArray(ProductItem[]::new) + ) + ); + } + + public void on(ShipmentEvent.PackageWasSent event) { + commandBus.send( + new CompleteOrder( + event.orderId() + ) + ); + } + + // Compensation + public void on(ShipmentEvent.ProductWasOutOfStock event) { + commandBus.send( + new CancelOrder( + event.orderId(), + OrderCancellationReason.ProductWasOutOfStock + ) + ); + } + + public void on(OrderCancelled event) { + if (event.paymentId() == null) { + return; + } + commandBus.send( + new PaymentCommand.DiscardPayment( + event.paymentId(), + DiscardReason.OrderCancelled + ) + ); + } +} diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/OrderService.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/OrderService.java new file mode 100644 index 00000000..d98cedc7 --- /dev/null +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/OrderService.java @@ -0,0 +1,4 @@ +package io.eventdriven.distributedprocesses.ecommerce.orders; + +public class OrderService { +} diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/products/PricedProductItem.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/products/PricedProductItem.java new file mode 100644 index 00000000..490e6f5d --- /dev/null +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/orders/products/PricedProductItem.java @@ -0,0 +1,9 @@ +package io.eventdriven.distributedprocesses.ecommerce.orders.products; + +import java.util.UUID; + +public record PricedProductItem( + UUID productId, + int quantity, + double unitPrice) { +} diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/Payment.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/Payment.java new file mode 100644 index 00000000..27733bba --- /dev/null +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/Payment.java @@ -0,0 +1,74 @@ +package io.eventdriven.distributedprocesses.ecommerce.payments; + +import io.eventdriven.distributedprocesses.core.aggregates.AbstractAggregate; + +import java.time.OffsetDateTime; +import java.util.UUID; + +import static io.eventdriven.distributedprocesses.ecommerce.payments.PaymentEvent.*; + +public class Payment extends AbstractAggregate { + public UUID orderId() { + return orderId; + } + + public double amount() { + return amount; + } + + private enum Status { + Pending, + Completed, + Failed + } + + private UUID orderId; + private double amount; + private Status status; + + public static Payment request(UUID paymentId, UUID orderId, double amount) { + return new Payment(paymentId, orderId, amount); + } + + private Payment(UUID id, UUID orderId, double amount) { + enqueue(new PaymentRequested(id, orderId, amount)); + } + + public void complete(OffsetDateTime now) { + if (status != Status.Pending) + throw new IllegalStateException("Completing payment in '%s' status is not allowed.".formatted(status)); + + enqueue(new PaymentCompleted(id(), now)); + } + + public void discard(DiscardReason discardReason, OffsetDateTime now) { + if (status != Status.Pending) + throw new IllegalStateException("Discarding payment in '{%s}' status is not allowed.".formatted(status)); + + enqueue(new PaymentDiscarded(id(), discardReason, now)); + } + + public void timeOut(OffsetDateTime now) { + if (status != Status.Pending) + throw new IllegalStateException("Discarding payment in '{%s}' status is not allowed.".formatted(status)); + + var event = new PaymentTimedOut(id(), now); + + enqueue(event); + } + + @Override + public void when(PaymentEvent event) { + switch (event) { + case PaymentRequested paymentRequested -> { + id = paymentRequested.paymentId(); + orderId = paymentRequested.orderId(); + amount = paymentRequested.amount(); + status = Status.Pending; + } + case PaymentCompleted completed -> status = Status.Completed; + case PaymentDiscarded discarded -> status = Status.Failed; + case PaymentTimedOut paymentTimedOut -> status = Status.Failed; + } + } +} diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/PaymentEvent.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/PaymentEvent.java new file mode 100644 index 00000000..6c0d1387 --- /dev/null +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/PaymentEvent.java @@ -0,0 +1,31 @@ +package io.eventdriven.distributedprocesses.ecommerce.payments; + +import java.time.OffsetDateTime; +import java.util.UUID; + +public sealed interface PaymentEvent { + record PaymentRequested( + UUID paymentId, + UUID orderId, + double amount + ) implements PaymentEvent { + } + + record PaymentCompleted( + UUID paymentId, + OffsetDateTime completedAt + ) implements PaymentEvent { + } + + record PaymentDiscarded( + UUID paymentId, + DiscardReason discardReason, + OffsetDateTime discardedAt) implements PaymentEvent { + } + + record PaymentTimedOut( + UUID paymentId, + OffsetDateTime timedOutAt + ) implements PaymentEvent { + } +} diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/PaymentService.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/PaymentService.java new file mode 100644 index 00000000..4a04d7a1 --- /dev/null +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/PaymentService.java @@ -0,0 +1,4 @@ +package io.eventdriven.distributedprocesses.ecommerce.payments; + +public class PaymentService { +} diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/PaymentsEvents.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/PaymentsEvents.java deleted file mode 100644 index 53bdd23d..00000000 --- a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/PaymentsEvents.java +++ /dev/null @@ -1,31 +0,0 @@ -package io.eventdriven.distributedprocesses.ecommerce.payments; - -import java.time.OffsetDateTime; -import java.util.UUID; - -public sealed interface PaymentsEvents { - record PaymentRequested( - UUID PaymentId, - UUID OrderId, - double Amount - ) implements PaymentsEvents { - } - - record PaymentCompleted( - UUID PaymentId, - OffsetDateTime CompletedAt - ) implements PaymentsEvents { - } - - record PaymentDiscarded( - UUID PaymentId, - DiscardReason DiscardReason, - OffsetDateTime DiscardedAt) implements PaymentsEvents { - } - - record PaymentTimedOut( - UUID PaymentId, - OffsetDateTime TimedOutAt - ) implements PaymentsEvents { - } -} diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/external/PaymentExternalEvent.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/external/PaymentExternalEvent.java new file mode 100644 index 00000000..4c4e10d3 --- /dev/null +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/external/PaymentExternalEvent.java @@ -0,0 +1,25 @@ +package io.eventdriven.distributedprocesses.ecommerce.payments.external; + +import java.time.OffsetDateTime; +import java.util.UUID; + +public sealed interface PaymentExternalEvent { + record PaymentFinalized( + UUID orderId, + UUID paymentId, + double amount, + OffsetDateTime finalizedAt ) implements PaymentExternalEvent { + } + + record PaymentFailed( + UUID orderId, + UUID paymentId, + double amount, + OffsetDateTime failedAt, + Reason reason ) implements PaymentExternalEvent { + enum Reason{ + Discarded, + TimedOut + } + } +} diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/external/PaymentExternalEventForwarder.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/external/PaymentExternalEventForwarder.java new file mode 100644 index 00000000..d86b3d9b --- /dev/null +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/payments/external/PaymentExternalEventForwarder.java @@ -0,0 +1,68 @@ +package io.eventdriven.distributedprocesses.ecommerce.payments.external; + +import io.eventdriven.distributedprocesses.core.aggregates.AggregateStore; +import io.eventdriven.distributedprocesses.core.events.EventBus; +import io.eventdriven.distributedprocesses.ecommerce.payments.Payment; +import io.eventdriven.distributedprocesses.ecommerce.payments.PaymentEvent; + +import java.util.UUID; + +import static io.eventdriven.distributedprocesses.ecommerce.payments.PaymentEvent.*; +import static io.eventdriven.distributedprocesses.ecommerce.payments.external.PaymentExternalEvent.*; + +public class PaymentExternalEventForwarder { + private final AggregateStore store; + private final EventBus eventBus; + + public PaymentExternalEventForwarder( + AggregateStore store, + EventBus eventBus + ) { + this.store = store; + this.eventBus = eventBus; + } + + public void on(PaymentCompleted event) { + var payment = store.get(event.paymentId()) + .orElseThrow(() -> new IllegalStateException("Cannot enrich event, as payment with id '%s' was not found".formatted(event.paymentId()))); + + var externalEvent = new PaymentFinalized( + payment.orderId(), + event.paymentId(), + payment.amount(), + event.completedAt() + ); + + eventBus.publish(externalEvent); + } + + public void on(PaymentDiscarded event) { + var payment = store.get(event.paymentId()) + .orElseThrow(() -> new IllegalStateException("Cannot enrich event, as payment with id '%s' was not found".formatted(event.paymentId()))); + + var externalEvent = new PaymentFailed( + payment.orderId(), + event.paymentId(), + payment.amount(), + event.discardedAt(), + PaymentFailed.Reason.Discarded + ); + + eventBus.publish(externalEvent); + } + + public void on(PaymentTimedOut event) { + var payment = store.get(event.paymentId()) + .orElseThrow(() -> new IllegalStateException("Cannot enrich event, as payment with id '%s' was not found".formatted(event.paymentId()))); + + var externalEvent = new PaymentFailed( + payment.orderId(), + event.paymentId(), + payment.amount(), + event.timedOutAt(), + PaymentFailed.Reason.TimedOut + ); + + eventBus.publish(externalEvent); + } +} diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shipments/PaymentService.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shipments/PaymentService.java new file mode 100644 index 00000000..41d36e86 --- /dev/null +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shipments/PaymentService.java @@ -0,0 +1,4 @@ +package io.eventdriven.distributedprocesses.ecommerce.shipments; + +public class PaymentService { +} diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shipments/ProductItem.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shipments/ProductItem.java index ab9fcfff..c00bb7e8 100644 --- a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shipments/ProductItem.java +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shipments/ProductItem.java @@ -3,7 +3,6 @@ import java.util.UUID; public record ProductItem( - UUID id, UUID productId, int quantity ) { diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shipments/Shipment.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shipments/Shipment.java new file mode 100644 index 00000000..11f55cd8 --- /dev/null +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shipments/Shipment.java @@ -0,0 +1,4 @@ +package io.eventdriven.distributedprocesses.ecommerce.shipments; + +public class Shipment { +} diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shoppingcarts/ShoppingCart.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shoppingcarts/ShoppingCart.java index c1090e09..e8687458 100644 --- a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shoppingcarts/ShoppingCart.java +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shoppingcarts/ShoppingCart.java @@ -6,12 +6,25 @@ import io.eventdriven.distributedprocesses.ecommerce.shoppingcarts.productitems.ProductItem; import io.eventdriven.distributedprocesses.ecommerce.shoppingcarts.productitems.ProductItems; -import java.time.LocalDateTime; +import java.time.OffsetDateTime; import java.util.UUID; import static io.eventdriven.distributedprocesses.ecommerce.shoppingcarts.ShoppingCartEvent.*; -class ShoppingCart extends AbstractAggregate { +public class ShoppingCart extends AbstractAggregate { + public UUID clientId() { + return clientId; + } + + public PricedProductItem[] productItems() { + return productItems.items().toArray(new PricedProductItem[0]); + } + + public double totalPrice() { + return productItems.items().stream() + .mapToDouble(PricedProductItem::totalPrice) + .sum(); + } private UUID clientId; private ProductItems productItems; private ShoppingCartStatus status; @@ -72,7 +85,7 @@ void confirm() { enqueue(new ShoppingCartConfirmed( id, - LocalDateTime.now() + OffsetDateTime.now() )); } @@ -82,7 +95,7 @@ void cancel() { enqueue(new ShoppingCartCanceled( id, - LocalDateTime.now() + OffsetDateTime.now() )); } diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shoppingcarts/ShoppingCartEvent.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shoppingcarts/ShoppingCartEvent.java index c7d0378e..16486eaa 100644 --- a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shoppingcarts/ShoppingCartEvent.java +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shoppingcarts/ShoppingCartEvent.java @@ -2,7 +2,7 @@ import io.eventdriven.distributedprocesses.ecommerce.shoppingcarts.productitems.PricedProductItem; -import java.time.LocalDateTime; +import java.time.OffsetDateTime; import java.util.UUID; public sealed interface ShoppingCartEvent { @@ -27,13 +27,13 @@ record ProductItemRemovedFromShoppingCart( record ShoppingCartConfirmed( UUID shoppingCartId, - LocalDateTime confirmedAt + OffsetDateTime confirmedAt ) implements ShoppingCartEvent { } record ShoppingCartCanceled( UUID shoppingCartId, - LocalDateTime canceledAt + OffsetDateTime canceledAt ) implements ShoppingCartEvent { } } diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shoppingcarts/external/ShoppingCartExternalEventForwarder.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shoppingcarts/external/ShoppingCartExternalEventForwarder.java new file mode 100644 index 00000000..1f0c5f99 --- /dev/null +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shoppingcarts/external/ShoppingCartExternalEventForwarder.java @@ -0,0 +1,38 @@ +package io.eventdriven.distributedprocesses.ecommerce.shoppingcarts.external; + +import io.eventdriven.distributedprocesses.core.aggregates.AggregateStore; +import io.eventdriven.distributedprocesses.core.events.EventBus; +import io.eventdriven.distributedprocesses.ecommerce.shoppingcarts.ShoppingCart; +import io.eventdriven.distributedprocesses.ecommerce.shoppingcarts.ShoppingCartEvent; + +import java.util.UUID; + +import static io.eventdriven.distributedprocesses.ecommerce.shoppingcarts.ShoppingCartEvent.*; + +public class ShoppingCartExternalEventForwarder { + private final AggregateStore store; + private final EventBus eventBus; + + public ShoppingCartExternalEventForwarder( + AggregateStore store, + EventBus eventBus + ) { + this.store = store; + this.eventBus = eventBus; + } + + public void on(ShoppingCartConfirmed event) { + var cart = store.get(event.shoppingCartId()) + .orElseThrow(() -> new IllegalStateException("Cannot enrich event, as shopping cart with id '%s' was not found".formatted(event.shoppingCartId()))); + + var externalEvent = new ShoppingCartFinalized( + event.shoppingCartId(), + cart.clientId(), + cart.productItems(), + cart.totalPrice(), + event.confirmedAt() + ); + + eventBus.publish(externalEvent); + } +} diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shoppingcarts/external/ShoppingCartFinalized.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shoppingcarts/external/ShoppingCartFinalized.java new file mode 100644 index 00000000..9e1cf296 --- /dev/null +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/ecommerce/shoppingcarts/external/ShoppingCartFinalized.java @@ -0,0 +1,14 @@ +package io.eventdriven.distributedprocesses.ecommerce.shoppingcarts.external; + +import io.eventdriven.distributedprocesses.ecommerce.shoppingcarts.productitems.PricedProductItem; + +import java.time.OffsetDateTime; +import java.util.UUID; + +public record ShoppingCartFinalized( + UUID cartId, + UUID clientId, + PricedProductItem[] productItems, + double totalPrice, + OffsetDateTime finalizedAt +) {} diff --git a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/hotelmanagement/groupcheckout/GroupCheckout.java b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/hotelmanagement/groupcheckout/GroupCheckout.java index 2eea51d2..63bed22e 100644 --- a/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/hotelmanagement/groupcheckout/GroupCheckout.java +++ b/samples/distributed-processes/src/main/java/io/eventdriven/distributedprocesses/hotelmanagement/groupcheckout/GroupCheckout.java @@ -46,14 +46,14 @@ private GroupCheckout( public void recordGuestStaysCheckoutInitiation(UUID[] initiatedCheckouts, OffsetDateTime initiatedAt) { if (status != CheckoutStatus.Initiated) - throw new RuntimeException("Cannot record guest stay if status is other than Initiated"); + throw new IllegalStateException("Cannot record guest stay if status is other than Initiated"); enqueue(new GuestCheckoutsInitiated(id(), initiatedCheckouts, initiatedAt)); } public void recordGuestStayCheckoutCompletion(UUID completedCheckout, OffsetDateTime completedAt) { if (status != CheckoutStatus.Initiated) - throw new RuntimeException("Cannot record guest stay if status is other than Initiated"); + throw new IllegalStateException("Cannot record guest stay if status is other than Initiated"); enqueue(new GuestCheckoutCompleted(id(), completedCheckout, completedAt)); @@ -62,7 +62,7 @@ public void recordGuestStayCheckoutCompletion(UUID completedCheckout, OffsetDate public void recordGuestStayCheckoutFailure(UUID failedCheckout, OffsetDateTime failedAt) { if (status != CheckoutStatus.Initiated) - throw new RuntimeException("Cannot record guest stay if status is other than Initiated"); + throw new IllegalStateException("Cannot record guest stay if status is other than Initiated"); enqueue(new GuestCheckoutCompleted(id(), failedCheckout, failedAt));