diff --git a/workflows/java/sdk/README.md b/workflows/java/sdk/README.md new file mode 100644 index 000000000..a7272d05f --- /dev/null +++ b/workflows/java/sdk/README.md @@ -0,0 +1,108 @@ +# Dapr workflows + +In this quickstart, you'll create a simple console application to demonstrate Dapr's workflow programming model and the workflow management API. The console app starts and manages the lifecycle of a workflow that stores and retrieves data in a state store. + +This quickstart includes one project: + +- Java console app `order-processor` + +The quickstart contains 1 workflow to simulate purchasing items from a store, and 5 unique activities within the workflow. These 5 activities are as follows: + +- NotifyActivity: This activity utilizes a logger to print out messages throughout the workflow. These messages notify the user when there is insufficient inventory, their payment couldn't be processed, and more. +- ReserveInventoryActivity: This activity checks the state store to ensure that there is enough inventory present for purchase. +- RequestApprovalActivity: This activity requests approval for orders over a certain threshold +- ProcessPaymentActivity: This activity is responsible for processing and authorizing the payment. +- UpdateInventoryActivity: This activity updates the state store with the new remaining inventory value. + +### Run the order processor workflow + +1. Open a new terminal window and navigate to `order-processor` directory: + + + +```bash +cd ./order-processor +mvn clean install +``` + + +2. Run the console app with Dapr: + + + +```bash +dapr run --app-id WorkflowConsoleApp --resources-path ../../../components/ --dapr-grpc-port 50001 -- java -jar target/OrderProcessingService-0.0.1-SNAPSHOT.jar io.dapr.quickstarts.workflows.WorkflowConsoleApp +``` + + + +3. Expected output + + +``` +== APP == *** Welcome to the Dapr Workflow console app sample! +== APP == *** Using this app, you can place orders that start workflows. +== APP == Start workflow runtime +== APP == Sep 20, 2023 8:38:30 AM com.microsoft.durabletask.DurableTaskGrpcWorker startAndBlock +== APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001. +== APP == ==========Begin the purchase of item:========== +== APP == Starting order workflow, purchasing 10 of cars +== APP == scheduled new workflow instance of OrderProcessingWorkflow with instance ID: 95d33f7c-3af8-4960-ba11-4ecea83b0509 +== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.quickstarts.workflows.OrderProcessingWorkflow +== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Instance ID(order ID): 95d33f7c-3af8-4960-ba11-4ecea83b0509 +== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Current Orchestration Time: 2023-09-20T08:38:33.248Z +== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.NotifyActivity - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP == workflow instance 95d33f7c-3af8-4960-ba11-4ecea83b0509 started +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - Reserving inventory for order '95d33f7c-3af8-4960-ba11-4ecea83b0509' of 10 cars +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - There are 100 cars available for purchase +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - Reserved inventory for order '95d33f7c-3af8-4960-ba11-4ecea83b0509' of 10 cars +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.RequestApprovalActivity - Requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.RequestApprovalActivity - Approved requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ProcessPaymentActivity - Processing payment: 95d33f7c-3af8-4960-ba11-4ecea83b0509 for 10 cars at $150000 +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ProcessPaymentActivity - Payment for request ID '95d33f7c-3af8-4960-ba11-4ecea83b0509' processed successfully +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.UpdateInventoryActivity - Updating inventory for order '95d33f7c-3af8-4960-ba11-4ecea83b0509' of 10 cars +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.UpdateInventoryActivity - Updated inventory for order '95d33f7c-3af8-4960-ba11-4ecea83b0509': there are now 90 cars left in stock +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.NotifyActivity - Order completed! : 95d33f7c-3af8-4960-ba11-4ecea83b0509 +== APP == workflow instance 95d33f7c-3af8-4960-ba11-4ecea83b0509 completed, out is: {"processed":true} +``` + +### View workflow output with Zipkin + +For a more detailed view of the workflow activities (duration, progress etc.), try using Zipkin. + +1. Launch Zipkin container - The [openzipkin/zipkin](https://hub.docker.com/r/openzipkin/zipkin/) docker container is launched on running `dapr init`. Check to make sure the container is running. If it's not, launch the Zipkin docker container with the following command. + +```bash +docker run -d -p 9411:9411 openzipkin/zipkin +``` + +2. View Traces in Zipkin UI - In your browser go to http://localhost:9411 to view the workflow trace spans in the Zipkin web UI. The order-processor workflow should be viewable with the following output in the Zipkin web UI. + + + +### What happened? + +When you ran `dapr run --app-id WorkflowConsoleApp --resources-path ../../../components/ --dapr-grpc-port 50001 -- java -jar target/OrderProcessingService-0.0.1-SNAPSHOT.jar io.dapr.quickstarts.workflows.WorkflowConsoleApp` + +1. A unique order ID for the workflow is generated (in the above example, `95d33f7c-3af8-4960-ba11-4ecea83b0509`) and the workflow is scheduled. +2. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received. +3. The `ReserveInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock. +4. Your workflow starts and notifies you of its status. +5. The `RequestApprovalActivity` workflow activity requests approval for order `95d33f7c-3af8-4960-ba11-4ecea83b0509` +6. The `ProcessPaymentActivity` workflow activity begins processing payment for order `95d33f7c-3af8-4960-ba11-4ecea83b0509` and confirms if successful. +7. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed. +8. The `NotifyActivity` workflow activity sends a notification saying that order `95d33f7c-3af8-4960-ba11-4ecea83b0509` has completed and processed. +9. The workflow terminates as completed and processed. + diff --git a/workflows/java/sdk/makefile b/workflows/java/sdk/makefile new file mode 100644 index 000000000..e7a8826bf --- /dev/null +++ b/workflows/java/sdk/makefile @@ -0,0 +1,2 @@ +include ../../../docker.mk +include ../../../validate.mk \ No newline at end of file diff --git a/workflows/java/sdk/order-processor/pom.xml b/workflows/java/sdk/order-processor/pom.xml new file mode 100644 index 000000000..c939c200e --- /dev/null +++ b/workflows/java/sdk/order-processor/pom.xml @@ -0,0 +1,51 @@ + + + 4.0.0 + + com.service + OrderProcessingService + 0.0.1-SNAPSHOT + OrderProcessingService + Demo for Dapr workflow component + + 11 + 11 + 1.6.1 + + + + io.dapr + dapr-sdk-workflows + 0.10.0-SNAPSHOT + + + org.slf4j + slf4j-simple + 1.7.36 + + + + + + + org.springframework.boot + spring-boot-maven-plugin + 2.6.3 + + + + repackage + + + + io.dapr.quickstarts.workflows.WorkflowConsoleApp + + + + + + + + + diff --git a/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/OrderProcessingWorkflow.java b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/OrderProcessingWorkflow.java new file mode 100644 index 000000000..79f25c4ad --- /dev/null +++ b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/OrderProcessingWorkflow.java @@ -0,0 +1,111 @@ +package io.dapr.quickstarts.workflows; + +import org.slf4j.Logger; + +import io.dapr.quickstarts.workflows.activities.NotifyActivity; +import io.dapr.quickstarts.workflows.activities.ProcessPaymentActivity; +import io.dapr.quickstarts.workflows.activities.RequestApprovalActivity; +import io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity; +import io.dapr.quickstarts.workflows.activities.UpdateInventoryActivity; +import io.dapr.quickstarts.workflows.models.ApprovalResult; +import io.dapr.quickstarts.workflows.models.InventoryRequest; +import io.dapr.quickstarts.workflows.models.InventoryResult; +import io.dapr.quickstarts.workflows.models.Notification; +import io.dapr.quickstarts.workflows.models.OrderPayload; +import io.dapr.quickstarts.workflows.models.OrderResult; +import io.dapr.quickstarts.workflows.models.PaymentRequest; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; + +public class OrderProcessingWorkflow extends Workflow { + + @Override + public WorkflowStub create() { + return ctx -> { + Logger logger = ctx.getLogger(); + String orderId = ctx.getInstanceId(); + logger.info("Starting Workflow: " + ctx.getName()); + logger.info("Instance ID(order ID): " + orderId); + logger.info("Current Orchestration Time: " + ctx.getCurrentInstant()); + + OrderPayload order = ctx.getInput(OrderPayload.class); + logger.info("Received Order: " + order.toString()); + OrderResult orderResult = new OrderResult(); + orderResult.setProcessed(false); + + // Notify the user that an order has come through + Notification notification = new Notification(); + notification.setMessage("Received Order: " + order.toString()); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + + // Determine if there is enough of the item available for purchase by checking + // the inventory + InventoryRequest inventoryRequest = new InventoryRequest(); + inventoryRequest.setRequestId(orderId); + inventoryRequest.setItemName(order.getItemName()); + inventoryRequest.setQuantity(order.getQuantity()); + InventoryResult inventoryResult = ctx.callActivity(ReserveInventoryActivity.class.getName(), + inventoryRequest, InventoryResult.class).await(); + + // If there is insufficient inventory, fail and let the user know + if (!inventoryResult.isSuccess()) { + notification.setMessage("Insufficient inventory for order : " + order.getItemName()); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + + // Require orders over a certain threshold to be approved + if (order.getTotalCost() > 5000) { + ApprovalResult approvalResult = ctx.callActivity(RequestApprovalActivity.class.getName(), + order, ApprovalResult.class).await(); + if (approvalResult != ApprovalResult.Approved) { + notification.setMessage("Order " + order.getItemName() + " was not approved."); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + } + + // There is enough inventory available so the user can purchase the item(s). + // Process their payment + PaymentRequest paymentRequest = new PaymentRequest(); + paymentRequest.setRequestId(orderId); + paymentRequest.setItemBeingPurchased(order.getItemName()); + paymentRequest.setQuantity(order.getQuantity()); + paymentRequest.setAmount(order.getTotalCost()); + boolean isOK = ctx.callActivity(ProcessPaymentActivity.class.getName(), + paymentRequest, boolean.class).await(); + if (!isOK) { + notification.setMessage("Payment failed for order : " + orderId); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + + inventoryResult = ctx.callActivity(UpdateInventoryActivity.class.getName(), + inventoryRequest, InventoryResult.class).await(); + if (!inventoryResult.isSuccess()) { + // If there is an error updating the inventory, refund the user + // paymentRequest.setAmount(-1 * paymentRequest.getAmount()); + // ctx.callActivity(ProcessPaymentActivity.class.getName(), + // paymentRequest).await(); + + // Let users know their payment processing failed + notification.setMessage("Order failed to update inventory! : " + orderId); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + + // Let user know their order was processed + notification.setMessage("Order completed! : " + orderId); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + + // Complete the workflow with order result is processed + orderResult.setProcessed(true); + ctx.complete(orderResult); + }; + } + +} diff --git a/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/WorkflowConsoleApp.java b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/WorkflowConsoleApp.java new file mode 100644 index 000000000..041938124 --- /dev/null +++ b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/WorkflowConsoleApp.java @@ -0,0 +1,132 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.quickstarts.workflows; + +import java.time.Duration; +import java.util.concurrent.TimeoutException; + +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.quickstarts.workflows.activities.NotifyActivity; +import io.dapr.quickstarts.workflows.activities.ProcessPaymentActivity; +import io.dapr.quickstarts.workflows.activities.RequestApprovalActivity; +import io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity; +import io.dapr.quickstarts.workflows.activities.UpdateInventoryActivity; +import io.dapr.quickstarts.workflows.models.InventoryItem; +import io.dapr.quickstarts.workflows.models.OrderPayload; +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowInstanceStatus; +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class WorkflowConsoleApp { + + private static final String STATE_STORE_NAME = "statestore-actors"; + + /** + * The main method of this console app. + * + * @param args The port the app will listen on. + * @throws Exception An Exception. + */ + public static void main(String[] args) throws Exception { + System.out.println("*** Welcome to the Dapr Workflow console app sample!"); + System.out.println("*** Using this app, you can place orders that start workflows."); + // Wait for the sidecar to become available + Thread.sleep(5 * 1000); + + // Register the OrderProcessingWorkflow and its activities with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(OrderProcessingWorkflow.class); + builder.registerActivity(NotifyActivity.class); + builder.registerActivity(ProcessPaymentActivity.class); + builder.registerActivity(RequestApprovalActivity.class); + builder.registerActivity(ReserveInventoryActivity.class); + builder.registerActivity(UpdateInventoryActivity.class); + + // Build and then start the workflow runtime pulling and executing tasks + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Start workflow runtime"); + runtime.start(false); + } + + InventoryItem inventory = prepareInventoryAndOrder(); + + DaprWorkflowClient workflowClient = new DaprWorkflowClient(); + try (workflowClient) { + executeWorkflow(workflowClient, inventory); + } + + } + + private static void executeWorkflow(DaprWorkflowClient workflowClient, InventoryItem inventory) { + System.out.println("==========Begin the purchase of item:=========="); + String itemName = inventory.getName(); + int orderQuantity = inventory.getQuantity(); + int totalcost = orderQuantity * inventory.getPerItemCost(); + OrderPayload order = new OrderPayload(); + order.setItemName(itemName); + order.setQuantity(orderQuantity); + order.setTotalCost(totalcost); + System.out.println("Starting order workflow, purchasing " + orderQuantity + " of " + itemName); + + String instanceId = workflowClient.scheduleNewWorkflow(OrderProcessingWorkflow.class, order); + System.out.printf("scheduled new workflow instance of OrderProcessingWorkflow with instance ID: %s%n", + instanceId); + + try { + workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(10), false); + System.out.printf("workflow instance %s started%n", instanceId); + } catch (TimeoutException e) { + System.out.printf("workflow instance %s did not start within 10 seconds%n", instanceId); + return; + } + + try { + WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, + Duration.ofSeconds(30), + true); + if (workflowStatus != null) { + System.out.printf("workflow instance %s completed, out is: %s %n", instanceId, + workflowStatus.getSerializedOutput()); + } else { + System.out.printf("workflow instance %s not found%n", instanceId); + } + } catch (TimeoutException e) { + System.out.printf("workflow instance %s did not complete within 30 seconds%n", instanceId); + } + + } + + private static InventoryItem prepareInventoryAndOrder() { + // prepare 100 cars in inventory + InventoryItem inventory = new InventoryItem(); + inventory.setName("cars"); + inventory.setPerItemCost(15000); + inventory.setQuantity(100); + DaprClient daprClient = new DaprClientBuilder().build(); + restockInventory(daprClient, inventory); + + // prepare order for 10 cars + InventoryItem order = new InventoryItem(); + order.setName("cars"); + order.setPerItemCost(15000); + order.setQuantity(10); + return order; + } + + private static void restockInventory(DaprClient daprClient, InventoryItem inventory) { + String key = inventory.getName(); + daprClient.saveState(STATE_STORE_NAME, key, inventory).block(); + } +} diff --git a/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/NotifyActivity.java b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/NotifyActivity.java new file mode 100644 index 000000000..7c4f71bc7 --- /dev/null +++ b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/NotifyActivity.java @@ -0,0 +1,21 @@ +package io.dapr.quickstarts.workflows.activities; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.dapr.quickstarts.workflows.models.Notification; +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; + +public class NotifyActivity implements WorkflowActivity { + private static Logger logger = LoggerFactory.getLogger(NotifyActivity.class); + + @Override + public Object run(WorkflowActivityContext ctx) { + Notification notification = ctx.getInput(Notification.class); + logger.info(notification.getMessage()); + + return ""; + } + +} diff --git a/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/ProcessPaymentActivity.java b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/ProcessPaymentActivity.java new file mode 100644 index 000000000..17dbb8373 --- /dev/null +++ b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/ProcessPaymentActivity.java @@ -0,0 +1,29 @@ +package io.dapr.quickstarts.workflows.activities; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.dapr.quickstarts.workflows.models.PaymentRequest; +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; + +public class ProcessPaymentActivity implements WorkflowActivity { + private static Logger logger = LoggerFactory.getLogger(ProcessPaymentActivity.class); + + @Override + public Object run(WorkflowActivityContext ctx) { + PaymentRequest req = ctx.getInput(PaymentRequest.class); + logger.info("Processing payment: {} for {} {} at ${}", + req.getRequestId(), req.getQuantity(), req.getItemBeingPurchased(), req.getAmount()); + + // Simulate slow processing + try { + Thread.sleep(7 * 1000); + } catch (InterruptedException e) { + } + logger.info("Payment for request ID '{}' processed successfully", req.getRequestId()); + + return true; + } + +} diff --git a/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/RequestApprovalActivity.java b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/RequestApprovalActivity.java new file mode 100644 index 000000000..1e24ddd9e --- /dev/null +++ b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/RequestApprovalActivity.java @@ -0,0 +1,24 @@ +package io.dapr.quickstarts.workflows.activities; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.dapr.quickstarts.workflows.models.ApprovalResult; +import io.dapr.quickstarts.workflows.models.OrderPayload; +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; + +public class RequestApprovalActivity implements WorkflowActivity { + private static Logger logger = LoggerFactory.getLogger(RequestApprovalActivity.class); + + @Override + public Object run(WorkflowActivityContext ctx) { + OrderPayload order = ctx.getInput(OrderPayload.class); + logger.info("Requesting approval for order: {}", order); + + // hard code to Approved in example + logger.info("Approved requesting approval for order: {}", order); + return ApprovalResult.Approved; + } + +} diff --git a/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/ReserveInventoryActivity.java b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/ReserveInventoryActivity.java new file mode 100644 index 000000000..1f0c4292a --- /dev/null +++ b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/ReserveInventoryActivity.java @@ -0,0 +1,62 @@ +package io.dapr.quickstarts.workflows.activities; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.domain.State; +import io.dapr.quickstarts.workflows.models.InventoryItem; +import io.dapr.quickstarts.workflows.models.InventoryRequest; +import io.dapr.quickstarts.workflows.models.InventoryResult; +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; + +public class ReserveInventoryActivity implements WorkflowActivity { + private static Logger logger = LoggerFactory.getLogger(ReserveInventoryActivity.class); + + private static final String STATE_STORE_NAME = "statestore-actors"; + + private DaprClient daprClient; + + public ReserveInventoryActivity() { + this.daprClient = new DaprClientBuilder().build(); + } + + @Override + public Object run(WorkflowActivityContext ctx) { + InventoryRequest inventoryRequest = ctx.getInput(InventoryRequest.class); + logger.info("Reserving inventory for order '{}' of {} {}", + inventoryRequest.getRequestId(), inventoryRequest.getQuantity(), inventoryRequest.getItemName()); + + State inventoryState = daprClient.getState(STATE_STORE_NAME, inventoryRequest.getItemName(), InventoryItem.class).block(); + InventoryItem inventory = inventoryState.getValue(); + + logger.info("There are {} {} available for purchase", + inventory.getQuantity(), inventory.getName()); + + // See if there're enough items to purchase + if (inventory.getQuantity() >= inventoryRequest.getQuantity()) { + // Simulate slow processing + try { + Thread.sleep(2 * 1000); + } catch (InterruptedException e) { + } + logger.info("Reserved inventory for order '{}' of {} {}", + inventoryRequest.getRequestId(), inventoryRequest.getQuantity(), inventoryRequest.getItemName()); + InventoryResult inventoryResult = new InventoryResult(); + inventoryResult.setSuccess(true); + inventoryResult.setInventoryItem(inventory); + return inventoryResult; + } + + // Not enough items. + logger.info("Not enough items to reserve inventory for order '{}' of {} {}", + inventoryRequest.getRequestId(), inventoryRequest.getQuantity(), inventoryRequest.getItemName()); + InventoryResult inventoryResult = new InventoryResult(); + inventoryResult.setSuccess(false); + inventoryResult.setInventoryItem(inventory); + return inventoryResult; + } + +} diff --git a/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/UpdateInventoryActivity.java b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/UpdateInventoryActivity.java new file mode 100644 index 000000000..6392fb43b --- /dev/null +++ b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/UpdateInventoryActivity.java @@ -0,0 +1,68 @@ +package io.dapr.quickstarts.workflows.activities; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.domain.State; +import io.dapr.quickstarts.workflows.models.InventoryItem; +import io.dapr.quickstarts.workflows.models.InventoryRequest; +import io.dapr.quickstarts.workflows.models.InventoryResult; +import io.dapr.quickstarts.workflows.models.OrderPayload; +import io.dapr.workflows.runtime.WorkflowActivity; +import io.dapr.workflows.runtime.WorkflowActivityContext; + +public class UpdateInventoryActivity implements WorkflowActivity { + private static Logger logger = LoggerFactory.getLogger(UpdateInventoryActivity.class); + + private static final String STATE_STORE_NAME = "statestore-actors"; + + private DaprClient daprClient; + + public UpdateInventoryActivity() { + this.daprClient = new DaprClientBuilder().build(); + } + + @Override + public Object run(WorkflowActivityContext ctx) { + InventoryRequest inventoryRequest = ctx.getInput(InventoryRequest.class); + logger.info("Updating inventory for order '{}' of {} {}", + inventoryRequest.getRequestId(), inventoryRequest.getQuantity(), inventoryRequest.getItemName()); + + // Simulate slow processing + try { + Thread.sleep(2 * 1000); + } catch (InterruptedException e) { + } + + // Determine if there are enough Items for purchase + State inventoryState = daprClient + .getState(STATE_STORE_NAME, inventoryRequest.getItemName(), InventoryItem.class).block(); + InventoryItem inventory = inventoryState.getValue(); + int newQuantity = inventory.getQuantity() - inventoryRequest.getQuantity(); + if (newQuantity < 0) { + logger.info("Not enough inventory for order '{}' of {} {}, there are only {} {}", + inventoryRequest.getRequestId(), inventoryRequest.getQuantity(), inventoryRequest.getItemName(), + inventory.getQuantity(), inventory.getName()); + + InventoryResult inventoryResult = new InventoryResult(); + inventoryResult.setSuccess(false); + return inventoryResult; + } + + // Update the statestore with the new amount of paper clips + OrderPayload updatedOrderPayload = new OrderPayload(); + updatedOrderPayload.setItemName(inventoryRequest.getItemName()); + updatedOrderPayload.setQuantity(newQuantity); + daprClient.saveState(STATE_STORE_NAME, inventoryRequest.getItemName(), inventoryState.getEtag(), + updatedOrderPayload, null).block(); + + logger.info("Updated inventory for order '{}': there are now {} {} left in stock", + inventoryRequest.getRequestId(), newQuantity, inventoryRequest.getItemName()); + InventoryResult inventoryResult = new InventoryResult(); + inventoryResult.setSuccess(true); + return inventoryResult; + } + +} diff --git a/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/ApprovalRequired.java b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/ApprovalRequired.java new file mode 100644 index 000000000..4ee5a45f7 --- /dev/null +++ b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/ApprovalRequired.java @@ -0,0 +1,19 @@ +package io.dapr.quickstarts.workflows.models; + +public class ApprovalRequired { + private boolean approval; + + public boolean isApproval() { + return approval; + } + + public void setApproval(boolean approval) { + this.approval = approval; + } + + @Override + public String toString() { + return "ApprovalRequired [approval=" + approval + "]"; + } + +} \ No newline at end of file diff --git a/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/ApprovalResult.java b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/ApprovalResult.java new file mode 100644 index 000000000..71ea7d5da --- /dev/null +++ b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/ApprovalResult.java @@ -0,0 +1,7 @@ +package io.dapr.quickstarts.workflows.models; + +public enum ApprovalResult { + Unspecified, + Approved, + Rejected +} diff --git a/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/InventoryItem.java b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/InventoryItem.java new file mode 100644 index 000000000..62261a3e7 --- /dev/null +++ b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/InventoryItem.java @@ -0,0 +1,36 @@ +package io.dapr.quickstarts.workflows.models; + +public class InventoryItem { + private String name; + private int perItemCost; + private int quantity; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getPerItemCost() { + return perItemCost; + } + + public void setPerItemCost(int perItemCost) { + this.perItemCost = perItemCost; + } + + public int getQuantity() { + return quantity; + } + + public void setQuantity(int quantity) { + this.quantity = quantity; + } + + @Override + public String toString() { + return "InventoryItem [name=" + name + ", perItemCost=" + perItemCost + ", quantity=" + quantity + "]"; + } +} diff --git a/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/InventoryRequest.java b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/InventoryRequest.java new file mode 100644 index 000000000..cee70d6ab --- /dev/null +++ b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/InventoryRequest.java @@ -0,0 +1,37 @@ +package io.dapr.quickstarts.workflows.models; + +public class InventoryRequest { + + private String requestId; + private String itemName; + private int quantity; + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public String getItemName() { + return itemName; + } + + public void setItemName(String itemName) { + this.itemName = itemName; + } + + public int getQuantity() { + return quantity; + } + + public void setQuantity(int quantity) { + this.quantity = quantity; + } + + @Override + public String toString() { + return "InventoryRequest [requestId=" + requestId + ", itemName=" + itemName + ", quantity=" + quantity + "]"; + } +} \ No newline at end of file diff --git a/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/InventoryResult.java b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/InventoryResult.java new file mode 100644 index 000000000..249bd9b1d --- /dev/null +++ b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/InventoryResult.java @@ -0,0 +1,27 @@ +package io.dapr.quickstarts.workflows.models; + +public class InventoryResult { + private boolean success; + private InventoryItem inventoryItem; + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public InventoryItem getInventoryItem() { + return inventoryItem; + } + + public void setInventoryItem(InventoryItem inventoryItem) { + this.inventoryItem = inventoryItem; + } + + @Override + public String toString() { + return "InventoryResult [success=" + success + ", inventoryItem=" + inventoryItem + "]"; + } +} diff --git a/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/Notification.java b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/Notification.java new file mode 100644 index 000000000..d77f9d186 --- /dev/null +++ b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/Notification.java @@ -0,0 +1,19 @@ +package io.dapr.quickstarts.workflows.models; + +public class Notification { + private String message; + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @Override + public String toString() { + return "Notification [message=" + message + "]"; + } + +} diff --git a/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/OrderPayload.java b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/OrderPayload.java new file mode 100644 index 000000000..01f748519 --- /dev/null +++ b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/OrderPayload.java @@ -0,0 +1,38 @@ +package io.dapr.quickstarts.workflows.models; + +public class OrderPayload { + + private String itemName; + private int totalCost; + private int quantity; + + public String getItemName() { + return itemName; + } + + public void setItemName(String itemName) { + this.itemName = itemName; + } + + public int getTotalCost() { + return totalCost; + } + + public void setTotalCost(int totalCost) { + this.totalCost = totalCost; + } + + public int getQuantity() { + return quantity; + } + + public void setQuantity(int quantity) { + this.quantity = quantity; + } + + @Override + public String toString() { + return "OrderPayload [itemName=" + itemName + ", totalCost=" + totalCost + ", quantity=" + quantity + "]"; + } + +} diff --git a/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/OrderResult.java b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/OrderResult.java new file mode 100644 index 000000000..a32d12137 --- /dev/null +++ b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/OrderResult.java @@ -0,0 +1,19 @@ +package io.dapr.quickstarts.workflows.models; + +public class OrderResult { + private boolean processed; + + public boolean isProcessed() { + return processed; + } + + public void setProcessed(boolean processed) { + this.processed = processed; + } + + @Override + public String toString() { + return "OrderResult [processed=" + processed + "]"; + } + +} diff --git a/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/PaymentRequest.java b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/PaymentRequest.java new file mode 100644 index 000000000..350e354c2 --- /dev/null +++ b/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/models/PaymentRequest.java @@ -0,0 +1,47 @@ +package io.dapr.quickstarts.workflows.models; + +public class PaymentRequest { + private String requestId; + private String itemBeingPurchased; + private int amount; + private int quantity; + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public String getItemBeingPurchased() { + return itemBeingPurchased; + } + + public void setItemBeingPurchased(String itemBeingPurchased) { + this.itemBeingPurchased = itemBeingPurchased; + } + + public int getAmount() { + return amount; + } + + public void setAmount(int amount) { + this.amount = amount; + } + + public int getQuantity() { + return quantity; + } + + public void setQuantity(int quantity) { + this.quantity = quantity; + } + + @Override + public String toString() { + return "PaymentRequest [requestId=" + requestId + ", itemBeingPurchased=" + itemBeingPurchased + ", amount=" + amount + + ", quantity=" + quantity + "]"; + } + +} diff --git a/workflows/java/sdk/order-processor/src/main/resources/simplelogger.properties b/workflows/java/sdk/order-processor/src/main/resources/simplelogger.properties new file mode 100644 index 000000000..32ca7250a --- /dev/null +++ b/workflows/java/sdk/order-processor/src/main/resources/simplelogger.properties @@ -0,0 +1 @@ +org.slf4j.simpleLogger.logFile=System.out