diff --git a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md index d139561cb04..bec440e1d81 100644 --- a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md +++ b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md @@ -21,7 +21,245 @@ In this guide, you'll: -{{< tabs ".NET" "Python" >}} +{{< tabs "Python" ".NET" "Java" >}} + + +{{% codetab %}} + +The `order-processor` console app starts and manages the `order_processing_workflow`, which simulates purchasing items from a store. The workflow consists of five unique workflow activities, or tasks: + +- `notify_activity`: Utilizes a logger to print out messages throughout the workflow. These messages notify you when: + - You have insufficient inventory + - Your payment couldn't be processed, etc. +- `process_payment_activity`: Processes and authorizes the payment. +- `verify_inventory_activity`: Checks the state store to ensure there is enough inventory present for purchase. +- `update_inventory_activity`: Removes the requested items from the state store and updates the store with the new remaining inventory value. +- `request_approval_activity`: Seeks approval from the manager if payment is greater than 50,000 USD. + +### Step 1: Pre-requisites + +For this example, you will need: + +- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started). +- [Python 3.7+ installed](https://www.python.org/downloads/). + +- [Docker Desktop](https://www.docker.com/products/docker-desktop) + + +### Step 2: Set up the environment + +Clone the [sample provided in the Quickstarts repo](https://github.com/dapr/quickstarts/tree/master/workflows). + +```bash +git clone https://github.com/dapr/quickstarts.git +``` + +In a new terminal window, navigate to the `order-processor` directory: + +```bash +cd workflows/python/sdk/order-processor +``` + +Install the Dapr Python SDK package: + +```bash +pip3 install -r requirements.txt +``` + +### Step 3: Run the order processor app + +In the terminal, start the order processor app alongside a Dapr sidecar: + +```bash +dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py +``` + +> **Note:** Since Python3.exe is not defined in Windows, you may need to use `python app.py` instead of `python3 app.py`. + +This starts the `order-processor` app with unique workflow ID and runs the workflow activities. + +Expected output: + +```bash +== APP == Starting order workflow, purchasing 10 of cars +== APP == 2023-06-06 09:35:52.945 durabletask-worker INFO: Successfully connected to 127.0.0.1:65406. Waiting for work items... +== APP == INFO:NotifyActivity:Received order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at $150000 ! +== APP == INFO:VerifyInventoryActivity:Verifying inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da of 10 cars +== APP == INFO:VerifyInventoryActivity:There are 100 Cars available for purchase +== APP == INFO:RequestApprovalActivity:Requesting approval for payment of 165000 USD for 10 cars +== APP == 2023-06-06 09:36:05.969 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da Event raised: manager_approval +== APP == INFO:NotifyActivity:Payment for order f4e1926e-3721-478d-be8a-f5bebd1995da has been approved! +== APP == INFO:ProcessPaymentActivity:Processing payment: f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at 150000 USD +== APP == INFO:ProcessPaymentActivity:Payment for request ID f4e1926e-3721-478d-be8a-f5bebd1995da processed successfully +== APP == INFO:UpdateInventoryActivity:Checking inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars +== APP == INFO:UpdateInventoryActivity:There are now 90 cars left in stock +== APP == INFO:NotifyActivity:Order f4e1926e-3721-478d-be8a-f5bebd1995da has completed! +== APP == 2023-06-06 09:36:06.106 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Orchestration completed with status: COMPLETED +== APP == Workflow completed! Result: Completed +== APP == Purchase of item is Completed +``` + +### (Optional) Step 4: View in Zipkin + +If you have Zipkin configured for Dapr locally on your machine, you can view the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`). + + + +### What happened? + +When you ran `dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py`: + +1. A unique order ID for the workflow is generated (in the above example, `f4e1926e-3721-478d-be8a-f5bebd1995da`) and the workflow is scheduled. +1. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received. +1. The `ReserveInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock. +1. Your workflow starts and notifies you of its status. +1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `f4e1926e-3721-478d-be8a-f5bebd1995da` and confirms if successful. +1. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed. +1. The `NotifyActivity` workflow activity sends a notification saying that order `f4e1926e-3721-478d-be8a-f5bebd1995da` has completed. +1. The workflow terminates as completed. + +#### `order-processor/app.py` + +In the application's program file: +- The unique workflow order ID is generated +- The workflow is scheduled +- The workflow status is retrieved +- The workflow and the workflow activities it invokes are registered + +```python +class WorkflowConsoleApp: + def main(self): + # Register workflow and activities + workflowRuntime = WorkflowRuntime(settings.DAPR_RUNTIME_HOST, settings.DAPR_GRPC_PORT) + workflowRuntime.register_workflow(order_processing_workflow) + workflowRuntime.register_activity(notify_activity) + workflowRuntime.register_activity(requst_approval_activity) + workflowRuntime.register_activity(verify_inventory_activity) + workflowRuntime.register_activity(process_payment_activity) + workflowRuntime.register_activity(update_inventory_activity) + workflowRuntime.start() + + print("==========Begin the purchase of item:==========", flush=True) + item_name = default_item_name + order_quantity = 10 + + total_cost = int(order_quantity) * baseInventory[item_name].per_item_cost + order = OrderPayload(item_name=item_name, quantity=int(order_quantity), total_cost=total_cost) + + # Start Workflow + print(f'Starting order workflow, purchasing {order_quantity} of {item_name}', flush=True) + start_resp = daprClient.start_workflow(workflow_component=workflow_component, + workflow_name=workflow_name, + input=order) + _id = start_resp.instance_id + + def prompt_for_approval(daprClient: DaprClient): + daprClient.raise_workflow_event(instance_id=_id, workflow_component=workflow_component, + event_name="manager_approval", event_data={'approval': True}) + + approval_seeked = False + start_time = datetime.now() + while True: + time_delta = datetime.now() - start_time + state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component) + if not state: + print("Workflow not found!") # not expected + elif state.runtime_status == "Completed" or\ + state.runtime_status == "Failed" or\ + state.runtime_status == "Terminated": + print(f'Workflow completed! Result: {state.runtime_status}', flush=True) + break + if time_delta.total_seconds() >= 10: + state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component) + if total_cost > 50000 and ( + state.runtime_status != "Completed" or + state.runtime_status != "Failed" or + state.runtime_status != "Terminated" + ) and not approval_seeked: + approval_seeked = True + threading.Thread(target=prompt_for_approval(daprClient), daemon=True).start() + + print("Purchase of item is ", state.runtime_status, flush=True) + + def restock_inventory(self, daprClient: DaprClient, baseInventory): + for key, item in baseInventory.items(): + print(f'item: {item}') + item_str = f'{{"name": "{item.item_name}", "quantity": {item.quantity},\ + "per_item_cost": {item.per_item_cost}}}' + daprClient.save_state("statestore-actors", key, item_str) + +if __name__ == '__main__': + app = WorkflowConsoleApp() + app.main() +``` + +#### `order-processor/workflow.py` + +In `workflow.py`, the workflow is defined as a class with all of its associated tasks (determined by workflow activities). + +```python + def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: OrderPayload): + """Defines the order processing workflow. + When the order is received, the inventory is checked to see if there is enough inventory to + fulfill the order. If there is enough inventory, the payment is processed and the inventory is + updated. If there is not enough inventory, the order is rejected. + If the total order is greater than $50,000, the order is sent to a manager for approval. + """ + order_id = ctx.instance_id + order_payload=json.loads(order_payload_str) + yield ctx.call_activity(notify_activity, + input=Notification(message=('Received order ' +order_id+ ' for ' + +f'{order_payload["quantity"]}' +' ' +f'{order_payload["item_name"]}' + +' at $'+f'{order_payload["total_cost"]}' +' !'))) + result = yield ctx.call_activity(verify_inventory_activity, + input=InventoryRequest(request_id=order_id, + item_name=order_payload["item_name"], + quantity=order_payload["quantity"])) + if not result.success: + yield ctx.call_activity(notify_activity, + input=Notification(message='Insufficient inventory for ' + +f'{order_payload["item_name"]}'+'!')) + return OrderResult(processed=False) + + if order_payload["total_cost"] > 50000: + yield ctx.call_activity(requst_approval_activity, input=order_payload) + approval_task = ctx.wait_for_external_event("manager_approval") + timeout_event = ctx.create_timer(timedelta(seconds=200)) + winner = yield when_any([approval_task, timeout_event]) + if winner == timeout_event: + yield ctx.call_activity(notify_activity, + input=Notification(message='Payment for order '+order_id + +' has been cancelled due to timeout!')) + return OrderResult(processed=False) + approval_result = yield approval_task + if approval_result["approval"]: + yield ctx.call_activity(notify_activity, input=Notification( + message=f'Payment for order {order_id} has been approved!')) + else: + yield ctx.call_activity(notify_activity, input=Notification( + message=f'Payment for order {order_id} has been rejected!')) + return OrderResult(processed=False) + + yield ctx.call_activity(process_payment_activity, input=PaymentRequest( + request_id=order_id, item_being_purchased=order_payload["item_name"], + amount=order_payload["total_cost"], quantity=order_payload["quantity"])) + + try: + yield ctx.call_activity(update_inventory_activity, + input=PaymentRequest(request_id=order_id, + item_being_purchased=order_payload["item_name"], + amount=order_payload["total_cost"], + quantity=order_payload["quantity"])) + except Exception: + yield ctx.call_activity(notify_activity, + input=Notification(message=f'Order {order_id} Failed!')) + return OrderResult(processed=False) + + yield ctx.call_activity(notify_activity, input=Notification( + message=f'Order {order_id} has completed!')) + return OrderResult(processed=True) +``` +{{% /codetab %}} {{% codetab %}} @@ -254,27 +492,35 @@ The `Activities` directory holds the four workflow activities used by the workfl - `ProcessPaymentActivity.cs` - `UpdateInventoryActivity.cs` +## Watch the demo + +Watch [this video to walk through the Dapr Workflow .NET demo](https://youtu.be/BxiKpEmchgQ?t=2564): + + + {{% /codetab %}} - + {{% codetab %}} -The `order-processor` console app starts and manages the `order_processing_workflow`, which simulates purchasing items from a store. The workflow consists of five unique workflow activities, or tasks: +The `order-processor` console app starts and manages the lifecycle of an order processing workflow that stores and retrieves data in a state store. The workflow consists of four workflow activities, or tasks: +- `NotifyActivity`: Utilizes a logger to print out messages throughout the workflow +- `RequestApprovalActivity`: Requests approval for processing payment +- `ReserveInventoryActivity`: Checks the state store to ensure that there is enough inventory for the purchase +- `ProcessPaymentActivity`: Processes and authorizes the payment +- `UpdateInventoryActivity`: Removes the requested items from the state store and updates the store with the new remaining inventory value -- `notify_activity`: Utilizes a logger to print out messages throughout the workflow. These messages notify you when: - - You have insufficient inventory - - Your payment couldn't be processed, etc. -- `process_payment_activity`: Processes and authorizes the payment. -- `verify_inventory_activity`: Checks the state store to ensure there is enough inventory present for purchase. -- `update_inventory_activity`: Removes the requested items from the state store and updates the store with the new remaining inventory value. -- `request_approval_activity`: Seeks approval from the manager if payment is greater than 50,000 USD. ### Step 1: Pre-requisites For this example, you will need: - [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started). -- [Python 3.7+ installed](https://www.python.org/downloads/). +- Java JDK 11 (or greater): + - [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11) + - [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11) + - [OpenJDK 11](https://jdk.java.net/11/) +- [Apache Maven](https://maven.apache.org/install.html) version 3.x. - [Docker Desktop](https://www.docker.com/products/docker-desktop) @@ -287,16 +533,16 @@ Clone the [sample provided in the Quickstarts repo](https://github.com/dapr/quic git clone https://github.com/dapr/quickstarts.git ``` -In a new terminal window, navigate to the `order-processor` directory: +Navigate to the `order-processor` directory: ```bash -cd workflows/python/sdk/order-processor +cd workflows/java/sdk/order-processor ``` -Install the Dapr Python SDK package: +Install the dependencies: ```bash -pip3 install -r requirements.txt +mvn clean install ``` ### Step 3: Run the order processor app @@ -304,54 +550,72 @@ pip3 install -r requirements.txt In the terminal, start the order processor app alongside a Dapr sidecar: ```bash -dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py +dapr run --app-id WorkflowConsoleApp --resources-path ../../../components/ --dapr-grpc-port 50001 -- java -jar target/OrderProcessingService-0.0.1-SNAPSHOT.jar io.dapr.quickstarts.workflows.WorkflowConsoleApp ``` -> **Note:** Since Python3.exe is not defined in Windows, you may need to use `python app.py` instead of `python3 app.py`. - This starts the `order-processor` app with unique workflow ID and runs the workflow activities. Expected output: -```bash +``` +== APP == *** Welcome to the Dapr Workflow console app sample! +== APP == *** Using this app, you can place orders that start workflows. +== APP == Start workflow runtime +== APP == Sep 20, 2023 3:23:05 PM com.microsoft.durabletask.DurableTaskGrpcWorker startAndBlock +== APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001. + +== APP == ==========Begin the purchase of item:========== == APP == Starting order workflow, purchasing 10 of cars -== APP == 2023-06-06 09:35:52.945 durabletask-worker INFO: Successfully connected to 127.0.0.1:65406. Waiting for work items... -== APP == INFO:NotifyActivity:Received order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at $150000 ! -== APP == INFO:VerifyInventoryActivity:Verifying inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da of 10 cars -== APP == INFO:VerifyInventoryActivity:There are 100 Cars available for purchase -== APP == INFO:RequestApprovalActivity:Requesting approval for payment of 165000 USD for 10 cars -== APP == 2023-06-06 09:36:05.969 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da Event raised: manager_approval -== APP == INFO:NotifyActivity:Payment for order f4e1926e-3721-478d-be8a-f5bebd1995da has been approved! -== APP == INFO:ProcessPaymentActivity:Processing payment: f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at 150000 USD -== APP == INFO:ProcessPaymentActivity:Payment for request ID f4e1926e-3721-478d-be8a-f5bebd1995da processed successfully -== APP == INFO:UpdateInventoryActivity:Checking inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars -== APP == INFO:UpdateInventoryActivity:There are now 90 cars left in stock -== APP == INFO:NotifyActivity:Order f4e1926e-3721-478d-be8a-f5bebd1995da has completed! -== APP == 2023-06-06 09:36:06.106 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Orchestration completed with status: COMPLETED -== APP == Workflow completed! Result: Completed -== APP == Purchase of item is Completed + +== APP == scheduled new workflow instance of OrderProcessingWorkflow with instance ID: edceba90-9c45-4be8-ad40-60d16e060797 +== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.quickstarts.workflows.OrderProcessingWorkflow +== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Instance ID(order ID): edceba90-9c45-4be8-ad40-60d16e060797 +== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Current Orchestration Time: 2023-09-20T19:23:09.755Z +== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.NotifyActivity - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP == workflow instance edceba90-9c45-4be8-ad40-60d16e060797 started +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - Reserving inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797' of 10 cars +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - There are 100 cars available for purchase +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - Reserved inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797' of 10 cars +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.RequestApprovalActivity - Requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.RequestApprovalActivity - Approved requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ProcessPaymentActivity - Processing payment: edceba90-9c45-4be8-ad40-60d16e060797 for 10 cars at $150000 +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ProcessPaymentActivity - Payment for request ID 'edceba90-9c45-4be8-ad40-60d16e060797' processed successfully +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.UpdateInventoryActivity - Updating inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797' of 10 cars +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.UpdateInventoryActivity - Updated inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797': there are now 90 cars left in stock +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.NotifyActivity - Order completed! : edceba90-9c45-4be8-ad40-60d16e060797 + +== APP == workflow instance edceba90-9c45-4be8-ad40-60d16e060797 completed, out is: {"processed":true} ``` ### (Optional) Step 4: View in Zipkin -If you have Zipkin configured for Dapr locally on your machine, you can view the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`). +If you have Zipkin configured for Dapr locally on your machine, you can - +Running `dapr init` launches the [openzipkin/zipkin](https://hub.docker.com/r/openzipkin/zipkin/) Docker container. If the container has stopped running, launch the Zipkin Docker container with the following command: + +``` +docker run -d -p 9411:9411 openzipkin/zipkin +``` + +View the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`). + + ### What happened? -When you ran `dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py`: +When you ran `dapr run`: -1. A unique order ID for the workflow is generated (in the above example, `f4e1926e-3721-478d-be8a-f5bebd1995da`) and the workflow is scheduled. +1. A unique order ID for the workflow is generated (in the above example, `edceba90-9c45-4be8-ad40-60d16e060797`) and the workflow is scheduled. 1. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received. 1. The `ReserveInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock. -1. Your workflow starts and notifies you of its status. -1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `f4e1926e-3721-478d-be8a-f5bebd1995da` and confirms if successful. +1. Once approved, your workflow starts and notifies you of its status. +1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `edceba90-9c45-4be8-ad40-60d16e060797` and confirms if successful. 1. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed. -1. The `NotifyActivity` workflow activity sends a notification saying that order `f4e1926e-3721-478d-be8a-f5bebd1995da` has completed. +1. The `NotifyActivity` workflow activity sends a notification saying that order `edceba90-9c45-4be8-ad40-60d16e060797` has completed. 1. The workflow terminates as completed. -#### `order-processor/app.py` +#### `order-processor/WorkflowConsoleApp.java` In the application's program file: - The unique workflow order ID is generated @@ -359,151 +623,227 @@ In the application's program file: - The workflow status is retrieved - The workflow and the workflow activities it invokes are registered -```python -class WorkflowConsoleApp: - def main(self): - # Register workflow and activities - workflowRuntime = WorkflowRuntime(settings.DAPR_RUNTIME_HOST, settings.DAPR_GRPC_PORT) - workflowRuntime.register_workflow(order_processing_workflow) - workflowRuntime.register_activity(notify_activity) - workflowRuntime.register_activity(requst_approval_activity) - workflowRuntime.register_activity(verify_inventory_activity) - workflowRuntime.register_activity(process_payment_activity) - workflowRuntime.register_activity(update_inventory_activity) - workflowRuntime.start() - - print("==========Begin the purchase of item:==========", flush=True) - item_name = default_item_name - order_quantity = 10 +```java +package io.dapr.quickstarts.workflows; +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.workflows.client.DaprWorkflowClient; + +public class WorkflowConsoleApp { + + private static final String STATE_STORE_NAME = "statestore-actors"; + + // ... + public static void main(String[] args) throws Exception { + System.out.println("*** Welcome to the Dapr Workflow console app sample!"); + System.out.println("*** Using this app, you can place orders that start workflows."); + // Wait for the sidecar to become available + Thread.sleep(5 * 1000); + + // Register the OrderProcessingWorkflow and its activities with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(OrderProcessingWorkflow.class); + builder.registerActivity(NotifyActivity.class); + builder.registerActivity(ProcessPaymentActivity.class); + builder.registerActivity(RequestApprovalActivity.class); + builder.registerActivity(ReserveInventoryActivity.class); + builder.registerActivity(UpdateInventoryActivity.class); + + // Build the workflow runtime + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Start workflow runtime"); + runtime.start(false); + } - total_cost = int(order_quantity) * baseInventory[item_name].per_item_cost - order = OrderPayload(item_name=item_name, quantity=int(order_quantity), total_cost=total_cost) + InventoryItem inventory = prepareInventoryAndOrder(); - # Start Workflow - print(f'Starting order workflow, purchasing {order_quantity} of {item_name}', flush=True) - start_resp = daprClient.start_workflow(workflow_component=workflow_component, - workflow_name=workflow_name, - input=order) - _id = start_resp.instance_id - - def prompt_for_approval(daprClient: DaprClient): - daprClient.raise_workflow_event(instance_id=_id, workflow_component=workflow_component, - event_name="manager_approval", event_data={'approval': True}) + DaprWorkflowClient workflowClient = new DaprWorkflowClient(); + try (workflowClient) { + executeWorkflow(workflowClient, inventory); + } - approval_seeked = False - start_time = datetime.now() - while True: - time_delta = datetime.now() - start_time - state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component) - if not state: - print("Workflow not found!") # not expected - elif state.runtime_status == "Completed" or\ - state.runtime_status == "Failed" or\ - state.runtime_status == "Terminated": - print(f'Workflow completed! Result: {state.runtime_status}', flush=True) - break - if time_delta.total_seconds() >= 10: - state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component) - if total_cost > 50000 and ( - state.runtime_status != "Completed" or - state.runtime_status != "Failed" or - state.runtime_status != "Terminated" - ) and not approval_seeked: - approval_seeked = True - threading.Thread(target=prompt_for_approval(daprClient), daemon=True).start() - - print("Purchase of item is ", state.runtime_status, flush=True) + } + + // Start the workflow runtime, pulling and executing tasks + private static void executeWorkflow(DaprWorkflowClient workflowClient, InventoryItem inventory) { + System.out.println("==========Begin the purchase of item:=========="); + String itemName = inventory.getName(); + int orderQuantity = inventory.getQuantity(); + int totalcost = orderQuantity * inventory.getPerItemCost(); + OrderPayload order = new OrderPayload(); + order.setItemName(itemName); + order.setQuantity(orderQuantity); + order.setTotalCost(totalcost); + System.out.println("Starting order workflow, purchasing " + orderQuantity + " of " + itemName); + + String instanceId = workflowClient.scheduleNewWorkflow(OrderProcessingWorkflow.class, order); + System.out.printf("scheduled new workflow instance of OrderProcessingWorkflow with instance ID: %s%n", + instanceId); + + // Check workflow instance start status + try { + workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(10), false); + System.out.printf("workflow instance %s started%n", instanceId); + } catch (TimeoutException e) { + System.out.printf("workflow instance %s did not start within 10 seconds%n", instanceId); + return; + } - def restock_inventory(self, daprClient: DaprClient, baseInventory): - for key, item in baseInventory.items(): - print(f'item: {item}') - item_str = f'{{"name": "{item.item_name}", "quantity": {item.quantity},\ - "per_item_cost": {item.per_item_cost}}}' - daprClient.save_state("statestore-actors", key, item_str) + // Check workflow instance complete status + try { + WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, + Duration.ofSeconds(30), + true); + if (workflowStatus != null) { + System.out.printf("workflow instance %s completed, out is: %s %n", instanceId, + workflowStatus.getSerializedOutput()); + } else { + System.out.printf("workflow instance %s not found%n", instanceId); + } + } catch (TimeoutException e) { + System.out.printf("workflow instance %s did not complete within 30 seconds%n", instanceId); + } -if __name__ == '__main__': - app = WorkflowConsoleApp() - app.main() + } + + private static InventoryItem prepareInventoryAndOrder() { + // prepare 100 cars in inventory + InventoryItem inventory = new InventoryItem(); + inventory.setName("cars"); + inventory.setPerItemCost(15000); + inventory.setQuantity(100); + DaprClient daprClient = new DaprClientBuilder().build(); + restockInventory(daprClient, inventory); + + // prepare order for 10 cars + InventoryItem order = new InventoryItem(); + order.setName("cars"); + order.setPerItemCost(15000); + order.setQuantity(10); + return order; + } + + private static void restockInventory(DaprClient daprClient, InventoryItem inventory) { + String key = inventory.getName(); + daprClient.saveState(STATE_STORE_NAME, key, inventory).block(); + } +} ``` -#### `order-processor/workflow.py` +#### `OrderProcessingWorkflow.java` + +In `OrderProcessingWorkflow.java`, the workflow is defined as a class with all of its associated tasks (determined by workflow activities). + +```java +package io.dapr.quickstarts.workflows; +import io.dapr.workflows.Workflow; + +public class OrderProcessingWorkflow extends Workflow { + + @Override + public WorkflowStub create() { + return ctx -> { + Logger logger = ctx.getLogger(); + String orderId = ctx.getInstanceId(); + logger.info("Starting Workflow: " + ctx.getName()); + logger.info("Instance ID(order ID): " + orderId); + logger.info("Current Orchestration Time: " + ctx.getCurrentInstant()); + + OrderPayload order = ctx.getInput(OrderPayload.class); + logger.info("Received Order: " + order.toString()); + OrderResult orderResult = new OrderResult(); + orderResult.setProcessed(false); + + // Notify the user that an order has come through + Notification notification = new Notification(); + notification.setMessage("Received Order: " + order.toString()); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + + // Determine if there is enough of the item available for purchase by checking + // the inventory + InventoryRequest inventoryRequest = new InventoryRequest(); + inventoryRequest.setRequestId(orderId); + inventoryRequest.setItemName(order.getItemName()); + inventoryRequest.setQuantity(order.getQuantity()); + InventoryResult inventoryResult = ctx.callActivity(ReserveInventoryActivity.class.getName(), + inventoryRequest, InventoryResult.class).await(); + + // If there is insufficient inventory, fail and let the user know + if (!inventoryResult.isSuccess()) { + notification.setMessage("Insufficient inventory for order : " + order.getItemName()); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + + // Require orders over a certain threshold to be approved + if (order.getTotalCost() > 5000) { + ApprovalResult approvalResult = ctx.callActivity(RequestApprovalActivity.class.getName(), + order, ApprovalResult.class).await(); + if (approvalResult != ApprovalResult.Approved) { + notification.setMessage("Order " + order.getItemName() + " was not approved."); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + } + + // There is enough inventory available so the user can purchase the item(s). + // Process their payment + PaymentRequest paymentRequest = new PaymentRequest(); + paymentRequest.setRequestId(orderId); + paymentRequest.setItemBeingPurchased(order.getItemName()); + paymentRequest.setQuantity(order.getQuantity()); + paymentRequest.setAmount(order.getTotalCost()); + boolean isOK = ctx.callActivity(ProcessPaymentActivity.class.getName(), + paymentRequest, boolean.class).await(); + if (!isOK) { + notification.setMessage("Payment failed for order : " + orderId); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + + inventoryResult = ctx.callActivity(UpdateInventoryActivity.class.getName(), + inventoryRequest, InventoryResult.class).await(); + if (!inventoryResult.isSuccess()) { + // If there is an error updating the inventory, refund the user + // paymentRequest.setAmount(-1 * paymentRequest.getAmount()); + // ctx.callActivity(ProcessPaymentActivity.class.getName(), + // paymentRequest).await(); + + // Let users know their payment processing failed + notification.setMessage("Order failed to update inventory! : " + orderId); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + + // Let user know their order was processed + notification.setMessage("Order completed! : " + orderId); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + + // Complete the workflow with order result is processed + orderResult.setProcessed(true); + ctx.complete(orderResult); + }; + } + +} +``` -In `workflow.py`, the workflow is defined as a class with all of its associated tasks (determined by workflow activities). +#### `activities` directory -```python - def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: OrderPayload): - """Defines the order processing workflow. - When the order is received, the inventory is checked to see if there is enough inventory to - fulfill the order. If there is enough inventory, the payment is processed and the inventory is - updated. If there is not enough inventory, the order is rejected. - If the total order is greater than $50,000, the order is sent to a manager for approval. - """ - order_id = ctx.instance_id - order_payload=json.loads(order_payload_str) - yield ctx.call_activity(notify_activity, - input=Notification(message=('Received order ' +order_id+ ' for ' - +f'{order_payload["quantity"]}' +' ' +f'{order_payload["item_name"]}' - +' at $'+f'{order_payload["total_cost"]}' +' !'))) - result = yield ctx.call_activity(verify_inventory_activity, - input=InventoryRequest(request_id=order_id, - item_name=order_payload["item_name"], - quantity=order_payload["quantity"])) - if not result.success: - yield ctx.call_activity(notify_activity, - input=Notification(message='Insufficient inventory for ' - +f'{order_payload["item_name"]}'+'!')) - return OrderResult(processed=False) - - if order_payload["total_cost"] > 50000: - yield ctx.call_activity(requst_approval_activity, input=order_payload) - approval_task = ctx.wait_for_external_event("manager_approval") - timeout_event = ctx.create_timer(timedelta(seconds=200)) - winner = yield when_any([approval_task, timeout_event]) - if winner == timeout_event: - yield ctx.call_activity(notify_activity, - input=Notification(message='Payment for order '+order_id - +' has been cancelled due to timeout!')) - return OrderResult(processed=False) - approval_result = yield approval_task - if approval_result["approval"]: - yield ctx.call_activity(notify_activity, input=Notification( - message=f'Payment for order {order_id} has been approved!')) - else: - yield ctx.call_activity(notify_activity, input=Notification( - message=f'Payment for order {order_id} has been rejected!')) - return OrderResult(processed=False) - - yield ctx.call_activity(process_payment_activity, input=PaymentRequest( - request_id=order_id, item_being_purchased=order_payload["item_name"], - amount=order_payload["total_cost"], quantity=order_payload["quantity"])) - - try: - yield ctx.call_activity(update_inventory_activity, - input=PaymentRequest(request_id=order_id, - item_being_purchased=order_payload["item_name"], - amount=order_payload["total_cost"], - quantity=order_payload["quantity"])) - except Exception: - yield ctx.call_activity(notify_activity, - input=Notification(message=f'Order {order_id} Failed!')) - return OrderResult(processed=False) +The `Activities` directory holds the four workflow activities used by the workflow, defined in the following files: +- [`NotifyActivity.java`](https://github.com/dapr/quickstarts/tree/master/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/NotifyActivity.java) +- [`RequestApprovalActivity`](https://github.com/dapr/quickstarts/tree/master/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/RequestApprovalActivity.java) +- [`ReserveInventoryActivity`](https://github.com/dapr/quickstarts/tree/master/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/ReserveInventoryActivity.java) +- [`ProcessPaymentActivity`](https://github.com/dapr/quickstarts/tree/master/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/ProcessPaymentActivity.java) +- [`UpdateInventoryActivity`](https://github.com/dapr/quickstarts/tree/master/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/UpdateInventoryActivity.java) - yield ctx.call_activity(notify_activity, input=Notification( - message=f'Order {order_id} has completed!')) - return OrderResult(processed=True) -``` {{% /codetab %}} - {{< /tabs >}} -## Watch the demo - -Watch [this video to walk through the Dapr Workflow .NET demo](https://youtu.be/BxiKpEmchgQ?t=2564): - - - - ## Tell us what you think! We're continuously working to improve our Quickstart examples and value your feedback. Did you find this Quickstart helpful? Do you have suggestions for improvement?