From 1286e29732d1c916dabe69fb620e92ae06f545d8 Mon Sep 17 00:00:00 2001 From: Nicholas Greenfield Date: Wed, 2 Apr 2025 11:22:35 -0700 Subject: [PATCH 1/8] First pass at cleaning up the readme --- README.md | 204 +----------------- docs/contributing.md | 13 ++ docs/development.md | 34 +++ docs/features.md | 35 +++ docs/getting-started.md | 9 + docs/supported-patterns.md | 82 +++++++ examples/dts/README.md | 5 +- .../README.md | 99 +++++++++ .../orchestrator.py | 44 ++++ .../requirements.txt | 2 + .../worker.py | 147 +++++++++++++ 11 files changed, 474 insertions(+), 200 deletions(-) create mode 100644 docs/contributing.md create mode 100644 docs/development.md create mode 100644 docs/features.md create mode 100644 docs/getting-started.md create mode 100644 docs/supported-patterns.md create mode 100644 examples/dts/sub-orchestrations-with-fan-out-fan-in/README.md create mode 100644 examples/dts/sub-orchestrations-with-fan-out-fan-in/orchestrator.py create mode 100644 examples/dts/sub-orchestrations-with-fan-out-fan-in/requirements.txt create mode 100644 examples/dts/sub-orchestrations-with-fan-out-fan-in/worker.py diff --git a/README.md b/README.md index b9d829c..06d3b59 100644 --- a/README.md +++ b/README.md @@ -4,208 +4,18 @@ [![Build Validation](https://github.com/microsoft/durabletask-python/actions/workflows/pr-validation.yml/badge.svg)](https://github.com/microsoft/durabletask-python/actions/workflows/pr-validation.yml) [![PyPI version](https://badge.fury.io/py/durabletask.svg)](https://badge.fury.io/py/durabletask) -This repo contains a Python SDK for use with the [Azure Durable Task Scheduler](https://techcommunity.microsoft.com/blog/appsonazureblog/announcing-limited-early-access-of-the-durable-task-scheduler-for-azure-durable-/4286526) and the [Durable Task Framework for Go](https://github.com/microsoft/durabletask-go). With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Python code. 
- -⚠️ **This SDK is currently under active development and is not yet ready for production use.** ⚠️ +This repo contains a Python SDK for use with the [Azure Durable Task Scheduler](https://github.com/Azure/Durable-Task-Scheduler). With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Python code. > Note that this SDK is **not** currently compatible with [Azure Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview). If you are looking for a Python SDK for Azure Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python). -## Supported patterns - -The following orchestration patterns are currently supported. - -### Function chaining - -An orchestration can chain a sequence of function calls using the following syntax: - -```python -# simple activity function that returns a greeting -def hello(ctx: task.ActivityContext, name: str) -> str: - return f'Hello {name}!' - -# orchestrator function that sequences the activity calls -def sequence(ctx: task.OrchestrationContext, _): - result1 = yield ctx.call_activity(hello, input='Tokyo') - result2 = yield ctx.call_activity(hello, input='Seattle') - result3 = yield ctx.call_activity(hello, input='London') - - return [result1, result2, result3] -``` - -You can find the full sample [here](./examples/activity_sequence.py). - -### Fan-out/fan-in - -An orchestration can fan-out a dynamic number of function calls in parallel and then fan-in the results using the following syntax: - -```python -# activity function for getting the list of work items -def get_work_items(ctx: task.ActivityContext, _) -> List[str]: - # ... - -# activity function for processing a single work item -def process_work_item(ctx: task.ActivityContext, item: str) -> int: - # ... 
- -# orchestrator function that fans-out the work items and then fans-in the results -def orchestrator(ctx: task.OrchestrationContext, _): - # the number of work-items is unknown in advance - work_items = yield ctx.call_activity(get_work_items) - - # fan-out: schedule the work items in parallel and wait for all of them to complete - tasks = [ctx.call_activity(process_work_item, input=item) for item in work_items] - results = yield task.when_all(tasks) - - # fan-in: summarize and return the results - return {'work_items': work_items, 'results': results, 'total': sum(results)} -``` - -You can find the full sample [here](./examples/fanout_fanin.py). - -### Human interaction and durable timers - -An orchestration can wait for a user-defined event, such as a human approval event, before proceding to the next step. In addition, the orchestration can create a timer with an arbitrary duration that triggers some alternate action if the external event hasn't been received: - -```python -def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order): - """Orchestrator function that represents a purchase order workflow""" - # Orders under $1000 are auto-approved - if order.Cost < 1000: - return "Auto-approved" - - # Orders of $1000 or more require manager approval - yield ctx.call_activity(send_approval_request, input=order) - - # Approvals must be received within 24 hours or they will be canceled. - approval_event = ctx.wait_for_external_event("approval_received") - timeout_event = ctx.create_timer(timedelta(hours=24)) - winner = yield task.when_any([approval_event, timeout_event]) - if winner == timeout_event: - return "Canceled" - - # The order was approved - yield ctx.call_activity(place_order, input=order) - approval_details = approval_event.get_result() - return f"Approved by '{approval_details.approver}'" -``` - -As an aside, you'll also notice that the example orchestration above works with custom business objects. 
Support for custom business objects includes support for custom classes, custom data classes, and named tuples. Serialization and deserialization of these objects is handled automatically by the SDK. - -You can find the full sample [here](./examples/human_interaction.py). - -## Feature overview - -The following features are currently supported: - -### Orchestrations - -Orchestrations are implemented using ordinary Python functions that take an `OrchestrationContext` as their first parameter. The `OrchestrationContext` provides APIs for starting child orchestrations, scheduling activities, and waiting for external events, among other things. Orchestrations are fault-tolerant and durable, meaning that they can automatically recover from failures and rebuild their local execution state. Orchestrator functions must be deterministic, meaning that they must always produce the same output given the same input. - -### Activities - -Activities are implemented using ordinary Python functions that take an `ActivityContext` as their first parameter. Activity functions are scheduled by orchestrations and have at-least-once execution guarantees, meaning that they will be executed at least once but may be executed multiple times in the event of a transient failure. Activity functions are where the real "work" of any orchestration is done. - -### Durable timers - -Orchestrations can schedule durable timers using the `create_timer` API. These timers are durable, meaning that they will survive orchestrator restarts and will fire even if the orchestrator is not actively in memory. Durable timers can be of any duration, from milliseconds to months. - -### Sub-orchestrations - -Orchestrations can start child orchestrations using the `call_sub_orchestrator` API. Child orchestrations are useful for encapsulating complex logic and for breaking up large orchestrations into smaller, more manageable pieces. 
- -### External events - -Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing. - -### Continue-as-new (TODO) - -Orchestrations can be continued as new using the `continue_as_new` API. This API allows an orchestration to restart itself from scratch, optionally with a new input. - -### Suspend, resume, and terminate - -Orchestrations can be suspended using the `suspend_orchestration` client API and will remain suspended until resumed using the `resume_orchestration` client API. A suspended orchestration will stop processing new events, but will continue to buffer any that happen to arrive until resumed, ensuring that no data is lost. An orchestration can also be terminated using the `terminate_orchestration` client API. Terminated orchestrations will stop processing new events and will discard any buffered events. - -### Retry policies (TODO) - -Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error. - -## Getting Started - -### Prerequisites - -- Python 3.9 -- A Durable Task-compatible sidecar, like [Dapr Workflow](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-overview/) - -### Installing the Durable Task Python client SDK - -Installation is currently only supported from source. Ensure pip, setuptools, and wheel are up-to-date. - -```sh -python3 -m pip install --upgrade pip setuptools wheel -``` - -To install this package from source, clone this repository and run the following command from the project root: - -```sh -python3 -m pip install . -``` - -### Run the samples - -See the [examples](./examples) directory for a list of sample orchestrations and instructions on how to run them. 
- -## Development - -The following is more information about how to develop this project. Note that development commands require that `make` is installed on your local machine. If you're using Windows, you can install `make` using [Chocolatey](https://chocolatey.org/) or use WSL. - -### Generating protobufs - -```sh -pip3 install -r dev-requirements.txt -make gen-proto -``` - -This will download the `orchestrator_service.proto` from the `microsoft/durabletask-protobuf` repo and compile it using `grpcio-tools`. The version of the source proto file that was downloaded can be found in the file `durabletask/internal/PROTO_SOURCE_COMMIT_HASH`. - -### Running unit tests - -Unit tests can be run using the following command from the project root. Unit tests _don't_ require a sidecar process to be running. - -```sh -make test-unit -``` - -### Running E2E tests - -The E2E (end-to-end) tests require a sidecar process to be running. You can use the Dapr sidecar for this or run a Durable Task test sidecar using the following command: - -```sh -go install github.com/microsoft/durabletask-go@main -durabletask-go --port 4001 -``` - -To run the E2E tests, run the following command from the project root: - -```sh -make test-e2e -``` - -## Contributing - -This project welcomes contributions and suggestions. Most contributions require you to agree to a -Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us -the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com. - -When you submit a pull request, a CLA bot will automatically determine whether you need to provide -a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions -provided by the bot. You will only need to do this once across all repos using our CLA. - -This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). 
-For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or -contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. +# References +- [Supported Patterns](./docs/supported-patterns.md) +- [Available Features](./docs/features.md) +- [Getting Started](./docs/getting-started.md) +- [Development Guide](./docs/development.md) +- [Contributing Guide](./docs/contributing.md) ## Trademarks - This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft trademarks or logos is subject to and must follow [Microsoft's Trademark & Brand Guidelines](https://www.microsoft.com/en-us/legal/intellectualproperty/trademarks/usage/general). diff --git a/docs/contributing.md b/docs/contributing.md new file mode 100644 index 0000000..6c2596b --- /dev/null +++ b/docs/contributing.md @@ -0,0 +1,13 @@ +# Contributing + +This project welcomes contributions and suggestions. Most contributions require you to agree to a +Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us +the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com. + +When you submit a pull request, a CLA bot will automatically determine whether you need to provide +a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions +provided by the bot. You will only need to do this once across all repos using our CLA. + +This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). +For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or +contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.
\ No newline at end of file diff --git a/docs/development.md b/docs/development.md new file mode 100644 index 0000000..5740959 --- /dev/null +++ b/docs/development.md @@ -0,0 +1,34 @@ +# Development + +The following is more information about how to develop this project. Note that development commands require that `make` is installed on your local machine. If you're using Windows, you can install `make` using [Chocolatey](https://chocolatey.org/) or use WSL. + +### Generating protobufs + +```sh +pip3 install -r dev-requirements.txt +make gen-proto +``` + +This will download the `orchestrator_service.proto` from the `microsoft/durabletask-protobuf` repo and compile it using `grpcio-tools`. The version of the source proto file that was downloaded can be found in the file `durabletask/internal/PROTO_SOURCE_COMMIT_HASH`. + +### Running unit tests + +Unit tests can be run using the following command from the project root. Unit tests _don't_ require a sidecar process to be running. + +```sh +make test-unit +``` + +### Running E2E tests + +The E2E (end-to-end) tests require a sidecar process to be running. You can use the Dapr sidecar for this or run a Durable Task test sidecar using the following `docker` command: + +```sh +docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' --rm cgillum/durabletask-sidecar:latest start --backend Emulator +``` + +To run the E2E tests, run the following command from the project root: + +```sh +make test-e2e +``` \ No newline at end of file diff --git a/docs/features.md b/docs/features.md new file mode 100644 index 0000000..5428c43 --- /dev/null +++ b/docs/features.md @@ -0,0 +1,35 @@ +# Feature overview + +The following features are currently supported: + +### Orchestrations + +Orchestrations are implemented using ordinary Python functions that take an `OrchestrationContext` as their first parameter. 
The `OrchestrationContext` provides APIs for starting child orchestrations, scheduling activities, and waiting for external events, among other things. Orchestrations are fault-tolerant and durable, meaning that they can automatically recover from failures and rebuild their local execution state. Orchestrator functions must be deterministic, meaning that they must always produce the same output given the same input. + +### Activities + +Activities are implemented using ordinary Python functions that take an `ActivityContext` as their first parameter. Activity functions are scheduled by orchestrations and have at-least-once execution guarantees, meaning that they will be executed at least once but may be executed multiple times in the event of a transient failure. Activity functions are where the real "work" of any orchestration is done. + +### Durable timers + +Orchestrations can schedule durable timers using the `create_timer` API. These timers are durable, meaning that they will survive orchestrator restarts and will fire even if the orchestrator is not actively in memory. Durable timers can be of any duration, from milliseconds to months. + +### Sub-orchestrations + +Orchestrations can start child orchestrations using the `call_sub_orchestrator` API. Child orchestrations are useful for encapsulating complex logic and for breaking up large orchestrations into smaller, more manageable pieces. + +### External events + +Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing. + +### Continue-as-new (TODO) + +Orchestrations can be continued as new using the `continue_as_new` API. This API allows an orchestration to restart itself from scratch, optionally with a new input. 
+ +### Suspend, resume, and terminate + +Orchestrations can be suspended using the `suspend_orchestration` client API and will remain suspended until resumed using the `resume_orchestration` client API. A suspended orchestration will stop processing new events, but will continue to buffer any that happen to arrive until resumed, ensuring that no data is lost. An orchestration can also be terminated using the `terminate_orchestration` client API. Terminated orchestrations will stop processing new events and will discard any buffered events. + +### Retry policies (TODO) + +Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error. \ No newline at end of file diff --git a/docs/getting-started.md b/docs/getting-started.md new file mode 100644 index 0000000..4f31c22 --- /dev/null +++ b/docs/getting-started.md @@ -0,0 +1,9 @@ +# Getting Started + +### Run the Order Processing Example +- Check out the [Durable Task Scheduler example](../examples/dts/sub-orchestrations-with-fan-out-fan-in/README.md) + for detailed instructions on running the order processing example. + +### Explore Other Samples +- Visit the [examples](../examples/dts/) directory to find a variety of sample orchestrations and learn how to run them. + diff --git a/docs/supported-patterns.md b/docs/supported-patterns.md new file mode 100644 index 0000000..01a3aa3 --- /dev/null +++ b/docs/supported-patterns.md @@ -0,0 +1,82 @@ +# Supported patterns + +The following orchestration patterns are currently supported. + +### Function chaining + +An orchestration can chain a sequence of function calls using the following syntax: + +```python +# simple activity function that returns a greeting +def hello(ctx: task.ActivityContext, name: str) -> str: + return f'Hello {name}!' 
+ +# orchestrator function that sequences the activity calls +def sequence(ctx: task.OrchestrationContext, _): + result1 = yield ctx.call_activity(hello, input='Tokyo') + result2 = yield ctx.call_activity(hello, input='Seattle') + result3 = yield ctx.call_activity(hello, input='London') + + return [result1, result2, result3] +``` + +You can find the full sample [here](../examples/activity_sequence.py). + +### Fan-out/fan-in + +An orchestration can fan-out a dynamic number of function calls in parallel and then fan-in the results using the following syntax: + +```python +# activity function for getting the list of work items +def get_work_items(ctx: task.ActivityContext, _) -> List[str]: + # ... + +# activity function for processing a single work item +def process_work_item(ctx: task.ActivityContext, item: str) -> int: + # ... + +# orchestrator function that fans-out the work items and then fans-in the results +def orchestrator(ctx: task.OrchestrationContext, _): + # the number of work-items is unknown in advance + work_items = yield ctx.call_activity(get_work_items) + + # fan-out: schedule the work items in parallel and wait for all of them to complete + tasks = [ctx.call_activity(process_work_item, input=item) for item in work_items] + results = yield task.when_all(tasks) + + # fan-in: summarize and return the results + return {'work_items': work_items, 'results': results, 'total': sum(results)} +``` + +You can find the full sample [here](../examples/fanout_fanin.py). + +### Human interaction and durable timers + +An orchestration can wait for a user-defined event, such as a human approval event, before proceeding to the next step.
In addition, the orchestration can create a timer with an arbitrary duration that triggers some alternate action if the external event hasn't been received: + +```python +def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order): + """Orchestrator function that represents a purchase order workflow""" + # Orders under $1000 are auto-approved + if order.Cost < 1000: + return "Auto-approved" + + # Orders of $1000 or more require manager approval + yield ctx.call_activity(send_approval_request, input=order) + + # Approvals must be received within 24 hours or they will be canceled. + approval_event = ctx.wait_for_external_event("approval_received") + timeout_event = ctx.create_timer(timedelta(hours=24)) + winner = yield task.when_any([approval_event, timeout_event]) + if winner == timeout_event: + return "Canceled" + + # The order was approved + yield ctx.call_activity(place_order, input=order) + approval_details = approval_event.get_result() + return f"Approved by '{approval_details.approver}'" +``` + +As an aside, you'll also notice that the example orchestration above works with custom business objects. Support for custom business objects includes support for custom classes, custom data classes, and named tuples. Serialization and deserialization of these objects is handled automatically by the SDK. + +You can find the full sample [here](../examples/human_interaction.py). \ No newline at end of file diff --git a/examples/dts/README.md b/examples/dts/README.md index 8df2b75..1dbf888 100644 --- a/examples/dts/README.md +++ b/examples/dts/README.md @@ -58,10 +58,10 @@ In order to use the emulator for the examples, perform the following steps: 1. Install docker if it is not already installed. 2. Pull down the docker image for the emulator: - `docker pull mcr.microsoft.com/dts/dts-emulator:v0.0.4` + `docker pull mcr.microsoft.com/dts/dts-emulator:v0.0.6` 3.
Run the emulator and wait a few seconds for the container to be ready: -`docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:v0.0.4` +`docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:v0.0.6` 4. Set the environment variables that are referenced and used in the examples: 1. If you are using windows powershell: @@ -73,7 +73,6 @@ In order to use the emulator for the examples, perform the following steps: 5. Finally, edit the examples to change the `token_credential` input of both the `DurableTaskSchedulerWorker` and `DurableTaskSchedulerClient` to a value of `None` - ## Running the examples Now, you can simply execute any of the examples in this directory using `python3`: diff --git a/examples/dts/sub-orchestrations-with-fan-out-fan-in/README.md b/examples/dts/sub-orchestrations-with-fan-out-fan-in/README.md new file mode 100644 index 0000000..d990457 --- /dev/null +++ b/examples/dts/sub-orchestrations-with-fan-out-fan-in/README.md @@ -0,0 +1,99 @@ +# Portable SDK Sample for Sub Orchestrations and Fan-out / Fan-in + +This sample demonstrates how to use the Durable Task SDK, also known as the Portable SDK, with the Durable Task Scheduler to create orchestrations. These orchestrations not only spin off child orchestrations but also perform parallel processing by leveraging the fan-out/fan-in application pattern. + +The scenario showcases an order processing system where orders are processed in batches. + +> Note, for simplicity, this code is contained within a single source file. In real practice, you would have the orchestrations, activities, and worker setup split across separate source files. + + +## Running the Examples +There are two separate ways to run an example: + +- Using the Emulator +- Using a deployed Scheduler and Taskhub + +### Running with a Deployed Scheduler and Taskhub Resource +1.
To create a taskhub, follow these steps using the Azure CLI commands: + +Create a Scheduler: +```bash +az durabletask scheduler create --resource-group --name --location --ip-allowlist "[0.0.0.0/0]" --sku-capacity 1 --sku-name "Dedicated" --tags "{'myattribute':'myvalue'}" +``` + +Create Your Taskhub: +```bash +az durabletask taskhub create --resource-group --scheduler-name --name +``` + +2. Retrieve the Endpoint for the Scheduler: Locate the taskhub in the Azure portal to find the endpoint. + +3. Set the Environment Variables: +Bash: +```bash +export TASKHUB= +export ENDPOINT= +``` +Powershell: +```powershell +$env:TASKHUB = "" +$env:ENDPOINT = "" +``` + +4. Install the Correct Packages +```bash +pip install -r requirements.txt +``` + +5. Grant your developer credentials the `Durable Task Data Contributor` Role. + +### Running with the Emulator +The emulator simulates a scheduler and taskhub, packaged into an easy-to-use Docker container. For these steps, it is assumed that you are using port 8080. + +1. Install Docker: If it is not already installed. + +2. Pull the Docker Image for the Emulator: +```bash +docker pull mcr.microsoft.com/dts/dts-emulator:v0.0.6 +``` + +3. Run the Emulator: Wait a few seconds for the container to be ready. +```bash +docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:v0.0.6 +``` + +4. Set the Environment Variables: +Bash: +```bash +export TASKHUB= +export ENDPOINT= +``` +Powershell: +```powershell +$env:TASKHUB = "" +$env:ENDPOINT = "" +``` + +5. Edit the Examples: Change the token_credential input of both the `DurableTaskSchedulerWorker` and `DurableTaskSchedulerClient` to `None`.
+ +### Running the Examples +You can now execute the sample using Python: + +Start the worker and ensure the TASKHUB and ENDPOINT environment variables are set in your shell: +```bash +python3 ./worker.py +``` + +Next, start the orchestrator and make sure the TASKHUB and ENDPOINT environment variables are set in your shell: +```bash +python3 ./orchestrator.py +``` + +You should start seeing logs for processing orders in both shell outputs. + +### Review Orchestration History and Status in the Durable Task Scheduler Dashboard +To access the Durable Task Scheduler Dashboard, follow these steps: + +- **Using the Emulator**: By default, the dashboard runs on portal 8082. Navigate to http://localhost:8082 and click on the default task hub. + +- **Using a Deployed Scheduler**: Navigate to the Scheduler resource. Then, go to the Task Hub subresource that you are using and click on the dashboard URL in the top right corner. diff --git a/examples/dts/sub-orchestrations-with-fan-out-fan-in/orchestrator.py b/examples/dts/sub-orchestrations-with-fan-out-fan-in/orchestrator.py new file mode 100644 index 0000000..313bd54 --- /dev/null +++ b/examples/dts/sub-orchestrations-with-fan-out-fan-in/orchestrator.py @@ -0,0 +1,44 @@ +import os +from azure.identity import DefaultAzureCredential +from durabletask import client +from durabletask.azuremanaged.client import DurableTaskSchedulerClient + +# Read the environment variable +taskhub_name = os.getenv("TASKHUB") + +# Check if the variable exists +if taskhub_name: + print(f"The value of TASKHUB is: {taskhub_name}") +else: + print("TASKHUB is not set. 
Please set the TASKHUB environment variable to the name of the taskhub you wish to use") + print("If you are using windows powershell, run the following: $env:TASKHUB=\"\"") + print("If you are using bash, run the following: export TASKHUB=\"\"") + exit() + +# Read the environment variable +endpoint = os.getenv("ENDPOINT") + +# Check if the variable exists +if endpoint: + print(f"The value of ENDPOINT is: {endpoint}") +else: + print("ENDPOINT is not set. Please set the ENDPOINT environment variable to the endpoint of the scheduler") + print("If you are using windows powershell, run the following: $env:ENDPOINT=\"\"") + print("If you are using bash, run the following: export ENDPOINT=\"\"") + exit() + +credential = DefaultAzureCredential() + +# Create a client, start an orchestration, and wait for it to finish +c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=credential) + +instance_id = c.schedule_new_orchestration("orchestrator") + +state = c.wait_for_orchestration_completion(instance_id, timeout=30) + +if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: + print(f'Orchestration completed! 
Result: {state.serialized_output}') +elif state: + print(f'Orchestration failed: {state.failure_details}') +exit() diff --git a/examples/dts/sub-orchestrations-with-fan-out-fan-in/requirements.txt b/examples/dts/sub-orchestrations-with-fan-out-fan-in/requirements.txt new file mode 100644 index 0000000..5339ebe --- /dev/null +++ b/examples/dts/sub-orchestrations-with-fan-out-fan-in/requirements.txt @@ -0,0 +1,2 @@ +durabletask-azuremanaged +azure-identity \ No newline at end of file diff --git a/examples/dts/sub-orchestrations-with-fan-out-fan-in/worker.py b/examples/dts/sub-orchestrations-with-fan-out-fan-in/worker.py new file mode 100644 index 0000000..0890e1a --- /dev/null +++ b/examples/dts/sub-orchestrations-with-fan-out-fan-in/worker.py @@ -0,0 +1,147 @@ +import os +import random +import time +from azure.identity import DefaultAzureCredential +from durabletask import task +from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker + +def get_orders(ctx, _) -> list[str]: + """Activity function that returns a list of work items""" + # return a random number of work items + count = random.randint(2, 10) + print(f'generating {count} orders...') + return [f'order {i}' for i in range(count)] + +def check_and_update_inventory(ctx, order: str) -> str: + """Activity function that checks inventory for a given order""" + print(f'checking inventory for order: {order}') + + # simulate inventory check + time.sleep(random.random() * 2) + + # return a random boolean indicating if the item is in stock + return random.choices([True, False], weights=[9, 1]) + +def charge_payment(ctx, order: str) -> bool: + """Activity function that charges payment for a given order""" + print(f'charging payment for order: {order}') + + # simulate payment processing + time.sleep(random.random() * 2) + + # return a random boolean indicating if the payment was successful + return random.choices([True, False], weights=[9, 1]) + +def ship_order(ctx, order: str) -> bool: + """Activity 
function that ships a given order""" + print(f'shipping order: {order}') + + # simulate shipping process + time.sleep(random.random() * 2) + + # return a random boolean indicating if the shipping was successful + return random.choices([True, False], weights=[9, 1]) + +def notify_customer(ctx, order: str) -> bool: + """Activity function that notifies the customer about the order status""" + print(f'notifying customer about order: {order}') + + # simulate customer notification + time.sleep(random.random() * 2) + + # return a random boolean indicating if the notification was successful + return random.choices([True, False], weights=[9, 1]) + +def process_order(ctx, order: str) -> dict: + """Sub-orchestration function that processes a given order by performing all steps""" + print(f'processing order: {order}') + + # Check inventory + inventory_checked = yield ctx.call_activity('check_and_update_inventory', input=order) + + if not inventory_checked: + return {'order': order, 'status': 'failed', 'reason': 'out of stock'} + + # Charge payment + payment_charged = yield ctx.call_activity('charge_payment', input=order) + + if not payment_charged: + return {'order': order, 'status': 'failed', 'reason': 'payment failed'} + + # Ship order + order_shipped = yield ctx.call_activity('ship_order', input=order) + + if not order_shipped: + return {'order': order, 'status': 'failed', 'reason': 'shipping failed'} + + # Notify customer + customer_notified = yield ctx.call_activity('notify_customer', input=order) + + if not customer_notified: + return {'order': order, 'status': 'failed', 'reason': 'customer notification failed'} + + # Return success status + return {'order': order, 'status': 'completed'} + +def orchestrator(ctx, _): + """Orchestrator function that calls the 'get_orders' and 'process_order' + sub-orchestration functions in parallel, waits for them all to complete, and prints + an aggregate summary of the outputs""" + + orders: list[str] = yield 
ctx.call_activity('get_orders') + + # Execute the orders in parallel and wait for them all to return + tasks = [ctx.call_sub_orchestrator(process_order, input=order) for order in orders] + results: list[dict] = yield task.when_all(tasks) + + # Return an aggregate summary of the results + return { + 'orders': orders, + 'results': results, + 'total_completed': sum(1 for result in results if result['status'] == 'completed'), + 'total_failed': sum(1 for result in results if result['status'] == 'failed'), + 'details': results, + } + +# Read the environment variable +taskhub_name = os.getenv("TASKHUB") + +# Check if the variable exists +if taskhub_name: + print(f"The value of TASKHUB is: {taskhub_name}") +else: + print("TASKHUB is not set. Please set the TASKHUB environment variable to the name of the taskhub you wish to use") + print("If you are using windows powershell, run the following: $env:TASKHUB=\"\"") + print("If you are using bash, run the following: export TASKHUB=\"\"") + exit() + +# Read the environment variable +endpoint = os.getenv("ENDPOINT") + +# Check if the variable exists +if endpoint: + print(f"The value of ENDPOINT is: {endpoint}") +else: + print("ENDPOINT is not set. 
Please set the ENDPOINT environment variable to the endpoint of the scheduler") + print("If you are using windows powershell, run the following: $env:ENDPOINT=\"\"") + print("If you are using bash, run the following: export ENDPOINT=\"\"") + exit() + +credential = DefaultAzureCredential() + +# Configure and start the worker +with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=credential) as w: + + w.add_orchestrator(orchestrator) + w.add_orchestrator(process_order) + w.add_activity(get_orders) + w.add_activity(check_and_update_inventory) + w.add_activity(charge_payment) + w.add_activity(ship_order) + w.add_activity(notify_customer) + + w.start() + + while True: + time.sleep(1) From e18dc4f04f34f1d3f2156efc474621dc7cdacf93 Mon Sep 17 00:00:00 2001 From: Nicholas Greenfield Date: Wed, 2 Apr 2025 11:34:26 -0700 Subject: [PATCH 2/8] Fix README instructions to add pre-reqs --- examples/dts/sub-orchestrations-with-fan-out-fan-in/README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/examples/dts/sub-orchestrations-with-fan-out-fan-in/README.md b/examples/dts/sub-orchestrations-with-fan-out-fan-in/README.md index d990457..7f0432f 100644 --- a/examples/dts/sub-orchestrations-with-fan-out-fan-in/README.md +++ b/examples/dts/sub-orchestrations-with-fan-out-fan-in/README.md @@ -6,6 +6,9 @@ The scenario showcases an order processing system where orders are processed in > Note, for simplicity, this code is contained within a single source file. 
In real practice, you would have +# Prerequisites + - [Azure CLI](https://docs.microsoft.com/cli/azure/install-azure-cli) + - [`az durabletask` CLI extension](https://learn.microsoft.com/en-us/cli/azure/durabletask?view=azure-cli-latest) ## Running the Examples There are two separate ways to run an example: From d4784b0dae01b36655eb379c7d1ba3d3a4075459 Mon Sep 17 00:00:00 2001 From: Nicholas Greenfield Date: Wed, 2 Apr 2025 12:43:16 -0700 Subject: [PATCH 3/8] continue to clean up instructions --- docs/development.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/development.md b/docs/development.md index 5740959..c2c38c0 100644 --- a/docs/development.md +++ b/docs/development.md @@ -21,7 +21,7 @@ make test-unit ### Running E2E tests -The E2E (end-to-end) tests require a sidecar process to be running. You can use the Dapr sidecar for this or run a Durable Task test sidecar using the following `docker` command: +The E2E (end-to-end) tests require a sidecar process to be running. You can use the Durable Task test sidecar using the following `docker` command: ```sh docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' --rm cgillum/durabletask-sidecar:latest start --backend Emulator From 800725fd6d065064e6793a133535d46f73af2696 Mon Sep 17 00:00:00 2001 From: Nicholas Greenfield Date: Wed, 2 Apr 2025 12:45:47 -0700 Subject: [PATCH 4/8] Restore development text --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 06d3b59..81dfd79 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,8 @@ This repo contains a Python SDK for use with the [Azure Durable Task Scheduler](https://github.com/Azure/Durable-Task-Scheduler). With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Python code. +⚠️ This SDK is currently under active development and is evolving rapidly. 
While it's not yet ready for production use, we are excited about its potential and look forward to your feedback as we continue to improve it. ⚠️ + > Note that this SDK is **not** currently compatible with [Azure Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview). If you are looking for a Python SDK for Azure Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python). # References From 6dbaec1f6546e81be532f57f75a48bde353489c9 Mon Sep 17 00:00:00 2001 From: Nicholas Greenfield Date: Thu, 10 Apr 2025 09:50:56 -0700 Subject: [PATCH 5/8] Address PR feedback and change all examples to use DTS --- docs/contributing.md => CONTRIBUTING.md | 0 README.md | 4 +- docs/features.md | 4 +- docs/supported-patterns.md | 6 +- examples/README.md | 113 ++++++++++---- examples/activity_sequence.py | 89 ++++++----- examples/dts/README.md | 82 ---------- examples/dts/dts_activity_sequence.py | 71 --------- examples/dts/dts_fanout_fanin.py | 96 ------------ .../orchestrator.py | 44 ------ examples/fanout_fanin.py | 143 ++++++++++-------- examples/human_interaction.py | 119 +++++++++++---- examples/{dts => }/requirements.txt | 0 .../README.md | 58 +++---- .../orchestrator.py | 28 ++++ .../requirements.txt | 0 .../worker.py | 34 ++--- 17 files changed, 377 insertions(+), 514 deletions(-) rename docs/contributing.md => CONTRIBUTING.md (100%) delete mode 100644 examples/dts/README.md delete mode 100644 examples/dts/dts_activity_sequence.py delete mode 100644 examples/dts/dts_fanout_fanin.py delete mode 100644 examples/dts/sub-orchestrations-with-fan-out-fan-in/orchestrator.py rename examples/{dts => }/requirements.txt (100%) rename examples/{dts => }/sub-orchestrations-with-fan-out-fan-in/README.md (67%) create mode 100644 examples/sub-orchestrations-with-fan-out-fan-in/orchestrator.py rename examples/{dts => }/sub-orchestrations-with-fan-out-fan-in/requirements.txt (100%) rename examples/{dts => 
}/sub-orchestrations-with-fan-out-fan-in/worker.py (80%) diff --git a/docs/contributing.md b/CONTRIBUTING.md similarity index 100% rename from docs/contributing.md rename to CONTRIBUTING.md diff --git a/README.md b/README.md index 81dfd79..0465d8d 100644 --- a/README.md +++ b/README.md @@ -8,14 +8,14 @@ This repo contains a Python SDK for use with the [Azure Durable Task Scheduler]( ⚠️ This SDK is currently under active development and is evolving rapidly. While it's not yet ready for production use, we are excited about its potential and look forward to your feedback as we continue to improve it. ⚠️ -> Note that this SDK is **not** currently compatible with [Azure Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview). If you are looking for a Python SDK for Azure Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python). +> Note that this SDK is **not** currently compatible with [Azure Durable Functions](https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-overview). If you are looking for a Python SDK for Azure Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python). # References - [Supported Patterns](./docs/supported-patterns.md) - [Available Features](./docs/features.md) - [Getting Started](./docs/getting-started.md) - [Development Guide](./docs/development.md) -- [Contributing Guide](./docs/development.md) +- [Contributing Guide](./CONTRIBUTING.md) ## Trademarks This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft diff --git a/docs/features.md b/docs/features.md index 5428c43..d5c1b8c 100644 --- a/docs/features.md +++ b/docs/features.md @@ -22,7 +22,7 @@ Orchestrations can start child orchestrations using the `call_sub_orchestrator` Orchestrations can wait for external events using the `wait_for_external_event` API. 
External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing. -### Continue-as-new (TODO) +### Continue-as-new Orchestrations can be continued as new using the `continue_as_new` API. This API allows an orchestration to restart itself from scratch, optionally with a new input. @@ -30,6 +30,6 @@ Orchestrations can be continued as new using the `continue_as_new` API. This API Orchestrations can be suspended using the `suspend_orchestration` client API and will remain suspended until resumed using the `resume_orchestration` client API. A suspended orchestration will stop processing new events, but will continue to buffer any that happen to arrive until resumed, ensuring that no data is lost. An orchestration can also be terminated using the `terminate_orchestration` client API. Terminated orchestrations will stop processing new events and will discard any buffered events. -### Retry policies (TODO) +### Retry policies Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error. \ No newline at end of file diff --git a/docs/supported-patterns.md b/docs/supported-patterns.md index 01a3aa3..bbac4a7 100644 --- a/docs/supported-patterns.md +++ b/docs/supported-patterns.md @@ -20,7 +20,7 @@ def sequence(ctx: task.OrchestrationContext, _): return [result1, result2, result3] ``` -You can find the full sample [here](./examples/activity_sequence.py). +You can find the full sample [here](../examples/activity_sequence.py). ### Fan-out/fan-in @@ -48,7 +48,7 @@ def orchestrator(ctx: task.OrchestrationContext, _): return {'work_items': work_items, 'results': results, 'total': sum(results)} ``` -You can find the full sample [here](./examples/fanout_fanin.py). +You can find the full sample [here](../examples/fanout_fanin.py). 
### Human interaction and durable timers @@ -79,4 +79,4 @@ def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order): As an aside, you'll also notice that the example orchestration above works with custom business objects. Support for custom business objects includes support for custom classes, custom data classes, and named tuples. Serialization and deserialization of these objects is handled automatically by the SDK. -You can find the full sample [here](./examples/human_interaction.py). \ No newline at end of file +You can find the full sample [here](../examples/human_interaction.py). \ No newline at end of file diff --git a/examples/README.md b/examples/README.md index 404b127..0912a60 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,27 +1,86 @@ -# Examples - -This directory contains examples of how to author durable orchestrations using the Durable Task Python SDK. - -## Prerequisites - -All the examples assume that you have a Durable Task-compatible sidecar running locally. There are two options for this: - -1. Install the latest version of the [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/), which contains and exposes an embedded version of the Durable Task engine. The setup process (which requires Docker) will configure the workflow engine to store state in a local Redis container. - -2. Clone and run the [Durable Task Sidecar](https://github.com/microsoft/durabletask-go) project locally (requires Go 1.18 or higher). Orchestration state will be stored in a local sqlite database. - -## Running the examples - -With one of the sidecars running, you can simply execute any of the examples in this directory using `python3`: - -```sh -python3 ./activity_sequence.py -``` - -In some cases, the sample may require command-line parameters or user inputs. In these cases, the sample will print out instructions on how to proceed. 
- -## List of examples - -- [Activity sequence](./activity_sequence.py): Orchestration that schedules three activity calls in a sequence. -- [Fan-out/fan-in](./fanout_fanin.py): Orchestration that schedules a dynamic number of activity calls in parallel, waits for all of them to complete, and then performs an aggregation on the results. -- [Human interaction](./human_interaction.py): Orchestration that waits for a human to approve an order before continuing. +# Examples + +This directory contains examples of how to author durable orchestrations using the Durable Task Python SDK in conjunction with the Durable Task Scheduler (DTS). + +## Prerequisites +If using a deployed Durable Task Scheduler: + - [Azure CLI](https://learn.microsoft.com/cli/azure/install-azure-cli) + - [`az durabletask` CLI extension](https://learn.microsoft.com/en-us/cli/azure/durabletask?view=azure-cli-latest) + +## Running the Examples +There are two separate ways to run an example: + +- Using the Emulator (recommended for learning and development) +- Using a deployed Scheduler and Taskhub in Azure + +### Running with the Emulator +We recommend using the emulator for learning and development as it's faster to set up and doesn't require any Azure resources. The emulator simulates a scheduler and taskhub, packaged into an easy-to-use Docker container. + +1. Install Docker: If it is not already installed. + +2. Pull the Docker Image for the Emulator: +```bash +docker pull mcr.microsoft.com/dts/dts-emulator:v0.0.6 +``` + +3. Run the Emulator: Wait a few seconds for the container to be ready. +```bash +docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:v0.0.6 +``` + +4. Install the Required Packages: +```bash +pip install -r requirements.txt +``` + +Note: The example code has been updated to use the default emulator settings automatically (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables. 
+ +### Running with a Deployed Scheduler and Taskhub Resource in Azure +For production scenarios or when you're ready to deploy to Azure, you can create a taskhub using the Azure CLI: + +1. Create a Scheduler: +```bash +az durabletask scheduler create --resource-group --name --location --ip-allowlist "[0.0.0.0/0]" --sku-capacity 1 --sku-name "Dedicated" --tags "{'myattribute':'myvalue'}" +``` + +2. Create Your Taskhub: +```bash +az durabletask taskhub create --resource-group --scheduler-name --name +``` + +3. Retrieve the Endpoint for the Scheduler: Locate the taskhub in the Azure portal to find the endpoint. + +4. Set the Environment Variables: +Bash: +```bash +export TASKHUB= +export ENDPOINT= +``` +Powershell: +```powershell +$env:TASKHUB = "" +$env:ENDPOINT = "" +``` + +5. Install the Required Packages: +```bash +pip install -r requirements.txt +``` + +### Running the Examples +You can now execute any of the examples in this directory using Python: + +```bash +python3 example_file.py +``` + +### Review Orchestration History and Status in the Durable Task Scheduler Dashboard +To access the Durable Task Scheduler Dashboard, follow these steps: + +- **Using the Emulator**: By default, the dashboard runs on portal 8082. Navigate to http://localhost:8082 and click on the default task hub. + +- **Using a Deployed Scheduler**: Navigate to the Scheduler resource. Then, go to the Task Hub subresource that you are using and click on the dashboard URL in the top right corner. 
+ +```sh +python3 activity_sequence.py +``` diff --git a/examples/activity_sequence.py b/examples/activity_sequence.py index 066a733..38c013d 100644 --- a/examples/activity_sequence.py +++ b/examples/activity_sequence.py @@ -1,35 +1,54 @@ -"""End-to-end sample that demonstrates how to configure an orchestrator -that calls an activity function in a sequence and prints the outputs.""" -from durabletask import client, task, worker - - -def hello(ctx: task.ActivityContext, name: str) -> str: - """Activity function that returns a greeting""" - return f'Hello {name}!' - - -def sequence(ctx: task.OrchestrationContext, _): - """Orchestrator function that calls the 'hello' activity function in a sequence""" - # call "hello" activity function in a sequence - result1 = yield ctx.call_activity(hello, input='Tokyo') - result2 = yield ctx.call_activity(hello, input='Seattle') - result3 = yield ctx.call_activity(hello, input='London') - - # return an array of results - return [result1, result2, result3] - - -# configure and start the worker -with worker.TaskHubGrpcWorker() as w: - w.add_orchestrator(sequence) - w.add_activity(hello) - w.start() - - # create a client, start an orchestration, and wait for it to finish - c = client.TaskHubGrpcClient() - instance_id = c.schedule_new_orchestration(sequence) - state = c.wait_for_orchestration_completion(instance_id, timeout=10) - if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: - print(f'Orchestration completed! 
Result: {state.serialized_output}') - elif state: - print(f'Orchestration failed: {state.failure_details}') +"""End-to-end sample that demonstrates how to configure an orchestrator +that calls an activity function in a sequence and prints the outputs.""" +import os + +from azure.identity import DefaultAzureCredential + +from durabletask import client, task +from durabletask.azuremanaged.client import DurableTaskSchedulerClient +from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker + + +def hello(ctx: task.ActivityContext, name: str) -> str: + """Activity function that returns a greeting""" + return f'Hello {name}!' + + +def sequence(ctx: task.OrchestrationContext, _): + """Orchestrator function that calls the 'hello' activity function in a sequence""" + # call "hello" activity function in a sequence + result1 = yield ctx.call_activity(hello, input='Tokyo') + result2 = yield ctx.call_activity(hello, input='Seattle') + result3 = yield ctx.call_activity(hello, input='London') + + # return an array of results + return [result1, result2, result3] + + +# Use environment variables if provided, otherwise use default emulator values +taskhub_name = os.getenv("TASKHUB", "default") +endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + +print(f"Using taskhub: {taskhub_name}") +print(f"Using endpoint: {endpoint}") + +# Set credential to None for emulator, or DefaultAzureCredential for Azure +credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() + +# configure and start the worker - use secure_channel=False for emulator +secure_channel = endpoint != "http://localhost:8080" +with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, + taskhub=taskhub_name, token_credential=credential) as w: + w.add_orchestrator(sequence) + w.add_activity(hello) + w.start() + + # Construct the client and run the orchestrations + c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel, + 
taskhub=taskhub_name, token_credential=credential) + instance_id = c.schedule_new_orchestration(sequence) + state = c.wait_for_orchestration_completion(instance_id, timeout=60) + if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: + print(f'Orchestration completed! Result: {state.serialized_output}') + elif state: + print(f'Orchestration failed: {state.failure_details}') diff --git a/examples/dts/README.md b/examples/dts/README.md deleted file mode 100644 index 1dbf888..0000000 --- a/examples/dts/README.md +++ /dev/null @@ -1,82 +0,0 @@ -# Examples - -This directory contains examples of how to author durable orchestrations using the Durable Task Python SDK in conjunction with the Durable Task Scheduler (DTS). Please note that the installation instructions provided below will use the version of DTS directly from the your branch rather than installing through PyPI. - -## Prerequisites - -There are 2 separate ways to run an example: -1. Using the emulator. -2. Using a real scheduler and taskhub. - -All the examples by defualt assume that you have a Durable Task Scheduler taskhub created. - -## Running with a scheduler and taskhub resource -The simplest way to create a taskhub is by using the az cli commands: - -1. Create a scheduler: - az durabletask scheduler create --resource-group --name --location --ip-allowlist "[0.0.0.0/0]" --sku-capacity 1 --sku-name "Dedicated" --tags "{}" - -1. Create your taskhub - - ```bash - az durabletask taskhub create --resource-group --scheduler-name --name - ``` - -1. Retrieve the endpoint for the scheduler. This can be done by locating the taskhub in the portal. - -1. Set the appropriate environment variables for the TASKHUB and ENDPOINT - - ```bash - export TASKHUB= - export ENDPOINT= - ``` - -1. Since the samples rely on azure identity, ensure the package is installed and up-to-date - - ```bash - python3 -m pip install azure-identity - ``` - -1. 
Install the correct packages from the top level of this repository, i.e. durabletask-python/ - - ```bash - python3 -m pip install . - ``` - -1. Install the DTS specific packages from the durabletask-python/durabletask-azuremanaged directory - - ```bash - pip3 install -e . - ``` - -1. Grant yourself the `Durable Task Data Contributor` role over your scheduler - -## Running with the emulator -The emulator is a simulation of a scheduler and taskhub. It is the 'backend' of the durabletask-azuremanaged system packaged up into an easy to use docker container. For these steps, it is assumed that you are using port 8080. - -In order to use the emulator for the examples, perform the following steps: -1. Install docker if it is not already installed. - -2. Pull down the docker image for the emulator: - `docker pull mcr.microsoft.com/dts/dts-emulator:v0.0.6` - -3. Run the emulator and wait a few seconds for the container to be ready: -`docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:v0.0.6` - -4. Set the environment variables that are referenced and used in the examples: - 1. If you are using windows powershell: - `$env:TASKHUB="default"` - `$env:ENDPOINT="http://localhost:8080"` - 2. If you are using bash: - `export TASKHUB=default` - `export ENDPOINT=http://localhost:8080` - -5. 
Finally, edit the examples to change the `token_credential` input of both the `DurableTaskSchedulerWorker` and `DurableTaskSchedulerClient` to a value of `None` - -## Running the examples - -Now, you can simply execute any of the examples in this directory using `python3`: - -```sh -python3 dts_activity_sequence.py -``` diff --git a/examples/dts/dts_activity_sequence.py b/examples/dts/dts_activity_sequence.py deleted file mode 100644 index 2ff3c22..0000000 --- a/examples/dts/dts_activity_sequence.py +++ /dev/null @@ -1,71 +0,0 @@ -"""End-to-end sample that demonstrates how to configure an orchestrator -that calls an activity function in a sequence and prints the outputs.""" -import os - -from azure.identity import DefaultAzureCredential - -from durabletask import client, task -from durabletask.azuremanaged.client import DurableTaskSchedulerClient -from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker - - -def hello(ctx: task.ActivityContext, name: str) -> str: - """Activity function that returns a greeting""" - return f'Hello {name}!' - - -def sequence(ctx: task.OrchestrationContext, _): - """Orchestrator function that calls the 'hello' activity function in a sequence""" - # call "hello" activity function in a sequence - result1 = yield ctx.call_activity(hello, input='Tokyo') - result2 = yield ctx.call_activity(hello, input='Seattle') - result3 = yield ctx.call_activity(hello, input='London') - - # return an array of results - return [result1, result2, result3] - - -# Read the environment variable -taskhub_name = os.getenv("TASKHUB") - -# Check if the variable exists -if taskhub_name: - print(f"The value of TASKHUB is: {taskhub_name}") -else: - print("TASKHUB is not set. 
Please set the TASKHUB environment variable to the name of the taskhub you wish to use") - print("If you are using windows powershell, run the following: $env:TASKHUB=\"\"") - print("If you are using bash, run the following: export TASKHUB=\"\"") - exit() - -# Read the environment variable -endpoint = os.getenv("ENDPOINT") - -# Check if the variable exists -if endpoint: - print(f"The value of ENDPOINT is: {endpoint}") -else: - print("ENDPOINT is not set. Please set the ENDPOINT environment variable to the endpoint of the scheduler") - print("If you are using windows powershell, run the following: $env:ENDPOINT=\"\"") - print("If you are using bash, run the following: export ENDPOINT=\"\"") - exit() - -# Note that any azure-identity credential type and configuration can be used here as DTS supports various credential -# types such as Managed Identities -credential = DefaultAzureCredential() - -# configure and start the worker -with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=credential) as w: - w.add_orchestrator(sequence) - w.add_activity(hello) - w.start() - - # Construct the client and run the orchestrations - c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=credential) - instance_id = c.schedule_new_orchestration(sequence) - state = c.wait_for_orchestration_completion(instance_id, timeout=60) - if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: - print(f'Orchestration completed! 
Result: {state.serialized_output}') - elif state: - print(f'Orchestration failed: {state.failure_details}') diff --git a/examples/dts/dts_fanout_fanin.py b/examples/dts/dts_fanout_fanin.py deleted file mode 100644 index 8ab68df..0000000 --- a/examples/dts/dts_fanout_fanin.py +++ /dev/null @@ -1,96 +0,0 @@ -"""End-to-end sample that demonstrates how to configure an orchestrator -that a dynamic number activity functions in parallel, waits for them all -to complete, and prints an aggregate summary of the outputs.""" -import os -import random -import time - -from azure.identity import DefaultAzureCredential - -from durabletask import client, task -from durabletask.azuremanaged.client import DurableTaskSchedulerClient -from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker - - -def get_work_items(ctx: task.ActivityContext, _) -> list[str]: - """Activity function that returns a list of work items""" - # return a random number of work items - count = random.randint(2, 10) - print(f'generating {count} work items...') - return [f'work item {i}' for i in range(count)] - - -def process_work_item(ctx: task.ActivityContext, item: str) -> int: - """Activity function that returns a result for a given work item""" - print(f'processing work item: {item}') - - # simulate some work that takes a variable amount of time - time.sleep(random.random() * 5) - - # return a result for the given work item, which is also a random number in this case - return random.randint(0, 10) - - -def orchestrator(ctx: task.OrchestrationContext, _): - """Orchestrator function that calls the 'get_work_items' and 'process_work_item' - activity functions in parallel, waits for them all to complete, and prints - an aggregate summary of the outputs""" - - work_items: list[str] = yield ctx.call_activity(get_work_items) - - # execute the work-items in parallel and wait for them all to return - tasks = [ctx.call_activity(process_work_item, input=item) for item in work_items] - results: list[int] = 
yield task.when_all(tasks) - - # return an aggregate summary of the results - return { - 'work_items': work_items, - 'results': results, - 'total': sum(results), - } - - -# Read the environment variable -taskhub_name = os.getenv("TASKHUB") - -# Check if the variable exists -if taskhub_name: - print(f"The value of TASKHUB is: {taskhub_name}") -else: - print("TASKHUB is not set. Please set the TASKHUB environment variable to the name of the taskhub you wish to use") - print("If you are using windows powershell, run the following: $env:TASKHUB=\"\"") - print("If you are using bash, run the following: export TASKHUB=\"\"") - exit() - -# Read the environment variable -endpoint = os.getenv("ENDPOINT") - -# Check if the variable exists -if endpoint: - print(f"The value of ENDPOINT is: {endpoint}") -else: - print("ENDPOINT is not set. Please set the ENDPOINT environment variable to the endpoint of the scheduler") - print("If you are using windows powershell, run the following: $env:ENDPOINT=\"\"") - print("If you are using bash, run the following: export ENDPOINT=\"\"") - exit() - -credential = DefaultAzureCredential() - -# configure and start the worker -with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=credential) as w: - w.add_orchestrator(orchestrator) - w.add_activity(process_work_item) - w.add_activity(get_work_items) - w.start() - - # create a client, start an orchestration, and wait for it to finish - c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=credential) - instance_id = c.schedule_new_orchestration(orchestrator) - state = c.wait_for_orchestration_completion(instance_id, timeout=30) - if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: - print(f'Orchestration completed! 
Result: {state.serialized_output}') - elif state: - print(f'Orchestration failed: {state.failure_details}') - exit() diff --git a/examples/dts/sub-orchestrations-with-fan-out-fan-in/orchestrator.py b/examples/dts/sub-orchestrations-with-fan-out-fan-in/orchestrator.py deleted file mode 100644 index 313bd54..0000000 --- a/examples/dts/sub-orchestrations-with-fan-out-fan-in/orchestrator.py +++ /dev/null @@ -1,44 +0,0 @@ -import os -from azure.identity import DefaultAzureCredential -from durabletask import client -from durabletask.azuremanaged.client import DurableTaskSchedulerClient - -# Read the environment variable -taskhub_name = os.getenv("TASKHUB") - -# Check if the variable exists -if taskhub_name: - print(f"The value of TASKHUB is: {taskhub_name}") -else: - print("TASKHUB is not set. Please set the TASKHUB environment variable to the name of the taskhub you wish to use") - print("If you are using windows powershell, run the following: $env:TASKHUB=\"\"") - print("If you are using bash, run the following: export TASKHUB=\"\"") - exit() - -# Read the environment variable -endpoint = os.getenv("ENDPOINT") - -# Check if the variable exists -if endpoint: - print(f"The value of ENDPOINT is: {endpoint}") -else: - print("ENDPOINT is not set. Please set the ENDPOINT environment variable to the endpoint of the scheduler") - print("If you are using windows powershell, run the following: $env:ENDPOINT=\"\"") - print("If you are using bash, run the following: export ENDPOINT=\"\"") - exit() - -credential = DefaultAzureCredential() - -# Create a client, start an orchestration, and wait for it to finish -c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=credential) - -instance_id = c.schedule_new_orchestration("orchestrator") - -state = c.wait_for_orchestration_completion(instance_id, timeout=30) - -if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: - print(f'Orchestration completed! 
Result: {state.serialized_output}') -elif state: - print(f'Orchestration failed: {state.failure_details}') -exit() diff --git a/examples/fanout_fanin.py b/examples/fanout_fanin.py index c53744f..a606731 100644 --- a/examples/fanout_fanin.py +++ b/examples/fanout_fanin.py @@ -1,62 +1,81 @@ -"""End-to-end sample that demonstrates how to configure an orchestrator -that a dynamic number activity functions in parallel, waits for them all -to complete, and prints an aggregate summary of the outputs.""" -import random -import time - -from durabletask import client, task, worker - - -def get_work_items(ctx: task.ActivityContext, _) -> list[str]: - """Activity function that returns a list of work items""" - # return a random number of work items - count = random.randint(2, 10) - print(f'generating {count} work items...') - return [f'work item {i}' for i in range(count)] - - -def process_work_item(ctx: task.ActivityContext, item: str) -> int: - """Activity function that returns a result for a given work item""" - print(f'processing work item: {item}') - - # simulate some work that takes a variable amount of time - time.sleep(random.random() * 5) - - # return a result for the given work item, which is also a random number in this case - return random.randint(0, 10) - - -def orchestrator(ctx: task.OrchestrationContext, _): - """Orchestrator function that calls the 'get_work_items' and 'process_work_item' - activity functions in parallel, waits for them all to complete, and prints - an aggregate summary of the outputs""" - - work_items: list[str] = yield ctx.call_activity(get_work_items) - - # execute the work-items in parallel and wait for them all to return - tasks = [ctx.call_activity(process_work_item, input=item) for item in work_items] - results: list[int] = yield task.when_all(tasks) - - # return an aggregate summary of the results - return { - 'work_items': work_items, - 'results': results, - 'total': sum(results), - } - - -# configure and start the worker -with 
worker.TaskHubGrpcWorker() as w: - w.add_orchestrator(orchestrator) - w.add_activity(process_work_item) - w.add_activity(get_work_items) - w.start() - - # create a client, start an orchestration, and wait for it to finish - c = client.TaskHubGrpcClient() - instance_id = c.schedule_new_orchestration(orchestrator) - state = c.wait_for_orchestration_completion(instance_id, timeout=30) - if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: - print(f'Orchestration completed! Result: {state.serialized_output}') - elif state: - print(f'Orchestration failed: {state.failure_details}') +"""End-to-end sample that demonstrates how to configure an orchestrator +that a dynamic number activity functions in parallel, waits for them all +to complete, and prints an aggregate summary of the outputs.""" +import os +import random +import time + +from azure.identity import DefaultAzureCredential + +from durabletask import client, task +from durabletask.azuremanaged.client import DurableTaskSchedulerClient +from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker + + +def get_work_items(ctx: task.ActivityContext, _) -> list[str]: + """Activity function that returns a list of work items""" + # return a random number of work items + count = random.randint(2, 10) + print(f'generating {count} work items...') + return [f'work item {i}' for i in range(count)] + + +def process_work_item(ctx: task.ActivityContext, item: str) -> int: + """Activity function that returns a result for a given work item""" + print(f'processing work item: {item}') + + # simulate some work that takes a variable amount of time + time.sleep(random.random() * 5) + + # return a result for the given work item, which is also a random number in this case + return random.randint(0, 10) + + +def orchestrator(ctx: task.OrchestrationContext, _): + """Orchestrator function that calls the 'get_work_items' and 'process_work_item' + activity functions in parallel, waits for them all to complete, 
and prints + an aggregate summary of the outputs""" + + work_items: list[str] = yield ctx.call_activity(get_work_items) + + # execute the work-items in parallel and wait for them all to return + tasks = [ctx.call_activity(process_work_item, input=item) for item in work_items] + results: list[int] = yield task.when_all(tasks) + + # return an aggregate summary of the results + return { + 'work_items': work_items, + 'results': results, + 'total': sum(results), + } + + +# Use environment variables if provided, otherwise use default emulator values +taskhub_name = os.getenv("TASKHUB", "default") +endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + +print(f"Using taskhub: {taskhub_name}") +print(f"Using endpoint: {endpoint}") + +# Set credential to None for emulator, or DefaultAzureCredential for Azure +credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() + +# configure and start the worker - use secure_channel=False for emulator +secure_channel = endpoint != "http://localhost:8080" +with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, + taskhub=taskhub_name, token_credential=credential) as w: + w.add_orchestrator(orchestrator) + w.add_activity(process_work_item) + w.add_activity(get_work_items) + w.start() + + # create a client, start an orchestration, and wait for it to finish + c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel, + taskhub=taskhub_name, token_credential=credential) + instance_id = c.schedule_new_orchestration(orchestrator) + state = c.wait_for_orchestration_completion(instance_id, timeout=30) + if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: + print(f'Orchestration completed! 
Result: {state.serialized_output}') + elif state: + print(f'Orchestration failed: {state.failure_details}') + exit() diff --git a/examples/human_interaction.py b/examples/human_interaction.py index 2a01897..143a2ff 100644 --- a/examples/human_interaction.py +++ b/examples/human_interaction.py @@ -3,13 +3,18 @@ the approval isn't received within a specified timeout, the order that is represented by the orchestration is automatically cancelled.""" +import os import threading import time from collections import namedtuple from dataclasses import dataclass from datetime import timedelta +from azure.identity import DefaultAzureCredential + from durabletask import client, task, worker +from durabletask.azuremanaged.client import DurableTaskSchedulerClient +from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker @dataclass @@ -63,37 +68,87 @@ def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order): parser.add_argument("--cost", type=int, default=2000, help="Cost of the order") parser.add_argument("--approver", type=str, default="Me", help="Approver name") parser.add_argument("--timeout", type=int, default=60, help="Timeout in seconds") + parser.add_argument("--local", action="store_true", help="Use local worker instead of DurableTaskScheduler") args = parser.parse_args() - # configure and start the worker - with worker.TaskHubGrpcWorker() as w: - w.add_orchestrator(purchase_order_workflow) - w.add_activity(send_approval_request) - w.add_activity(place_order) - w.start() - - c = client.TaskHubGrpcClient() - - # Start a purchase order workflow using the user input - order = Order(args.cost, "MyProduct", 1) - instance_id = c.schedule_new_orchestration(purchase_order_workflow, input=order) - - def prompt_for_approval(): - input("Press [ENTER] to approve the order...\n") - approval_event = namedtuple("Approval", ["approver"])(args.approver) - c.raise_orchestration_event(instance_id, "approval_received", data=approval_event) - - # Prompt the 
user for approval on a background thread - threading.Thread(target=prompt_for_approval, daemon=True).start() - - # Wait for the orchestration to complete - try: - state = c.wait_for_orchestration_completion(instance_id, timeout=args.timeout + 2) - if not state: - print("Workflow not found!") # not expected - elif state.runtime_status == client.OrchestrationStatus.COMPLETED: - print(f'Orchestration completed! Result: {state.serialized_output}') - else: - state.raise_if_failed() # raises an exception - except TimeoutError: - print("*** Orchestration timed out!") + if args.local: + # Use local worker (original implementation) + with worker.TaskHubGrpcWorker() as w: + w.add_orchestrator(purchase_order_workflow) + w.add_activity(send_approval_request) + w.add_activity(place_order) + w.start() + + c = client.TaskHubGrpcClient() + + # Start a purchase order workflow using the user input + order = Order(args.cost, "MyProduct", 1) + instance_id = c.schedule_new_orchestration(purchase_order_workflow, input=order) + + def prompt_for_approval(): + input("Press [ENTER] to approve the order...\n") + approval_event = namedtuple("Approval", ["approver"])(args.approver) + c.raise_orchestration_event(instance_id, "approval_received", data=approval_event) + + # Prompt the user for approval on a background thread + threading.Thread(target=prompt_for_approval, daemon=True).start() + + # Wait for the orchestration to complete + try: + state = c.wait_for_orchestration_completion(instance_id, timeout=args.timeout + 2) + if not state: + print("Workflow not found!") # not expected + elif state.runtime_status == client.OrchestrationStatus.COMPLETED: + print(f'Orchestration completed! 
Result: {state.serialized_output}') + else: + state.raise_if_failed() # raises an exception + except TimeoutError: + print("*** Orchestration timed out!") + else: + # Use DurableTaskScheduler + # Use environment variables if provided, otherwise use default emulator values + taskhub_name = os.getenv("TASKHUB", "default") + endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + + print(f"Using taskhub: {taskhub_name}") + print(f"Using endpoint: {endpoint}") + + # Set credential to None for emulator, or DefaultAzureCredential for Azure + credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() + + # Configure and start the worker - use secure_channel=False for emulator + secure_channel = endpoint != "http://localhost:8080" + with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, + taskhub=taskhub_name, token_credential=credential) as w: + w.add_orchestrator(purchase_order_workflow) + w.add_activity(send_approval_request) + w.add_activity(place_order) + w.start() + + # Construct the client and run the orchestrations + c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel, + taskhub=taskhub_name, token_credential=credential) + + # Start a purchase order workflow using the user input + order = Order(args.cost, "MyProduct", 1) + instance_id = c.schedule_new_orchestration(purchase_order_workflow, input=order) + + def prompt_for_approval(): + input("Press [ENTER] to approve the order...\n") + approval_event = namedtuple("Approval", ["approver"])(args.approver) + c.raise_orchestration_event(instance_id, "approval_received", data=approval_event) + + # Prompt the user for approval on a background thread + threading.Thread(target=prompt_for_approval, daemon=True).start() + + # Wait for the orchestration to complete + try: + state = c.wait_for_orchestration_completion(instance_id, timeout=args.timeout + 2) + if not state: + print("Workflow not found!") # not expected + elif 
state.runtime_status == client.OrchestrationStatus.COMPLETED: + print(f'Orchestration completed! Result: {state.serialized_output}') + else: + state.raise_if_failed() # raises an exception + except TimeoutError: + print("*** Orchestration timed out!") diff --git a/examples/dts/requirements.txt b/examples/requirements.txt similarity index 100% rename from examples/dts/requirements.txt rename to examples/requirements.txt diff --git a/examples/dts/sub-orchestrations-with-fan-out-fan-in/README.md b/examples/sub-orchestrations-with-fan-out-fan-in/README.md similarity index 67% rename from examples/dts/sub-orchestrations-with-fan-out-fan-in/README.md rename to examples/sub-orchestrations-with-fan-out-fan-in/README.md index 7f0432f..8e73e78 100644 --- a/examples/dts/sub-orchestrations-with-fan-out-fan-in/README.md +++ b/examples/sub-orchestrations-with-fan-out-fan-in/README.md @@ -7,65 +7,54 @@ The scenario showcases an order processing system where orders are processed in > Note, for simplicity, this code is contained within a single source file. In real practice, you would have # Prerequisites +If using a deployed Durable Task Scheduler: - [Azure CLI](https://docs.microsoft.com/cli/azure/install-azure-cli) - [`az durabletask` CLI extension](https://learn.microsoft.com/en-us/cli/azure/durabletask?view=azure-cli-latest) ## Running the Examples There are two separate ways to run an example: -- Using the Emulator -- Using a deployed Scheduler and Taskhub +- Using the Emulator (recommended for learning and development) +- Using a deployed Scheduler and Taskhub in Azure -### Running with a Deployed Scheduler and Taskhub rResource -1. To create a taskhub, follow these steps using the Azure CLI commands: +### Running with the Emulator +We recommend using the emulator for learning and development as it's faster to set up and doesn't require any Azure resources. The emulator simulates a scheduler and taskhub, packaged into an easy-to-use Docker container. 
-Create a Scheduler: -```bash -az durabletask scheduler create --resource-group --name --location --ip-allowlist "[0.0.0.0/0]" --sku-capacity 1 --sku-name "Dedicated" --tags "{'myattribute':'myvalue'}" -``` +1. Install Docker: If it is not already installed. -Create Your Taskhub: +2. Pull the Docker Image for the Emulator: ```bash -az durabletask taskhub create --resource-group --scheduler-name --name +docker pull mcr.microsoft.com/dts/dts-emulator:v0.0.6 ``` -2. Retrieve the Endpoint for the Scheduler: Locate the taskhub in the Azure portal to find the endpoint. - -3. Set the Environment Variables: -Bash: +3. Run the Emulator: Wait a few seconds for the container to be ready. ```bash -export TASKHUB= -export ENDPOINT= -``` -Powershell: -```powershell -$env:TASKHUB = "" -$env:ENDPOINT = "" +docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:v0.0.6 ``` -4. Install the Correct Packages +4. Install the Required Packages: ```bash pip install -r requirements.txt ``` -4. Grant your developer credentials the `Durable Task Data Contributor` Role. +Note: The example code has been updated to use the default emulator settings automatically (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables. -### Running with the Emulator -The emulator simulates a scheduler and taskhub, packaged into an easy-to-use Docker container. For these steps, it is assumed that you are using port 8080. +### Running with a Deployed Scheduler and Taskhub Resource in Azure +For production scenarios or when you're ready to deploy to Azure, you can create a taskhub using the Azure CLI: -1. Install Docker: If it is not already installed. - -2. Pull the Docker Image for the Emulator: +1. 
Create a Scheduler: ```bash -docker pull mcr.microsoft.com/dts/dts-emulator:v0.0.6 +az durabletask scheduler create --resource-group --name --location --ip-allowlist "[0.0.0.0/0]" --sku-capacity 1 --sku-name "Dedicated" --tags "{'myattribute':'myvalue'}" ``` -3. Run the Emulator: Wait a few seconds for the container to be ready. +2. Create Your Taskhub: ```bash -docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:v0.0.4 +az durabletask taskhub create --resource-group --scheduler-name --name ``` -3. Set the Environment Variables: +3. Retrieve the Endpoint for the Scheduler: Locate the taskhub in the Azure portal to find the endpoint. + +4. Set the Environment Variables: Bash: ```bash export TASKHUB= @@ -77,7 +66,10 @@ $env:TASKHUB = "" $env:ENDPOINT = "" ``` -4. Edit the Examples: Change the token_credential input of both the `DurableTaskSchedulerWorker` and `DurableTaskSchedulerClient` to `None`. +5. Install the Required Packages: +```bash +pip install -r requirements.txt +``` ### Running the Examples You can now execute the sample using Python: diff --git a/examples/sub-orchestrations-with-fan-out-fan-in/orchestrator.py b/examples/sub-orchestrations-with-fan-out-fan-in/orchestrator.py new file mode 100644 index 0000000..a5e013b --- /dev/null +++ b/examples/sub-orchestrations-with-fan-out-fan-in/orchestrator.py @@ -0,0 +1,28 @@ +import os +from azure.identity import DefaultAzureCredential +from durabletask import client +from durabletask.azuremanaged.client import DurableTaskSchedulerClient + +# Use environment variables if provided, otherwise use default emulator values +taskhub_name = os.getenv("TASKHUB", "default") +endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + +print(f"Using taskhub: {taskhub_name}") +print(f"Using endpoint: {endpoint}") + +# Set credential to None for emulator, or DefaultAzureCredential for Azure +credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() + +# Create a 
client, start an orchestration, and wait for it to finish +c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=credential) + +instance_id = c.schedule_new_orchestration("orchestrator") + +state = c.wait_for_orchestration_completion(instance_id, timeout=30) + +if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: + print(f'Orchestration completed! Result: {state.serialized_output}') +elif state: + print(f'Orchestration failed: {state.failure_details}') +exit() diff --git a/examples/dts/sub-orchestrations-with-fan-out-fan-in/requirements.txt b/examples/sub-orchestrations-with-fan-out-fan-in/requirements.txt similarity index 100% rename from examples/dts/sub-orchestrations-with-fan-out-fan-in/requirements.txt rename to examples/sub-orchestrations-with-fan-out-fan-in/requirements.txt diff --git a/examples/dts/sub-orchestrations-with-fan-out-fan-in/worker.py b/examples/sub-orchestrations-with-fan-out-fan-in/worker.py similarity index 80% rename from examples/dts/sub-orchestrations-with-fan-out-fan-in/worker.py rename to examples/sub-orchestrations-with-fan-out-fan-in/worker.py index 0890e1a..66c8d71 100644 --- a/examples/dts/sub-orchestrations-with-fan-out-fan-in/worker.py +++ b/examples/sub-orchestrations-with-fan-out-fan-in/worker.py @@ -103,31 +103,15 @@ def orchestrator(ctx, _): 'details': results, } -# Read the environment variable -taskhub_name = os.getenv("TASKHUB") - -# Check if the variable exists -if taskhub_name: - print(f"The value of TASKHUB is: {taskhub_name}") -else: - print("TASKHUB is not set. 
Please set the TASKHUB environment variable to the name of the taskhub you wish to use") - print("If you are using windows powershell, run the following: $env:TASKHUB=\"\"") - print("If you are using bash, run the following: export TASKHUB=\"\"") - exit() - -# Read the environment variable -endpoint = os.getenv("ENDPOINT") - -# Check if the variable exists -if endpoint: - print(f"The value of ENDPOINT is: {endpoint}") -else: - print("ENDPOINT is not set. Please set the ENDPOINT environment variable to the endpoint of the scheduler") - print("If you are using windows powershell, run the following: $env:ENDPOINT=\"\"") - print("If you are using bash, run the following: export ENDPOINT=\"\"") - exit() - -credential = DefaultAzureCredential() +# Use environment variables if provided, otherwise use default emulator values +taskhub_name = os.getenv("TASKHUB", "default") +endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + +print(f"Using taskhub: {taskhub_name}") +print(f"Using endpoint: {endpoint}") + +# Set credential to None for emulator, or DefaultAzureCredential for Azure +credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() # Configure and start the worker with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, From 41e070478444b7e63d7883b810bc118ba7d1d811 Mon Sep 17 00:00:00 2001 From: greenie-msft Date: Mon, 18 Aug 2025 10:30:55 -0700 Subject: [PATCH 6/8] Fix CI --- .github/workflows/durabletask-azuremanaged.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/durabletask-azuremanaged.yml b/.github/workflows/durabletask-azuremanaged.yml index e2215a3..9de61e3 100644 --- a/.github/workflows/durabletask-azuremanaged.yml +++ b/.github/workflows/durabletask-azuremanaged.yml @@ -67,7 +67,7 @@ jobs: pip install -r requirements.txt - name: Install durabletask-azuremanaged dependencies - working-directory: examples/dts + working-directory: examples run: | python -m pip install 
--upgrade pip pip install -r requirements.txt From 56319fdedc8295f0023e098652264149d0f25a98 Mon Sep 17 00:00:00 2001 From: greenie-msft Date: Mon, 18 Aug 2025 10:40:04 -0700 Subject: [PATCH 7/8] Fix linting issues --- README.md | 2 -- examples/human_interaction.py | 6 +++--- .../sub-orchestrations-with-fan-out-fan-in/worker.py | 12 ++++++++++-- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 0465d8d..49d6e0d 100644 --- a/README.md +++ b/README.md @@ -6,8 +6,6 @@ This repo contains a Python SDK for use with the [Azure Durable Task Scheduler](https://github.com/Azure/Durable-Task-Scheduler). With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Python code. -⚠️ This SDK is currently under active development and is evolving rapidly. While it's not yet ready for production use, we are excited about its potential and look forward to your feedback as we continue to improve it. ⚠️ - > Note that this SDK is **not** currently compatible with [Azure Durable Functions](https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-overview). If you are looking for a Python SDK for Azure Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python). 
# References diff --git a/examples/human_interaction.py b/examples/human_interaction.py index 143a2ff..ae93cd2 100644 --- a/examples/human_interaction.py +++ b/examples/human_interaction.py @@ -119,7 +119,7 @@ def prompt_for_approval(): # Configure and start the worker - use secure_channel=False for emulator secure_channel = endpoint != "http://localhost:8080" with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, - taskhub=taskhub_name, token_credential=credential) as w: + taskhub=taskhub_name, token_credential=credential) as w: w.add_orchestrator(purchase_order_workflow) w.add_activity(send_approval_request) w.add_activity(place_order) @@ -127,12 +127,12 @@ def prompt_for_approval(): # Construct the client and run the orchestrations c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel, - taskhub=taskhub_name, token_credential=credential) + taskhub=taskhub_name, token_credential=credential) # Start a purchase order workflow using the user input order = Order(args.cost, "MyProduct", 1) instance_id = c.schedule_new_orchestration(purchase_order_workflow, input=order) - + def prompt_for_approval(): input("Press [ENTER] to approve the order...\n") approval_event = namedtuple("Approval", ["approver"])(args.approver) diff --git a/examples/sub-orchestrations-with-fan-out-fan-in/worker.py b/examples/sub-orchestrations-with-fan-out-fan-in/worker.py index 66c8d71..8ca447d 100644 --- a/examples/sub-orchestrations-with-fan-out-fan-in/worker.py +++ b/examples/sub-orchestrations-with-fan-out-fan-in/worker.py @@ -5,6 +5,7 @@ from durabletask import task from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker + def get_orders(ctx, _) -> list[str]: """Activity function that returns a list of work items""" # return a random number of work items @@ -12,6 +13,7 @@ def get_orders(ctx, _) -> list[str]: print(f'generating {count} orders...') return [f'order {i}' for i in range(count)] + def 
check_and_update_inventory(ctx, order: str) -> str: """Activity function that checks inventory for a given order""" print(f'checking inventory for order: {order}') @@ -22,6 +24,7 @@ def check_and_update_inventory(ctx, order: str) -> str: # return a random boolean indicating if the item is in stock return random.choices([True, False], weights=[9, 1]) + def charge_payment(ctx, order: str) -> bool: """Activity function that charges payment for a given order""" print(f'charging payment for order: {order}') @@ -32,6 +35,7 @@ def charge_payment(ctx, order: str) -> bool: # return a random boolean indicating if the payment was successful return random.choices([True, False], weights=[9, 1]) + def ship_order(ctx, order: str) -> bool: """Activity function that ships a given order""" print(f'shipping order: {order}') @@ -42,6 +46,7 @@ def ship_order(ctx, order: str) -> bool: # return a random boolean indicating if the shipping was successful return random.choices([True, False], weights=[9, 1]) + def notify_customer(ctx, order: str) -> bool: """Activity function that notifies the customer about the order status""" print(f'notifying customer about order: {order}') @@ -52,11 +57,12 @@ def notify_customer(ctx, order: str) -> bool: # return a random boolean indicating if the notification was successful return random.choices([True, False], weights=[9, 1]) + def process_order(ctx, order: str) -> dict: """Sub-orchestration function that processes a given order by performing all steps""" print(f'processing order: {order}') - # Check inventory + # Check inventory inventory_checked = yield ctx.call_activity('check_and_update_inventory', input=order) if not inventory_checked: @@ -83,6 +89,7 @@ def process_order(ctx, order: str) -> dict: # Return success status return {'order': order, 'status': 'completed'} + def orchestrator(ctx, _): """Orchestrator function that calls the 'get_orders' and 'process_order' sub-orchestration functions in parallel, waits for them all to complete, and prints 
@@ -103,6 +110,7 @@ def orchestrator(ctx, _): 'details': results, } + # Use environment variables if provided, otherwise use default emulator values taskhub_name = os.getenv("TASKHUB", "default") endpoint = os.getenv("ENDPOINT", "http://localhost:8080") @@ -116,7 +124,7 @@ def orchestrator(ctx, _): # Configure and start the worker with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, taskhub=taskhub_name, token_credential=credential) as w: - + w.add_orchestrator(orchestrator) w.add_orchestrator(process_order) w.add_activity(get_orders) From 0d7666f204428c368da09a46bbc26c943cef434a Mon Sep 17 00:00:00 2001 From: Bernd Verst Date: Mon, 18 Aug 2025 11:15:58 -0700 Subject: [PATCH 8/8] Update docs/development.md --- docs/development.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/development.md b/docs/development.md index c2c38c0..3308316 100644 --- a/docs/development.md +++ b/docs/development.md @@ -24,7 +24,8 @@ make test-unit The E2E (end-to-end) tests require a sidecar process to be running. You can use the Durable Task test sidecar using the following `docker` command: ```sh -docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' --rm cgillum/durabletask-sidecar:latest start --backend Emulator +go install github.com/microsoft/durabletask-go@main +durabletask-go --port 4001 ``` To run the E2E tests, run the following command from the project root: