diff --git a/workflow-saga/components/state_redis.yaml b/workflow-saga/components/state_redis.yaml new file mode 100644 index 000000000..7c45ff885 --- /dev/null +++ b/workflow-saga/components/state_redis.yaml @@ -0,0 +1,15 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.redis + version: v1 + initTimeout: 1m + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" + - name: actorStateStore + value: "true" \ No newline at end of file diff --git a/workflow-saga/java/sdk/README.md b/workflow-saga/java/sdk/README.md new file mode 100644 index 000000000..62eb0d375 --- /dev/null +++ b/workflow-saga/java/sdk/README.md @@ -0,0 +1,118 @@ +# Dapr workflows + +In this quickstart, you'll create a simple console application to demonstrate Dapr's workflow programming model and saga pattern support. The console app starts and manages the lifecycle of a workflow that stores and retrieves data in a state store. + +This quickstart includes one project: + +- Java console app `order-processor-saga` + +The quickstart contains 1 workflow to simulate purchasing items from a store, and 6 unique activities within the workflow. These 6 activities are as follows: + +- NotifyActivity: This activity utilizes a logger to print out messages throughout the workflow. These messages notify the user when there is insufficient inventory, their payment couldn't be processed, and more. +- ReserveInventoryActivity: This activity checks the state store to ensure that there is enough inventory present for purchase. +- RequestApprovalActivity: This activity requests approval for orders over a certain threshold +- ProcessPaymentActivity: This activity is responsible for processing and authorizing the payment. +- UpdateInventoryActivity: This activity updates the state store with the new remaining inventory value. +- DistributionActivity: This activity starts the distribution. In this quickstart, it will allways be failed to trigger saga compensation. + +### Run the order processor workflow with multi-app-run + +1. Open a new terminal window and navigate to `order-processor` directory: + + + +```bash +cd ./order-processor-saga +mvn clean install +cd .. +``` + + +2. Run the console app with Dapr: + + + +```bash +dapr run -f . +``` + + + +3. Expected output + + +``` +== APP - SagaConsoleApp == *** Welcome to the Dapr saga console app sample! +== APP - SagaConsoleApp == *** Using this app, you can place orders that start workflows. +== APP - SagaConsoleApp == Start workflow runtime +== APP - SagaConsoleApp == Oct 24, 2023 7:00:56 AM com.microsoft.durabletask.DurableTaskGrpcWorker startAndBlock +== APP - SagaConsoleApp == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:33907. +== APP - SagaConsoleApp == ==========Begin the purchase of item:========== +== APP - SagaConsoleApp == Starting order workflow, purchasing 10 of cars +== APP - SagaConsoleApp == scheduled new workflow instance of OrderProcessingWorkflow with instance ID: 48a551c1-a8ac-4622-ab28-ae89647066f3 +== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.quickstarts.saga.OrderProcessingWorkflow +== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Instance ID(order ID): 48a551c1-a8ac-4622-ab28-ae89647066f3 +== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Current Orchestration Time: 2023-10-24T07:01:00.264Z +== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.NotifyActivity - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP - SagaConsoleApp == workflow instance 48a551c1-a8ac-4622-ab28-ae89647066f3 started +== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.ReserveInventoryActivity - Reserving inventory for order '48a551c1-a8ac-4622-ab28-ae89647066f3' of 10 cars +== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.ReserveInventoryActivity - There are 100 cars available for purchase +== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.ReserveInventoryActivity - Reserved inventory for order '48a551c1-a8ac-4622-ab28-ae89647066f3' of 10 cars +== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.RequestApprovalActivity - Requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.RequestApprovalActivity - Approved requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.ProcessPaymentActivity - Processing payment: 48a551c1-a8ac-4622-ab28-ae89647066f3 for 10 cars at $150000 +== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.ProcessPaymentActivity - Payment for request ID '48a551c1-a8ac-4622-ab28-ae89647066f3' processed successfully +== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.UpdateInventoryActivity - Updating inventory for order '48a551c1-a8ac-4622-ab28-ae89647066f3' of 10 cars +== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.UpdateInventoryActivity - Updated inventory for order '48a551c1-a8ac-4622-ab28-ae89647066f3': there are now 90 cars left in stock +== APP - SagaConsoleApp == there are now 90 cars left in stock +== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.UpdateInventoryActivity - Compensating inventory for order '48a551c1-a8ac-4622-ab28-ae89647066f3' of 10 cars +== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.UpdateInventoryActivity - Compensated inventory for order '48a551c1-a8ac-4622-ab28-ae89647066f3': there are now 100 cars left in stock +== APP - SagaConsoleApp == there are now 100 cars left in stock +== APP - SagaConsoleApp == [Thread-0] INFO io.dapr.quickstarts.saga.activities.ProcessPaymentActivity - Compensating payment for request ID '48a551c1-a8ac-4622-ab28-ae89647066f3' at $150000 +== APP - SagaConsoleApp == workflow instance completed, out is: {"processed":false,"compensated":true} +``` + +### View workflow output with Zipkin + +For a more detailed view of the workflow activities (duration, progress etc.), try using Zipkin. + +1. Launch Zipkin container - The [openzipkin/zipkin](https://hub.docker.com/r/openzipkin/zipkin/) docker container is launched on running `dapr init`. Check to make sure the container is running. If it's not, launch the Zipkin docker container with the following command. + +```bash +docker run -d -p 9411:9411 openzipkin/zipkin +``` + +2. View Traces in Zipkin UI - In your browser go to http://localhost:9411 to view the workflow trace spans in the Zipkin web UI. The order-processor workflow should be viewable with the following output in the Zipkin web UI. + + + +### What happened? + +When you ran `dapr run --app-id WorkflowConsoleApp --resources-path ../../../components/ --dapr-grpc-port 50001 -- java -jar target/OrderProcessingService-0.0.1-SNAPSHOT.jar io.dapr.quickstarts.workflows.WorkflowConsoleApp` + +1. A unique order ID for the workflow is generated (in the above example, `95d33f7c-3af8-4960-ba11-4ecea83b0509`) and the workflow is scheduled. +2. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received. +3. The `ReserveInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock. +4. Your workflow starts and notifies you of its status. +5. The `RequestApprovalActivity` workflow activity requests approval for order `95d33f7c-3af8-4960-ba11-4ecea83b0509` +6. The `ProcessPaymentActivity` workflow activity begins processing payment for order `95d33f7c-3af8-4960-ba11-4ecea83b0509` and confirms if successful. +7. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed. +8. The `DistributionActivity` workflow activity failed (to trigger saga compensation). +9. Saga compensation is triggered and compensated for `ProcessPaymentActivity` and `UpdateInventoryActivity` +10. The workflow terminates as completed with processed=false and compensated=true. + diff --git a/workflow-saga/java/sdk/dapr.yaml b/workflow-saga/java/sdk/dapr.yaml new file mode 100644 index 000000000..acd495ff4 --- /dev/null +++ b/workflow-saga/java/sdk/dapr.yaml @@ -0,0 +1,7 @@ +version: 1 +common: + resourcesPath: ../../components +apps: + - appID: SagaConsoleApp + appDirPath: ./order-processor-saga/target + command: ["java", "-jar", "SagaService-0.0.1-SNAPSHOT.jar", "io.dapr.quickstarts.saga.SagaConsoleApp"] diff --git a/workflow-saga/java/sdk/img/workflow-trace-spans-zipkin.png b/workflow-saga/java/sdk/img/workflow-trace-spans-zipkin.png new file mode 100644 index 000000000..6564dd4c5 Binary files /dev/null and b/workflow-saga/java/sdk/img/workflow-trace-spans-zipkin.png differ diff --git a/workflow-saga/java/sdk/makefile b/workflow-saga/java/sdk/makefile new file mode 100644 index 000000000..e7a8826bf --- /dev/null +++ b/workflow-saga/java/sdk/makefile @@ -0,0 +1,2 @@ +include ../../../docker.mk +include ../../../validate.mk \ No newline at end of file diff --git a/workflow-saga/java/sdk/order-processor-saga/pom.xml b/workflow-saga/java/sdk/order-processor-saga/pom.xml new file mode 100644 index 000000000..ebf7d6619 --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/pom.xml @@ -0,0 +1,60 @@ + + + 4.0.0 + + com.service + SagaService + 0.0.1-SNAPSHOT + SagaService + Demo for Dapr workflow Saga + + 11 + 11 + 1.6.1 + + + + io.dapr + dapr-sdk-workflows + 0.11.0-SNAPSHOT + + + org.slf4j + slf4j-simple + 1.7.36 + + + + + + + org.springframework.boot + spring-boot-maven-plugin + 2.6.3 + + + + repackage + + + + io.dapr.quickstarts.saga.SagaConsoleApp + + + + + + + + + + + maven-SNAPSHOT + https://oss.sonatype.org/content/repositories/snapshots/ + + true + + + + diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/OrderProcessingWorkflow.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/OrderProcessingWorkflow.java new file mode 100644 index 000000000..05fcbee97 --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/OrderProcessingWorkflow.java @@ -0,0 +1,128 @@ +package io.dapr.quickstarts.saga; + +import org.slf4j.Logger; + +import io.dapr.quickstarts.saga.activities.DeliveryActivity; +import io.dapr.quickstarts.saga.activities.NotifyActivity; +import io.dapr.quickstarts.saga.activities.ProcessPaymentActivity; +import io.dapr.quickstarts.saga.activities.ProcessPaymentCompensationActivity; +import io.dapr.quickstarts.saga.activities.RequestApprovalActivity; +import io.dapr.quickstarts.saga.activities.ReserveInventoryActivity; +import io.dapr.quickstarts.saga.activities.UpdateInventoryActivity; +import io.dapr.quickstarts.saga.activities.UpdateInventoryCompensationActivity; +import io.dapr.quickstarts.saga.models.ApprovalResult; +import io.dapr.quickstarts.saga.models.InventoryRequest; +import io.dapr.quickstarts.saga.models.InventoryResult; +import io.dapr.quickstarts.saga.models.Notification; +import io.dapr.quickstarts.saga.models.OrderPayload; +import io.dapr.quickstarts.saga.models.OrderResult; +import io.dapr.quickstarts.saga.models.PaymentRequest; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.saga.SagaOption; + +public class OrderProcessingWorkflow extends Workflow { + + @Override + public WorkflowStub create() { + return ctx -> { + Logger logger = ctx.getLogger(); + String orderId = ctx.getInstanceId(); + logger.info("Starting Workflow: " + ctx.getName()); + logger.info("Instance ID(order ID): " + orderId); + logger.info("Current Orchestration Time: " + ctx.getCurrentInstant()); + + OrderPayload order = ctx.getInput(OrderPayload.class); + logger.info("Received Order: " + order.toString()); + OrderResult orderResult = new OrderResult(); + + // step1: notify the user that an order has come through + Notification notification = new Notification(); + notification.setMessage("Received Order: " + order.toString()); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + + // step2: determine if there is enough of the item available for purchase by + // checking the inventory + InventoryRequest inventoryRequest = new InventoryRequest(); + inventoryRequest.setRequestId(orderId); + inventoryRequest.setItemName(order.getItemName()); + inventoryRequest.setQuantity(order.getQuantity()); + InventoryResult inventoryResult = ctx.callActivity(ReserveInventoryActivity.class.getName(), + inventoryRequest, InventoryResult.class).await(); + + // If there is insufficient inventory, fail and let the user know + if (!inventoryResult.isSuccess()) { + notification.setMessage("Insufficient inventory for order : " + order.getItemName()); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + + // step3: require orders over a certain threshold to be approved + if (order.getTotalCost() > 5000) { + ApprovalResult approvalResult = ctx.callActivity(RequestApprovalActivity.class.getName(), + order, ApprovalResult.class).await(); + if (approvalResult != ApprovalResult.Approved) { + notification.setMessage("Order " + order.getItemName() + " was not approved."); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + } + + // There is enough inventory available so the user can purchase the item(s). + // step4: Process their payment (need compensation) + PaymentRequest paymentRequest = new PaymentRequest(); + paymentRequest.setRequestId(orderId); + paymentRequest.setItemBeingPurchased(order.getItemName()); + paymentRequest.setQuantity(order.getQuantity()); + paymentRequest.setAmount(order.getTotalCost()); + boolean isOK = ctx.callActivity(ProcessPaymentActivity.class.getName(), + paymentRequest, boolean.class).await(); + if (!isOK) { + notification.setMessage("Payment failed for order : " + orderId); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + ctx.getSagaContext().registerCompensation(ProcessPaymentCompensationActivity.class.getName(), paymentRequest); + + // step5: Update the inventory (need compensation) + inventoryResult = ctx.callActivity(UpdateInventoryActivity.class.getName(), + inventoryRequest, InventoryResult.class).await(); + if (!inventoryResult.isSuccess()) { + // Let users know their payment processing failed + notification.setMessage("Order failed to update inventory! : " + orderId); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + + // trigger saga compensation gracefully + ctx.getSagaContext().compensate(); + orderResult.setCompensated(true); + ctx.complete(orderResult); + return; + } + ctx.getSagaContext().registerCompensation(UpdateInventoryCompensationActivity.class.getName(), + inventoryRequest); + + // step6: delevery (allways be failed to trigger compensation) + ctx.callActivity(DeliveryActivity.class.getName()).await(); + + // step7: Let user know their order was processed(won't be executed if step6 + // failed) + notification.setMessage("Order completed! : " + orderId); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + + // Complete the workflow with order result is processed(won't be executed if + // step6 failed) + orderResult.setProcessed(true); + ctx.complete(orderResult); + }; + } + + @Override + public SagaOption getSagaOption() { + return SagaOption.newBuilder().setParallelCompensation(false) + .setContinueWithError(true).build(); + } + +} diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/SagaConsoleApp.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/SagaConsoleApp.java new file mode 100644 index 000000000..5a4c44e78 --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/SagaConsoleApp.java @@ -0,0 +1,143 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.quickstarts.saga; + +import java.time.Duration; +import java.util.concurrent.TimeoutException; + +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.quickstarts.saga.activities.DeliveryActivity; +import io.dapr.quickstarts.saga.activities.NotifyActivity; +import io.dapr.quickstarts.saga.activities.ProcessPaymentActivity; +import io.dapr.quickstarts.saga.activities.ProcessPaymentCompensationActivity; +import io.dapr.quickstarts.saga.activities.RequestApprovalActivity; +import io.dapr.quickstarts.saga.activities.ReserveInventoryActivity; +import io.dapr.quickstarts.saga.activities.UpdateInventoryActivity; +import io.dapr.quickstarts.saga.activities.UpdateInventoryCompensationActivity; +import io.dapr.quickstarts.saga.models.InventoryItem; +import io.dapr.quickstarts.saga.models.OrderPayload; +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowInstanceStatus; +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class SagaConsoleApp { + + private static final String STATE_STORE_NAME = "statestore"; + + /** + * The main method of this console app. + * + * @param args The port the app will listen on. + * @throws Exception An Exception. + */ + public static void main(String[] args) throws Exception { + System.out.println("*** Welcome to the Dapr saga console app sample!"); + System.out.println("*** Using this app, you can place orders that start workflows."); + // Wait for the sidecar to become available + Thread.sleep(5 * 1000); + + // Register the OrderProcessingWorkflow and its activities with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(OrderProcessingWorkflow.class); + builder.registerActivity(NotifyActivity.class); + builder.registerActivity(ProcessPaymentActivity.class); + builder.registerActivity(RequestApprovalActivity.class); + builder.registerActivity(ReserveInventoryActivity.class); + builder.registerActivity(UpdateInventoryActivity.class); + builder.registerActivity(DeliveryActivity.class); + + builder.registerActivity(ProcessPaymentCompensationActivity.class); + builder.registerActivity(UpdateInventoryCompensationActivity.class); + + // Build and then start the workflow runtime pulling and executing tasks + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Start workflow runtime"); + runtime.start(false); + } + + InventoryItem inventory = prepareInventoryAndOrder(); + + DaprWorkflowClient workflowClient = new DaprWorkflowClient(); + try (workflowClient) { + executeWorkflow(workflowClient, inventory); + } + + } + + private static void executeWorkflow(DaprWorkflowClient workflowClient, InventoryItem inventory) { + System.out.println("==========Begin the purchase of item:=========="); + String itemName = inventory.getName(); + int orderQuantity = inventory.getQuantity(); + int totalcost = orderQuantity * inventory.getPerItemCost(); + OrderPayload order = new OrderPayload(); + order.setItemName(itemName); + order.setQuantity(orderQuantity); + order.setTotalCost(totalcost); + System.out.println("Starting order workflow, purchasing " + orderQuantity + " of " + itemName); + + String instanceId = workflowClient.scheduleNewWorkflow(OrderProcessingWorkflow.class, order); + System.out.printf("scheduled new workflow instance of OrderProcessingWorkflow with instance ID: %s%n", + instanceId); + + try { + workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(10), false); + System.out.printf("workflow instance %s started%n", instanceId); + } catch (TimeoutException e) { + System.out.printf("workflow instance %s did not start within 10 seconds%n", instanceId); + return; + } + + try { + WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, + Duration.ofSeconds(30), + true); + if (workflowStatus != null) { + System.out.printf("workflow instance completed, out is: %s%n", + workflowStatus.getSerializedOutput()); + } else { + System.out.printf("workflow instance %s not found%n", instanceId); + } + } catch (TimeoutException e) { + System.out.printf("workflow instance %s did not complete within 30 seconds%n", instanceId); + } + + } + + private static InventoryItem prepareInventoryAndOrder() { + // prepare 100 cars in inventory + InventoryItem inventory = new InventoryItem(); + inventory.setName("cars"); + inventory.setPerItemCost(15000); + inventory.setQuantity(100); + + DaprClient daprClient = new DaprClientBuilder().build(); + try { + String key = inventory.getName(); + daprClient.saveState(STATE_STORE_NAME, key, inventory).block(); + } finally { + try{ + daprClient.close(); + } catch (Exception e) { + } + } + + // prepare the order for 10 cars + InventoryItem order = new InventoryItem(); + order.setName("cars"); + order.setPerItemCost(15000); + order.setQuantity(10); + return order; + } +} diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/DeliveryActivity.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/DeliveryActivity.java new file mode 100644 index 000000000..95113296a --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/DeliveryActivity.java @@ -0,0 +1,20 @@ +package io.dapr.quickstarts.saga.activities; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; + +public class DeliveryActivity implements WorkflowActivity { + private static Logger logger = LoggerFactory.getLogger(DeliveryActivity.class); + + @Override + public Object run(WorkflowActivityContext ctx) { + // in this quickstart, we assume that the Delivery will be failed + // So that the workflow will be failed and compensated + logger.info("Delivery failed"); + throw new RuntimeException("Delivery failed"); + } + +} diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/NotifyActivity.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/NotifyActivity.java new file mode 100644 index 000000000..7ee137321 --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/NotifyActivity.java @@ -0,0 +1,21 @@ +package io.dapr.quickstarts.saga.activities; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.dapr.quickstarts.saga.models.Notification; +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; + +public class NotifyActivity implements WorkflowActivity { + private static Logger logger = LoggerFactory.getLogger(NotifyActivity.class); + + @Override + public Object run(WorkflowActivityContext ctx) { + Notification notification = ctx.getInput(Notification.class); + logger.info(notification.getMessage()); + + return ""; + } + +} diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/ProcessPaymentActivity.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/ProcessPaymentActivity.java new file mode 100644 index 000000000..49d759fc7 --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/ProcessPaymentActivity.java @@ -0,0 +1,28 @@ +package io.dapr.quickstarts.saga.activities; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.dapr.quickstarts.saga.models.PaymentRequest; +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; + +public class ProcessPaymentActivity implements WorkflowActivity { + private static Logger logger = LoggerFactory.getLogger(ProcessPaymentActivity.class); + + @Override + public Object run(WorkflowActivityContext ctx) { + PaymentRequest req = ctx.getInput(PaymentRequest.class); + logger.info("Processing payment: {} for {} {} at ${}", + req.getRequestId(), req.getQuantity(), req.getItemBeingPurchased(), req.getAmount()); + + // Simulate slow processing + try { + Thread.sleep(7 * 1000); + } catch (InterruptedException e) { + } + logger.info("Payment for request ID '{}' processed successfully", req.getRequestId()); + + return true; + } +} diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/ProcessPaymentCompensationActivity.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/ProcessPaymentCompensationActivity.java new file mode 100644 index 000000000..8ef10e766 --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/ProcessPaymentCompensationActivity.java @@ -0,0 +1,30 @@ +package io.dapr.quickstarts.saga.activities; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.dapr.quickstarts.saga.models.PaymentRequest; +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; + +public class ProcessPaymentCompensationActivity implements WorkflowActivity { + private static Logger logger = LoggerFactory.getLogger(ProcessPaymentCompensationActivity.class); + + @Override + public Object run(WorkflowActivityContext ctx) { + PaymentRequest input = ctx.getInput(PaymentRequest.class); + + logger.info("Compensating payment for request ID '{}' at ${}", + input.getRequestId(), input.getAmount()); + + // Simulate slow processing + try { + Thread.sleep(1 * 1000); + } catch (InterruptedException e) { + } + + logger.info("Compensated payment for request ID '{}' at ${}", + input.getRequestId(), input.getAmount()); + return null; + } +} diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/RequestApprovalActivity.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/RequestApprovalActivity.java new file mode 100644 index 000000000..640df12e7 --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/RequestApprovalActivity.java @@ -0,0 +1,24 @@ +package io.dapr.quickstarts.saga.activities; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.dapr.quickstarts.saga.models.ApprovalResult; +import io.dapr.quickstarts.saga.models.OrderPayload; +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; + +public class RequestApprovalActivity implements WorkflowActivity { + private static Logger logger = LoggerFactory.getLogger(RequestApprovalActivity.class); + + @Override + public Object run(WorkflowActivityContext ctx) { + OrderPayload order = ctx.getInput(OrderPayload.class); + logger.info("Requesting approval for order: {}", order); + + // hard code to Approved in example + logger.info("Approved requesting approval for order: {}", order); + return ApprovalResult.Approved; + } + +} diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/ReserveInventoryActivity.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/ReserveInventoryActivity.java new file mode 100644 index 000000000..00dd69e83 --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/ReserveInventoryActivity.java @@ -0,0 +1,62 @@ +package io.dapr.quickstarts.saga.activities; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.domain.State; +import io.dapr.quickstarts.saga.models.InventoryItem; +import io.dapr.quickstarts.saga.models.InventoryRequest; +import io.dapr.quickstarts.saga.models.InventoryResult; +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; + +public class ReserveInventoryActivity implements WorkflowActivity { + private static Logger logger = LoggerFactory.getLogger(ReserveInventoryActivity.class); + + private static final String STATE_STORE_NAME = "statestore"; + + @Override + public Object run(WorkflowActivityContext ctx) { + InventoryRequest inventoryRequest = ctx.getInput(InventoryRequest.class); + logger.info("Reserving inventory for order '{}' of {} {}", + inventoryRequest.getRequestId(), inventoryRequest.getQuantity(), inventoryRequest.getItemName()); + + State inventoryState; + try { + DaprClient daprClient = new DaprClientBuilder().build(); + inventoryState = daprClient.getState(STATE_STORE_NAME, inventoryRequest.getItemName(), InventoryItem.class).block(); + } catch (Exception e) { + throw e; + } + + InventoryItem inventory = inventoryState.getValue(); + logger.info("There are {} {} available for purchase", + inventory.getQuantity(), inventory.getName()); + + // See if there're enough items to purchase + if (inventory.getQuantity() >= inventoryRequest.getQuantity()) { + // Simulate slow processing + try { + Thread.sleep(2 * 1000); + } catch (InterruptedException e) { + } + logger.info("Reserved inventory for order '{}' of {} {}", + inventoryRequest.getRequestId(), inventoryRequest.getQuantity(), inventoryRequest.getItemName()); + InventoryResult inventoryResult = new InventoryResult(); + inventoryResult.setSuccess(true); + inventoryResult.setInventoryItem(inventory); + return inventoryResult; + } + + // Not enough items. + logger.info("Not enough items to reserve inventory for order '{}' of {} {}", + inventoryRequest.getRequestId(), inventoryRequest.getQuantity(), inventoryRequest.getItemName()); + InventoryResult inventoryResult = new InventoryResult(); + inventoryResult.setSuccess(false); + inventoryResult.setInventoryItem(inventory); + return inventoryResult; + } + +} diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/UpdateInventoryActivity.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/UpdateInventoryActivity.java new file mode 100644 index 000000000..62c14f854 --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/UpdateInventoryActivity.java @@ -0,0 +1,69 @@ +package io.dapr.quickstarts.saga.activities; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.domain.State; +import io.dapr.quickstarts.saga.models.InventoryItem; +import io.dapr.quickstarts.saga.models.InventoryRequest; +import io.dapr.quickstarts.saga.models.InventoryResult; +import io.dapr.quickstarts.saga.models.OrderPayload; +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; + +public class UpdateInventoryActivity implements WorkflowActivity { + private static Logger logger = LoggerFactory.getLogger(UpdateInventoryActivity.class); + + private static final String STATE_STORE_NAME = "statestore"; + + @Override + public Object run(WorkflowActivityContext ctx) { + InventoryRequest inventoryRequest = ctx.getInput(InventoryRequest.class); + logger.info("Updating inventory for order '{}' of {} {}", + inventoryRequest.getRequestId(), inventoryRequest.getQuantity(), inventoryRequest.getItemName()); + + // Simulate slow processing + try { + Thread.sleep(2 * 1000); + } catch (InterruptedException e) { + } + + try { + DaprClient daprClient = new DaprClientBuilder().build(); + + // Determine if there are enough Items for purchase + State inventoryState = daprClient + .getState(STATE_STORE_NAME, inventoryRequest.getItemName(), InventoryItem.class).block(); + InventoryItem inventory = inventoryState.getValue(); + int newQuantity = inventory.getQuantity() - inventoryRequest.getQuantity(); + if (newQuantity < 0) { + logger.info("Not enough inventory for order '{}' of {} {}, there are only {} {}", + inventoryRequest.getRequestId(), inventoryRequest.getQuantity(), inventoryRequest.getItemName(), + inventory.getQuantity(), inventory.getName()); + + InventoryResult inventoryResult = new InventoryResult(); + inventoryResult.setSuccess(false); + return inventoryResult; + } + + // Update the statestore with the new amount of paper clips + OrderPayload updatedOrderPayload = new OrderPayload(); + updatedOrderPayload.setItemName(inventoryRequest.getItemName()); + updatedOrderPayload.setQuantity(newQuantity); + daprClient.saveState(STATE_STORE_NAME, inventoryRequest.getItemName(), inventoryState.getEtag(), + updatedOrderPayload, null).block(); + + logger.info("Updated inventory for order '{}': there are now {} {} left in stock", + inventoryRequest.getRequestId(), newQuantity, inventoryRequest.getItemName()); + // in addition to print to std out for validation + System.out.println("there are now " + newQuantity + " " + inventoryRequest.getItemName() + " left in stock"); + InventoryResult inventoryResult = new InventoryResult(); + inventoryResult.setSuccess(true); + return inventoryResult; + } catch (Exception e) { + throw e; + } + } +} diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/UpdateInventoryCompensationActivity.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/UpdateInventoryCompensationActivity.java new file mode 100644 index 000000000..64774a9a3 --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/activities/UpdateInventoryCompensationActivity.java @@ -0,0 +1,54 @@ +package io.dapr.quickstarts.saga.activities; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.domain.State; +import io.dapr.quickstarts.saga.models.InventoryItem; +import io.dapr.quickstarts.saga.models.InventoryRequest; +import io.dapr.quickstarts.saga.models.OrderPayload; +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; + +public class UpdateInventoryCompensationActivity implements WorkflowActivity { + private static Logger logger = LoggerFactory.getLogger(UpdateInventoryCompensationActivity.class); + + private static final String STATE_STORE_NAME = "statestore"; + + @Override + public Object run(WorkflowActivityContext ctx) { + InventoryRequest inventoryRequest = ctx.getInput(InventoryRequest.class); + + logger.info("Compensating inventory for order '{}' of {} {}", + inventoryRequest.getRequestId(), inventoryRequest.getQuantity(), inventoryRequest.getItemName()); + + logger.info("Compensating inventory for order '{}' of {} {}", + inventoryRequest.getRequestId(), inventoryRequest.getQuantity(), inventoryRequest.getItemName()); + + try { + DaprClient daprClient = new DaprClientBuilder().build(); + State inventoryState = daprClient + .getState(STATE_STORE_NAME, inventoryRequest.getItemName(), InventoryItem.class).block(); + InventoryItem inventory = inventoryState.getValue(); + int newQuantity = inventory.getQuantity() + inventoryRequest.getQuantity(); + + // Update the statestore with the new amount + OrderPayload updatedOrderPayload = new OrderPayload(); + updatedOrderPayload.setItemName(inventoryRequest.getItemName()); + updatedOrderPayload.setQuantity(newQuantity); + daprClient.saveState(STATE_STORE_NAME, inventoryRequest.getItemName(), inventoryState.getEtag(), + updatedOrderPayload, null).block(); + + logger.info("Compensated inventory for order '{}': there are now {} {} left in stock", + inventoryRequest.getRequestId(), newQuantity, inventoryRequest.getItemName()); + // in addition to print to std out for validation + System.out.println("there are now " + newQuantity + " " + inventoryRequest.getItemName() + " left in stock"); + return null; + } catch (Exception e) { + throw e; + } + } + +} diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/ApprovalRequired.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/ApprovalRequired.java new file mode 100644 index 000000000..741cbc8f5 --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/ApprovalRequired.java @@ -0,0 +1,19 @@ +package io.dapr.quickstarts.saga.models; + +public class ApprovalRequired { + private boolean approval; + + public boolean isApproval() { + return approval; + } + + public void setApproval(boolean approval) { + this.approval = approval; + } + + @Override + public String toString() { + return "ApprovalRequired [approval=" + approval + "]"; + } + +} \ No newline at end of file diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/ApprovalResult.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/ApprovalResult.java new file mode 100644 index 000000000..cb2c997cc --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/ApprovalResult.java @@ -0,0 +1,7 @@ +package io.dapr.quickstarts.saga.models; + +public enum ApprovalResult { + Unspecified, + Approved, + Rejected +} diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/InventoryItem.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/InventoryItem.java new file mode 100644 index 000000000..73cea0a62 --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/InventoryItem.java @@ -0,0 +1,36 @@ +package io.dapr.quickstarts.saga.models; + +public class InventoryItem { + private String name; + private int perItemCost; + private int quantity; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getPerItemCost() { + return perItemCost; + } + + public void setPerItemCost(int perItemCost) { + this.perItemCost = perItemCost; + } + + public int getQuantity() { + return quantity; + } + + public void setQuantity(int quantity) { + this.quantity = quantity; + } + + @Override + public String toString() { + return "InventoryItem [name=" + name + ", perItemCost=" + perItemCost + ", quantity=" + quantity + "]"; + } +} diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/InventoryRequest.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/InventoryRequest.java new file mode 100644 index 000000000..e129dd07f --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/InventoryRequest.java @@ -0,0 +1,37 @@ +package io.dapr.quickstarts.saga.models; + +public class InventoryRequest { + + private String requestId; + private String itemName; + private int quantity; + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public String getItemName() { + return itemName; + } + + public void setItemName(String itemName) { + this.itemName = itemName; + } + + public int getQuantity() { + return quantity; + } + + public void setQuantity(int quantity) { + this.quantity = quantity; + } + + @Override + public String toString() { + return "InventoryRequest [requestId=" + requestId + ", itemName=" + itemName + ", quantity=" + quantity + "]"; + } +} \ No newline at end of file diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/InventoryResult.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/InventoryResult.java new file mode 100644 index 000000000..a9c81dbcb --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/InventoryResult.java @@ -0,0 +1,27 @@ +package io.dapr.quickstarts.saga.models; + +public class InventoryResult { + private boolean success; + private InventoryItem inventoryItem; + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public InventoryItem getInventoryItem() { + return inventoryItem; + } + + public void setInventoryItem(InventoryItem inventoryItem) { + this.inventoryItem = inventoryItem; + } + + @Override + public String toString() { + return "InventoryResult [success=" + success + ", inventoryItem=" + inventoryItem + "]"; + } +} diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/Notification.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/Notification.java new file mode 100644 index 000000000..46f0674d3 --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/Notification.java @@ -0,0 +1,19 @@ +package io.dapr.quickstarts.saga.models; + +public class Notification { + private String message; + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @Override + public String toString() { + return "Notification [message=" + message + "]"; + } + +} diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/OrderPayload.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/OrderPayload.java new file mode 100644 index 000000000..f2b420ab4 --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/OrderPayload.java @@ -0,0 +1,38 @@ +package io.dapr.quickstarts.saga.models; + +public class OrderPayload { + + private String itemName; + private int totalCost; + private int quantity; + + public String getItemName() { + return itemName; + } + + public void setItemName(String itemName) { + this.itemName = itemName; + } + + public int getTotalCost() { + return totalCost; + } + + public void setTotalCost(int totalCost) { + this.totalCost = totalCost; + } + + public int getQuantity() { + return quantity; + } + + public void setQuantity(int quantity) { + this.quantity = quantity; + } + + @Override + public String toString() { + return "OrderPayload [itemName=" + itemName + ", totalCost=" + totalCost + ", quantity=" + quantity + "]"; + } + +} diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/OrderResult.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/OrderResult.java new file mode 100644 index 000000000..a9ea527d1 --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/OrderResult.java @@ -0,0 +1,27 @@ +package io.dapr.quickstarts.saga.models; + +public class OrderResult { + private boolean processed; + private boolean compensated; + + public boolean isProcessed() { + return processed; + } + + public void setProcessed(boolean processed) { + this.processed = processed; + } + + public boolean isCompensated() { + return compensated; + } + + public void setCompensated(boolean compensated) { + this.compensated = compensated; + } + + @Override + public String toString() { + return "OrderResult [processed=" + processed + ", compensated=" + compensated + "]"; + } +} diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/PaymentRequest.java b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/PaymentRequest.java new file mode 100644 index 000000000..6c742a956 --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/java/io/dapr/quickstarts/saga/models/PaymentRequest.java @@ -0,0 +1,47 @@ +package io.dapr.quickstarts.saga.models; + +public class PaymentRequest { + private String requestId; + private String itemBeingPurchased; + private int amount; + private int quantity; + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public String getItemBeingPurchased() { + return itemBeingPurchased; + } + + public void setItemBeingPurchased(String itemBeingPurchased) { + this.itemBeingPurchased = itemBeingPurchased; + } + + public int getAmount() { + return amount; + } + + public void setAmount(int amount) { + this.amount = amount; + } + + public int getQuantity() { + return quantity; + } + + public void setQuantity(int quantity) { + this.quantity = quantity; + } + + @Override + public String toString() { + return "PaymentRequest [requestId=" + requestId + ", itemBeingPurchased=" + itemBeingPurchased + ", amount=" + amount + + ", quantity=" + quantity + "]"; + } + +} diff --git a/workflow-saga/java/sdk/order-processor-saga/src/main/resources/simplelogger.properties b/workflow-saga/java/sdk/order-processor-saga/src/main/resources/simplelogger.properties new file mode 100644 index 000000000..32ca7250a --- /dev/null +++ b/workflow-saga/java/sdk/order-processor-saga/src/main/resources/simplelogger.properties @@ -0,0 +1 @@ +org.slf4j.simpleLogger.logFile=System.out