diff --git a/README.md b/README.md index d9ae57a30..5a39e834b 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [updatable_timer](updatable_timer) - A timer that can be updated while sleeping. * [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers. * [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code. +* [worker_multiprocessing](worker_multiprocessing) - Leverage Python multiprocessing to parallelize workflow tasks and other CPU bound operations by running multiple workers. ## Test diff --git a/pyproject.toml b/pyproject.toml index 2bc7f73ac..794bcb925 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,15 +27,8 @@ dev = [ "poethepoet>=0.36.0", ] bedrock = ["boto3>=1.34.92,<2"] -dsl = [ - "pyyaml>=6.0.1,<7", - "types-pyyaml>=6.0.12,<7", - "dacite>=1.8.1,<2", -] -encryption = [ - "cryptography>=38.0.1,<39", - "aiohttp>=3.8.1,<4", -] +dsl = ["pyyaml>=6.0.1,<7", "types-pyyaml>=6.0.12,<7", "dacite>=1.8.1,<2"] +encryption = ["cryptography>=38.0.1,<39", "aiohttp>=3.8.1,<4"] gevent = ["gevent>=25.4.2 ; python_version >= '3.8'"] langchain = [ "langchain>=0.1.7,<0.2 ; python_version >= '3.8.1' and python_version < '4.0'", @@ -46,9 +39,7 @@ langchain = [ "tqdm>=4.62.0,<5", "uvicorn[standard]>=0.24.0.post1,<0.25", ] -nexus = [ - "nexus-rpc>=1.1.0,<2", -] +nexus = ["nexus-rpc>=1.1.0,<2"] open-telemetry = [ "temporalio[opentelemetry]", "opentelemetry-exporter-otlp-proto-grpc", @@ -60,10 +51,7 @@ openai-agents = [ ] pydantic-converter = ["pydantic>=2.10.6,<3"] sentry = ["sentry-sdk>=2.13.0"] -trio-async = [ - "trio>=0.28.0,<0.29", - "trio-asyncio>=0.15.0,<0.16", -] +trio-async = ["trio>=0.28.0,<0.29", "trio-asyncio>=0.15.0,<0.16"] cloud-export-to-parquet = [ "pandas>=2.2.2,<3 ; python_version >= '3.10' and python_version < '4.0'", "numpy>=1.26.0,<2 ; python_version >= '3.10' and python_version < '3.13'", @@ -71,6 +59,8 @@ cloud-export-to-parquet = [ "pyarrow>=19.0.1", ] +[tool.hatch.metadata] +allow-direct-references = true [tool.hatch.build.targets.sdist] include = ["./**/*.py"] @@ -118,8 +108,15 @@ requires = ["hatchling"] build-backend = "hatchling.build" [tool.poe.tasks] -format = [{cmd = "uv run ruff check --select I --fix"}, {cmd = "uv run ruff format"}] -lint = [{cmd = "uv run ruff check --select I"}, {cmd = "uv run ruff format --check"}, {ref = "lint-types"}] +format = [ + { cmd = "uv run ruff check --select I --fix" }, + { cmd = "uv run ruff format" }, +] +lint = [ + { cmd = "uv run ruff check --select I" }, + { cmd = "uv run ruff format --check" }, + { ref = "lint-types" }, +] lint-types = "uv run --all-groups mypy --check-untyped-defs --namespace-packages ." test = "uv run --all-groups pytest" diff --git a/sleep_for_days/README.md b/sleep_for_days/README.md index 69302cc79..5117fcdb2 100644 --- a/sleep_for_days/README.md +++ b/sleep_for_days/README.md @@ -2,7 +2,7 @@ This sample demonstrates how to create a Temporal workflow that runs forever, sending an email every 30 days. -To run, first see the main [README.md](../../README.md) for prerequisites. +To run, first see the main [README.md](../README.md) for prerequisites. Then create two terminals. diff --git a/worker_multiprocessing/README.md b/worker_multiprocessing/README.md new file mode 100644 index 000000000..a4f06f86b --- /dev/null +++ b/worker_multiprocessing/README.md @@ -0,0 +1,93 @@ +# Worker Multiprocessing Sample + + +## Python Concurrency Limitations + +CPU-bound tasks effectively cannot run in parallel in Python due to the [Global Interpreter Lock (GIL)](https://docs.python.org/3/glossary.html#term-global-interpreter-lock). The Python standard library's [`threading` module](https://docs.python.org/3/library/threading.html) provides the following guidance: + +> CPython implementation detail: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing or concurrent.futures.ProcessPoolExecutor. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously. + +## Temporal Workflow Tasks in Python + +[Temporal Workflow Tasks](https://docs.temporal.io/tasks#workflow-task) are CPU-bound operations and therefore cannot be run concurrently using threads or an async runtime. Instead, we can use [`concurrent.futures.ProcessPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor) or the [`multiprocessing` module](https://docs.python.org/3/library/multiprocessing.html), as suggested by the `threading` documentation, to more appropriately utilize machine resources. + +This sample demonstrates how to use `concurrent.futures.ProcessPoolExecutor` to run multiple workflow worker processes. + +## Running the Sample + +To run, first see the root [README.md](../README.md) for prerequisites. Then execute the following commands from the root directory: + +``` +uv run worker_multiprocessing/worker.py +uv run worker_multiprocessing/starter.py +``` + +Both `worker.py` and `starter.py` have minimal arguments that can be adjusted to modify how the sample runs. + +``` +uv run worker_multiprocessing/worker.py -h + +usage: worker.py [-h] [-w NUM_WORKFLOW_WORKERS] [-a NUM_ACTIVITY_WORKERS] + +options: + -h, --help show this help message and exit + -w, --num-workflow-workers NUM_WORKFLOW_WORKERS + -a, --num-activity-workers NUM_ACTIVITY_WORKERS +``` + +``` +uv run worker_multiprocessing/starter.py -h + +usage: starter.py [-h] [-n NUM_WORKFLOWS] + +options: + -h, --help show this help message and exit + -n, --num-workflows NUM_WORKFLOWS + the number of workflows to execute +``` + +## Example Output + +``` +uv run worker_multiprocessing/worker.py + +starting 2 workflow worker(s) and 1 activity worker(s) +waiting for keyboard interrupt or for all workers to exit +workflow-worker:0 starting +workflow-worker:1 starting +activity-worker:0 starting +workflow-worker:0 shutting down +activity-worker:0 shutting down +workflow-worker:1 shutting down +``` + + +``` +uv run worker_multiprocessing/starter.py + +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +``` diff --git a/worker_multiprocessing/__init__.py b/worker_multiprocessing/__init__.py new file mode 100644 index 000000000..d9e30b8a7 --- /dev/null +++ b/worker_multiprocessing/__init__.py @@ -0,0 +1,2 @@ +WORKFLOW_TASK_QUEUE = "workflow-task-queue" +ACTIVITY_TASK_QUEUE = "activity-task-queue" diff --git a/worker_multiprocessing/activities.py b/worker_multiprocessing/activities.py new file mode 100644 index 000000000..dbbbc6778 --- /dev/null +++ b/worker_multiprocessing/activities.py @@ -0,0 +1,8 @@ +import os + +from temporalio import activity + + +@activity.defn +async def echo_pid_activity(input: str) -> str: + return f"{input} | activity-pid:{os.getpid()}" diff --git a/worker_multiprocessing/starter.py b/worker_multiprocessing/starter.py new file mode 100644 index 000000000..3267694de --- /dev/null +++ b/worker_multiprocessing/starter.py @@ -0,0 +1,48 @@ +import argparse +import asyncio +import uuid + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig + +from worker_multiprocessing import WORKFLOW_TASK_QUEUE +from worker_multiprocessing.workflows import ParallelizedWorkflow + + +class Args(argparse.Namespace): + num_workflows: int + + +async def main(): + parser = argparse.ArgumentParser() + parser.add_argument( + "-n", + "--num-workflows", + help="the number of workflows to execute", + type=int, + default=25, + ) + args = parser.parse_args(namespace=Args()) + + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + # Start several workflows + wf_handles = [ + client.execute_workflow( + ParallelizedWorkflow.run, + id=f"greeting-workflow-id-{uuid.uuid4()}", + task_queue=WORKFLOW_TASK_QUEUE, + ) + for _ in range(args.num_workflows) + ] + + # Wait for workflow completion + for wf in asyncio.as_completed(wf_handles): + result = await wf + print(result) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/worker_multiprocessing/worker.py b/worker_multiprocessing/worker.py new file mode 100644 index 000000000..bda69ba63 --- /dev/null +++ b/worker_multiprocessing/worker.py @@ -0,0 +1,146 @@ +import argparse +import asyncio +import concurrent.futures +import dataclasses +import multiprocessing +import traceback +from typing import Literal + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig +from temporalio.runtime import Runtime, TelemetryConfig +from temporalio.worker import PollerBehaviorSimpleMaximum, Worker +from temporalio.worker.workflow_sandbox import ( + SandboxedWorkflowRunner, + SandboxRestrictions, +) + +from worker_multiprocessing import ACTIVITY_TASK_QUEUE, WORKFLOW_TASK_QUEUE +from worker_multiprocessing.activities import echo_pid_activity +from worker_multiprocessing.workflows import ParallelizedWorkflow + +# Immediately prevent the default Runtime from being created to ensure +# each process creates it's own +Runtime.prevent_default() + + +class Args(argparse.Namespace): + num_workflow_workers: int + num_activity_workers: int + + @property + def total_workers(self) -> int: + return self.num_activity_workers + self.num_workflow_workers + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("-w", "--num-workflow-workers", type=int, default=2) + parser.add_argument("-a", "--num-activity-workers", type=int, default=1) + args = parser.parse_args(namespace=Args()) + print( + f"starting {args.num_workflow_workers} workflow worker(s) and {args.num_activity_workers} activity worker(s)" + ) + + # This sample prefers fork to avoid re-importing modules + # and decrease startup time. Fork is not available on all + # operating systems, so we fallback to 'spawn' when not available + try: + mp_ctx = multiprocessing.get_context("fork") + except ValueError: + mp_ctx = multiprocessing.get_context("spawn") # type: ignore + + with concurrent.futures.ProcessPoolExecutor( + args.total_workers, mp_context=mp_ctx + ) as executor: + # Start workflow workers by submitting them to the + # ProcessPoolExecutor + worker_futures = [ + executor.submit(worker_entry, "workflow", i) + for i in range(args.num_workflow_workers) + ] + + # In this sample, we start activity workers as separate processes in the + # same way we do workflow workers. In production, activity workers + # are often deployed separately from workflow workers to account for + # differing scaling characteristics. + worker_futures.extend( + [ + executor.submit(worker_entry, "activity", i) + for i in range(args.num_activity_workers) + ] + ) + + try: + print("waiting for keyboard interrupt or for all workers to exit") + for worker in concurrent.futures.as_completed(worker_futures): + print("ERROR: worker exited unexpectedly") + if worker.exception(): + traceback.print_exception(worker.exception()) + except KeyboardInterrupt: + pass + + +def worker_entry(worker_type: Literal["workflow", "activity"], id: int): + Runtime.set_default(Runtime(telemetry=TelemetryConfig())) + + async def run_worker(): + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + if worker_type == "workflow": + worker = workflow_worker(client) + else: + worker = activity_worker(client) + + try: + print(f"{worker_type}-worker:{id} starting") + await asyncio.shield(worker.run()) + except asyncio.CancelledError: + print(f"{worker_type}-worker:{id} shutting down") + await worker.shutdown() + + asyncio.run(run_worker()) + + +def workflow_worker(client: Client) -> Worker: + """ + Create a workflow worker that is configured to leverage being run + as many child processes. + """ + return Worker( + client, + task_queue=WORKFLOW_TASK_QUEUE, + workflows=[ParallelizedWorkflow], + # Workflow tasks are CPU bound, but generally execute quickly. + # Because we're leveraging multiprocessing to achieve parallelism, + # we want each workflow worker to be confirgured for small workflow + # task processing. + max_concurrent_workflow_tasks=2, + workflow_task_poller_behavior=PollerBehaviorSimpleMaximum(2), + # Allow workflows to access the os module to access the pid + workflow_runner=SandboxedWorkflowRunner( + restrictions=dataclasses.replace( + SandboxRestrictions.default, + invalid_module_members=SandboxRestrictions.invalid_module_members_default.with_child_unrestricted( + "os" + ), + ) + ), + ) + + +def activity_worker(client: Client) -> Worker: + """ + Create a basic activity worker + """ + return Worker( + client, + task_queue=ACTIVITY_TASK_QUEUE, + activities=[echo_pid_activity], + ) + + +if __name__ == "__main__": + main() diff --git a/worker_multiprocessing/workflows.py b/worker_multiprocessing/workflows.py new file mode 100644 index 000000000..04f6b6351 --- /dev/null +++ b/worker_multiprocessing/workflows.py @@ -0,0 +1,22 @@ +import os +from datetime import timedelta + +from temporalio import workflow + +from worker_multiprocessing import ACTIVITY_TASK_QUEUE +from worker_multiprocessing.activities import echo_pid_activity + + +@workflow.defn +class ParallelizedWorkflow: + @workflow.run + async def run(self) -> str: + pid = os.getpid() + activity_result = await workflow.execute_activity( + echo_pid_activity, + f"wf-starting-pid:{pid}", + task_queue=ACTIVITY_TASK_QUEUE, + start_to_close_timeout=timedelta(seconds=10), + ) + + return f"{activity_result} | wf-ending-pid:{pid}"