Skip to content

Commit

Permalink
Implemented the Order Saga
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Jul 8, 2022
1 parent 5741b72 commit 9c3c8fc
Show file tree
Hide file tree
Showing 22 changed files with 514 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public AggregateStore(
this.getEmpty = getEmpty;
}

Optional<Entity> get(Id id) {
public Optional<Entity> get(Id id) {
var streamId = mapToStreamId.apply(id);

var events = getEvents(streamId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.util.function.Consumer;

public interface EventBus {
<Event> EventStore.AppendResult publish(Event command);
<Event> EventStore.AppendResult publish(Event event);

void subscribe(Consumer<EventEnvelope<Object>>... handlers);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package io.eventdriven.distributedprocesses.ecommerce.orders;

import io.eventdriven.distributedprocesses.core.aggregates.AbstractAggregate;
import io.eventdriven.distributedprocesses.ecommerce.orders.OrderEvent.OrderCancelled;
import io.eventdriven.distributedprocesses.ecommerce.orders.OrderEvent.OrderCompleted;
import io.eventdriven.distributedprocesses.ecommerce.orders.OrderEvent.OrderInitialized;
import io.eventdriven.distributedprocesses.ecommerce.orders.OrderEvent.OrderPaymentRecorded;
import io.eventdriven.distributedprocesses.ecommerce.orders.products.PricedProductItem;

import java.time.OffsetDateTime;
import java.util.UUID;

public class Order extends AbstractAggregate<OrderEvent, UUID> {
public enum Status {
Opened,
Paid,
Completed,
Cancelled
}

private UUID clientId;

private PricedProductItem[] productItems;

private double totalPrice;

private Status status;

private UUID paymentId;

public static Order initialize(
UUID orderId,
UUID clientId,
PricedProductItem[] productItems,
double totalPrice,
OffsetDateTime now
) {
return new Order(
orderId,
clientId,
productItems,
totalPrice,
now
);
}

private Order(UUID id, UUID clientId, PricedProductItem[] productItems, double totalPrice, OffsetDateTime now) {
enqueue(new OrderInitialized(
id,
clientId,
productItems,
totalPrice,
now
));
}

public void recordPayment(UUID paymentId, OffsetDateTime recordedAt) {
enqueue(new OrderPaymentRecorded(
id,
paymentId,
productItems,
totalPrice,
recordedAt
));
}

public void complete(OffsetDateTime now) {
if (status != Status.Paid)
throw new IllegalStateException("Cannot complete a not paid order.");

enqueue(new OrderCompleted(id, now));
}

public void cancel(OrderCancellationReason cancellationReason, OffsetDateTime now) {
if (status == Status.Opened || status == Status.Cancelled)
throw new IllegalStateException("Cannot cancel a closed order.");

enqueue(new OrderCancelled(id, paymentId, cancellationReason, now));
}

@Override
public void when(OrderEvent event) {
switch (event) {
case OrderInitialized orderInitialized -> {
id = orderInitialized.orderId();
clientId = orderInitialized.clientId();
productItems = orderInitialized.productItems();
status = Status.Opened;
}
case OrderPaymentRecorded paymentRecorded -> {
paymentId = paymentRecorded.paymentId();
status = Status.Paid;
}
case OrderCompleted completed -> status = Status.Completed;
case OrderCancelled cancelled -> status = Status.Cancelled;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.eventdriven.distributedprocesses.ecommerce.shoppingcarts.productitems.PricedProductItem;

import java.time.OffsetDateTime;
import java.util.UUID;

public sealed interface OrderCommand {
Expand All @@ -12,6 +13,12 @@ record InitializeOrder(
double TotalPrice) implements OrderCommand {
}

record RecordOrderPayment(
UUID OrderId,
UUID PaymentId,
OffsetDateTime PaymentRecordedAt) implements OrderCommand {
}

record CompleteOrder(
UUID OrderId) implements OrderCommand {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.eventdriven.distributedprocesses.ecommerce.orders;

import io.eventdriven.distributedprocesses.ecommerce.shoppingcarts.productitems.PricedProductItem;
import io.eventdriven.distributedprocesses.ecommerce.orders.products.PricedProductItem;
import org.springframework.lang.Nullable;

import java.time.OffsetDateTime;
import java.util.UUID;
Expand All @@ -10,11 +11,20 @@ record OrderInitialized(
UUID orderId,
UUID clientId,
PricedProductItem[] productItems,
Double totalPrice,
double totalPrice,
OffsetDateTime initializedAt
) implements OrderEvent {
}

record OrderPaymentRecorded(
UUID orderId,
UUID paymentId,
PricedProductItem[] productItems,
double amount,
OffsetDateTime paymentRecordedAt
) implements OrderEvent {
}

record OrderCompleted(
UUID orderId,
OffsetDateTime completedAt
Expand All @@ -23,6 +33,7 @@ record OrderCompleted(

record OrderCancelled(
UUID OrderId,
@Nullable
UUID paymentId,
OrderCancellationReason Reason,
OffsetDateTime cancelledAt
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package io.eventdriven.distributedprocesses.ecommerce.orders;

import io.eventdriven.distributedprocesses.core.commands.CommandBus;
import io.eventdriven.distributedprocesses.ecommerce.payments.DiscardReason;
import io.eventdriven.distributedprocesses.ecommerce.payments.PaymentCommand;
import io.eventdriven.distributedprocesses.ecommerce.payments.external.PaymentExternalEvent;
import io.eventdriven.distributedprocesses.ecommerce.shipments.ProductItem;
import io.eventdriven.distributedprocesses.ecommerce.shipments.ShipmentCommand;
import io.eventdriven.distributedprocesses.ecommerce.shipments.ShipmentEvent;
import io.eventdriven.distributedprocesses.ecommerce.shoppingcarts.external.ShoppingCartFinalized;

import java.util.Arrays;
import java.util.UUID;

import static io.eventdriven.distributedprocesses.ecommerce.orders.OrderCommand.*;
import static io.eventdriven.distributedprocesses.ecommerce.orders.OrderEvent.*;

public class OrderSaga {
private final CommandBus commandBus;

public OrderSaga(CommandBus commandBus) {
this.commandBus = commandBus;
}

// Happy path
public void on(ShoppingCartFinalized event) {
commandBus.send(
new OrderCommand.InitializeOrder(
event.cartId(),
event.clientId(),
event.productItems(),
event.totalPrice()
)
);
}

public void on(OrderInitialized event) {
commandBus.send(
new PaymentCommand.RequestPayment(
UUID.randomUUID(),
event.orderId(), event.totalPrice()
)
);
}

public void on(PaymentExternalEvent.PaymentFinalized event) {
commandBus.send(
new RecordOrderPayment(
event.orderId(),
event.paymentId(),
event.finalizedAt()
)
);
}

public void on(OrderPaymentRecorded event) {
commandBus.send(
new ShipmentCommand.SendPackage(
event.orderId(),
Arrays.stream(event.productItems())
.map(pi -> new ProductItem(pi.productId(), pi.quantity()))
.toArray(ProductItem[]::new)
)
);
}

public void on(ShipmentEvent.PackageWasSent event) {
commandBus.send(
new CompleteOrder(
event.orderId()
)
);
}

// Compensation
public void on(ShipmentEvent.ProductWasOutOfStock event) {
commandBus.send(
new CancelOrder(
event.orderId(),
OrderCancellationReason.ProductWasOutOfStock
)
);
}

public void on(OrderCancelled event) {
if (event.paymentId() == null) {
return;
}
commandBus.send(
new PaymentCommand.DiscardPayment(
event.paymentId(),
DiscardReason.OrderCancelled
)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package io.eventdriven.distributedprocesses.ecommerce.orders;

public class OrderService {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.eventdriven.distributedprocesses.ecommerce.orders.products;

import java.util.UUID;

public record PricedProductItem(
UUID productId,
int quantity,
double unitPrice) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package io.eventdriven.distributedprocesses.ecommerce.payments;

import io.eventdriven.distributedprocesses.core.aggregates.AbstractAggregate;

import java.time.OffsetDateTime;
import java.util.UUID;

import static io.eventdriven.distributedprocesses.ecommerce.payments.PaymentEvent.*;

public class Payment extends AbstractAggregate<PaymentEvent, UUID> {
public UUID orderId() {
return orderId;
}

public double amount() {
return amount;
}

private enum Status {
Pending,
Completed,
Failed
}

private UUID orderId;
private double amount;
private Status status;

public static Payment request(UUID paymentId, UUID orderId, double amount) {
return new Payment(paymentId, orderId, amount);
}

private Payment(UUID id, UUID orderId, double amount) {
enqueue(new PaymentRequested(id, orderId, amount));
}

public void complete(OffsetDateTime now) {
if (status != Status.Pending)
throw new IllegalStateException("Completing payment in '%s' status is not allowed.".formatted(status));

enqueue(new PaymentCompleted(id(), now));
}

public void discard(DiscardReason discardReason, OffsetDateTime now) {
if (status != Status.Pending)
throw new IllegalStateException("Discarding payment in '{%s}' status is not allowed.".formatted(status));

enqueue(new PaymentDiscarded(id(), discardReason, now));
}

public void timeOut(OffsetDateTime now) {
if (status != Status.Pending)
throw new IllegalStateException("Discarding payment in '{%s}' status is not allowed.".formatted(status));

var event = new PaymentTimedOut(id(), now);

enqueue(event);
}

@Override
public void when(PaymentEvent event) {
switch (event) {
case PaymentRequested paymentRequested -> {
id = paymentRequested.paymentId();
orderId = paymentRequested.orderId();
amount = paymentRequested.amount();
status = Status.Pending;
}
case PaymentCompleted completed -> status = Status.Completed;
case PaymentDiscarded discarded -> status = Status.Failed;
case PaymentTimedOut paymentTimedOut -> status = Status.Failed;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.eventdriven.distributedprocesses.ecommerce.payments;

import java.time.OffsetDateTime;
import java.util.UUID;

public sealed interface PaymentEvent {
record PaymentRequested(
UUID paymentId,
UUID orderId,
double amount
) implements PaymentEvent {
}

record PaymentCompleted(
UUID paymentId,
OffsetDateTime completedAt
) implements PaymentEvent {
}

record PaymentDiscarded(
UUID paymentId,
DiscardReason discardReason,
OffsetDateTime discardedAt) implements PaymentEvent {
}

record PaymentTimedOut(
UUID paymentId,
OffsetDateTime timedOutAt
) implements PaymentEvent {
}
}
Loading

0 comments on commit 9c3c8fc

Please sign in to comment.