1 change: 1 addition & 0 deletions README.md
@@ -84,6 +84,7 @@ Some examples require extra dependencies. See each sample's directory for specifics.
* [updatable_timer](updatable_timer) - A timer that can be updated while sleeping.
* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
* [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.
* [worker_multiprocessing](worker_multiprocessing) - Leverage Python multiprocessing to parallelize workflow tasks and other CPU-bound operations by running multiple workers.

## Test

33 changes: 15 additions & 18 deletions pyproject.toml
@@ -27,15 +27,8 @@ dev = [
"poethepoet>=0.36.0",
]
bedrock = ["boto3>=1.34.92,<2"]
dsl = [
"pyyaml>=6.0.1,<7",
"types-pyyaml>=6.0.12,<7",
"dacite>=1.8.1,<2",
]
encryption = [
"cryptography>=38.0.1,<39",
"aiohttp>=3.8.1,<4",
]
dsl = ["pyyaml>=6.0.1,<7", "types-pyyaml>=6.0.12,<7", "dacite>=1.8.1,<2"]
encryption = ["cryptography>=38.0.1,<39", "aiohttp>=3.8.1,<4"]
gevent = ["gevent>=25.4.2 ; python_version >= '3.8'"]
langchain = [
"langchain>=0.1.7,<0.2 ; python_version >= '3.8.1' and python_version < '4.0'",
@@ -46,9 +39,7 @@ langchain = [
"tqdm>=4.62.0,<5",
"uvicorn[standard]>=0.24.0.post1,<0.25",
]
nexus = [
"nexus-rpc>=1.1.0,<2",
]
nexus = ["nexus-rpc>=1.1.0,<2"]
open-telemetry = [
"temporalio[opentelemetry]",
"opentelemetry-exporter-otlp-proto-grpc",
@@ -60,17 +51,16 @@ openai-agents = [
]
pydantic-converter = ["pydantic>=2.10.6,<3"]
sentry = ["sentry-sdk>=2.13.0"]
trio-async = [
"trio>=0.28.0,<0.29",
"trio-asyncio>=0.15.0,<0.16",
]
trio-async = ["trio>=0.28.0,<0.29", "trio-asyncio>=0.15.0,<0.16"]
cloud-export-to-parquet = [
"pandas>=2.2.2,<3 ; python_version >= '3.10' and python_version < '4.0'",
"numpy>=1.26.0,<2 ; python_version >= '3.10' and python_version < '3.13'",
"boto3>=1.34.89,<2",
"pyarrow>=19.0.1",
]

[tool.hatch.metadata]
allow-direct-references = true

[tool.hatch.build.targets.sdist]
include = ["./**/*.py"]
@@ -118,8 +108,15 @@ requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.poe.tasks]
format = [{cmd = "uv run ruff check --select I --fix"}, {cmd = "uv run ruff format"}]
lint = [{cmd = "uv run ruff check --select I"}, {cmd = "uv run ruff format --check"}, {ref = "lint-types"}]
format = [
{ cmd = "uv run ruff check --select I --fix" },
{ cmd = "uv run ruff format" },
]
lint = [
{ cmd = "uv run ruff check --select I" },
{ cmd = "uv run ruff format --check" },
{ ref = "lint-types" },
]
lint-types = "uv run --all-groups mypy --check-untyped-defs --namespace-packages ."
test = "uv run --all-groups pytest"

2 changes: 1 addition & 1 deletion sleep_for_days/README.md
@@ -2,7 +2,7 @@

This sample demonstrates how to create a Temporal workflow that runs forever, sending an email every 30 days.

To run, first see the main [README.md](../../README.md) for prerequisites.
To run, first see the main [README.md](../README.md) for prerequisites.

Then create two terminals.

93 changes: 93 additions & 0 deletions worker_multiprocessing/README.md
@@ -0,0 +1,93 @@
# Worker Multiprocessing Sample


## Python Concurrency Limitations

CPU-bound tasks effectively cannot run in parallel in Python due to the [Global Interpreter Lock (GIL)](https://docs.python.org/3/glossary.html#term-global-interpreter-lock). The Python standard library's [`threading` module](https://docs.python.org/3/library/threading.html) provides the following guidance:

> CPython implementation detail: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing or concurrent.futures.ProcessPoolExecutor. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.
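
As a minimal illustration of that guidance (a standalone sketch, not part of this sample): the same pure-Python CPU-bound function gains nothing from a thread pool, but speeds up with a process pool.

```
import concurrent.futures
import time


def busy(n: int) -> int:
    # Pure-Python CPU-bound loop; holds the GIL the entire time.
    return sum(i * i for i in range(n))


if __name__ == "__main__":
    work = [10_000_000] * 4
    for pool_cls in (
        concurrent.futures.ThreadPoolExecutor,
        concurrent.futures.ProcessPoolExecutor,
    ):
        start = time.perf_counter()
        with pool_cls(max_workers=4) as pool:
            list(pool.map(busy, work))
        print(f"{pool_cls.__name__}: {time.perf_counter() - start:.2f}s")
```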

## Temporal Workflow Tasks in Python

[Temporal Workflow Tasks](https://docs.temporal.io/tasks#workflow-task) are CPU-bound operations and therefore cannot be run concurrently using threads or an async runtime. Instead, we can use [`concurrent.futures.ProcessPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor) or the [`multiprocessing` module](https://docs.python.org/3/library/multiprocessing.html), as suggested by the `threading` documentation, to more appropriately utilize machine resources.

This sample demonstrates how to use `concurrent.futures.ProcessPoolExecutor` to run multiple workflow worker processes.
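
The core of that pattern, condensed from `worker.py` later in this diff: each entry point submitted to the pool runs in its own child process and drives its worker with its own event loop.

```
import asyncio
import concurrent.futures


async def run_worker(worker_id: int) -> None:
    # Connect a Client and await Worker.run() here, as worker.py does below.
    ...


def worker_entry(worker_id: int) -> None:
    # Runs in a child process: each process needs its own event loop
    # (and, when using temporalio, its own Runtime).
    asyncio.run(run_worker(worker_id))


if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(worker_entry, i) for i in range(2)]
        concurrent.futures.wait(futures)
```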

## Running the Sample

To run, first see the root [README.md](../README.md) for prerequisites. Then run each of the following commands from the root directory, in its own terminal:

```
uv run worker_multiprocessing/worker.py
uv run worker_multiprocessing/starter.py
```

Both `worker.py` and `starter.py` accept a few command-line arguments that adjust how the sample runs.

```
uv run worker_multiprocessing/worker.py -h

usage: worker.py [-h] [-w NUM_WORKFLOW_WORKERS] [-a NUM_ACTIVITY_WORKERS]

options:
-h, --help show this help message and exit
-w, --num-workflow-workers NUM_WORKFLOW_WORKERS
-a, --num-activity-workers NUM_ACTIVITY_WORKERS
```

```
uv run worker_multiprocessing/starter.py -h

usage: starter.py [-h] [-n NUM_WORKFLOWS]

options:
-h, --help show this help message and exit
-n, --num-workflows NUM_WORKFLOWS
the number of workflows to execute
```

## Example Output

```
uv run worker_multiprocessing/worker.py

starting 2 workflow worker(s) and 1 activity worker(s)
waiting for keyboard interrupt or for all workers to exit
workflow-worker:0 starting
workflow-worker:1 starting
activity-worker:0 starting
workflow-worker:0 shutting down
activity-worker:0 shutting down
workflow-worker:1 shutting down
```


```
uv run worker_multiprocessing/starter.py

wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
```
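
In the output above, workflow tasks alternate between the two workflow worker processes (pids 19178 and 19179), while every activity runs in the single activity worker (pid 19180); each workflow's starting and ending pids match, as expected with Temporal's sticky workflow task queues. The `ParallelizedWorkflow` that produces these lines lives in `worker_multiprocessing/workflows.py`, which does not appear in this diff; a plausible sketch, consistent with the output and with the sandbox configuration in `worker.py`:

```
# worker_multiprocessing/workflows.py -- hypothetical reconstruction, not
# part of this diff.
import os
from datetime import timedelta

from temporalio import workflow

from worker_multiprocessing import ACTIVITY_TASK_QUEUE

with workflow.unsafe.imports_passed_through():
    from worker_multiprocessing.activities import echo_pid_activity


@workflow.defn
class ParallelizedWorkflow:
    @workflow.run
    async def run(self) -> str:
        # os.getpid() works in workflow code because worker.py unrestricts
        # the os module in the workflow sandbox.
        result = await workflow.execute_activity(
            echo_pid_activity,
            f"wf-starting-pid:{os.getpid()}",
            task_queue=ACTIVITY_TASK_QUEUE,
            start_to_close_timeout=timedelta(seconds=10),
        )
        return f"{result} | wf-ending-pid:{os.getpid()}"
```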
2 changes: 2 additions & 0 deletions worker_multiprocessing/__init__.py
@@ -0,0 +1,2 @@
WORKFLOW_TASK_QUEUE = "workflow-task-queue"
ACTIVITY_TASK_QUEUE = "activity-task-queue"
8 changes: 8 additions & 0 deletions worker_multiprocessing/activities.py
@@ -0,0 +1,8 @@
import os

from temporalio import activity


@activity.defn
async def echo_pid_activity(input: str) -> str:
return f"{input} | activity-pid:{os.getpid()}"
48 changes: 48 additions & 0 deletions worker_multiprocessing/starter.py
@@ -0,0 +1,48 @@
import argparse
import asyncio
import uuid

from temporalio.client import Client
from temporalio.envconfig import ClientConfig

from worker_multiprocessing import WORKFLOW_TASK_QUEUE
from worker_multiprocessing.workflows import ParallelizedWorkflow


class Args(argparse.Namespace):
num_workflows: int


async def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"-n",
"--num-workflows",
help="the number of workflows to execute",
type=int,
default=25,
)
args = parser.parse_args(namespace=Args())

config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
client = await Client.connect(**config)

    # Create one execute-workflow coroutine per workflow; asyncio.as_completed
    # below schedules them all concurrently
wf_handles = [
client.execute_workflow(
ParallelizedWorkflow.run,
id=f"greeting-workflow-id-{uuid.uuid4()}",
task_queue=WORKFLOW_TASK_QUEUE,
)
for _ in range(args.num_workflows)
]

# Wait for workflow completion
for wf in asyncio.as_completed(wf_handles):
result = await wf
print(result)


if __name__ == "__main__":
asyncio.run(main())
146 changes: 146 additions & 0 deletions worker_multiprocessing/worker.py
@@ -0,0 +1,146 @@
import argparse
import asyncio
import concurrent.futures
import dataclasses
import multiprocessing
import traceback
from typing import Literal

from temporalio.client import Client
from temporalio.envconfig import ClientConfig
from temporalio.runtime import Runtime, TelemetryConfig
from temporalio.worker import PollerBehaviorSimpleMaximum, Worker
from temporalio.worker.workflow_sandbox import (
SandboxedWorkflowRunner,
SandboxRestrictions,
)

from worker_multiprocessing import ACTIVITY_TASK_QUEUE, WORKFLOW_TASK_QUEUE
from worker_multiprocessing.activities import echo_pid_activity
from worker_multiprocessing.workflows import ParallelizedWorkflow

# Immediately prevent the default Runtime from being created to ensure
# each process creates its own
Runtime.prevent_default()


class Args(argparse.Namespace):
num_workflow_workers: int
num_activity_workers: int

@property
def total_workers(self) -> int:
return self.num_activity_workers + self.num_workflow_workers


def main():
parser = argparse.ArgumentParser()
parser.add_argument("-w", "--num-workflow-workers", type=int, default=2)
parser.add_argument("-a", "--num-activity-workers", type=int, default=1)
args = parser.parse_args(namespace=Args())
print(
f"starting {args.num_workflow_workers} workflow worker(s) and {args.num_activity_workers} activity worker(s)"
)

    # This sample prefers 'fork' to avoid re-importing modules and to
    # decrease startup time. Fork is not available on all operating
    # systems, so we fall back to 'spawn' when it is unavailable
try:
mp_ctx = multiprocessing.get_context("fork")
except ValueError:
mp_ctx = multiprocessing.get_context("spawn") # type: ignore

with concurrent.futures.ProcessPoolExecutor(
args.total_workers, mp_context=mp_ctx
) as executor:
# Start workflow workers by submitting them to the
# ProcessPoolExecutor
worker_futures = [
executor.submit(worker_entry, "workflow", i)
for i in range(args.num_workflow_workers)
]

# In this sample, we start activity workers as separate processes in the
# same way we do workflow workers. In production, activity workers
# are often deployed separately from workflow workers to account for
# differing scaling characteristics.
worker_futures.extend(
[
executor.submit(worker_entry, "activity", i)
for i in range(args.num_activity_workers)
]
)

try:
print("waiting for keyboard interrupt or for all workers to exit")
for worker in concurrent.futures.as_completed(worker_futures):
print("ERROR: worker exited unexpectedly")
if worker.exception():
traceback.print_exception(worker.exception())
except KeyboardInterrupt:
pass


def worker_entry(worker_type: Literal["workflow", "activity"], id: int):
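    # Each child process builds its own Runtime; the parent called
    # Runtime.prevent_default() before forking to guarantee that.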
Runtime.set_default(Runtime(telemetry=TelemetryConfig()))

async def run_worker():
config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
client = await Client.connect(**config)

if worker_type == "workflow":
worker = workflow_worker(client)
else:
worker = activity_worker(client)

try:
print(f"{worker_type}-worker:{id} starting")
await asyncio.shield(worker.run())
except asyncio.CancelledError:
print(f"{worker_type}-worker:{id} shutting down")
await worker.shutdown()

asyncio.run(run_worker())


def workflow_worker(client: Client) -> Worker:
"""
Create a workflow worker that is configured to leverage being run
as many child processes.
"""
return Worker(
client,
task_queue=WORKFLOW_TASK_QUEUE,
workflows=[ParallelizedWorkflow],
        # Workflow tasks are CPU bound, but generally execute quickly.
        # Because we're leveraging multiprocessing to achieve parallelism,
        # we configure each workflow worker to process only a small number
        # of workflow tasks at a time.
max_concurrent_workflow_tasks=2,
workflow_task_poller_behavior=PollerBehaviorSimpleMaximum(2),
        # Allow workflow code to import the os module so it can report its pid
workflow_runner=SandboxedWorkflowRunner(
restrictions=dataclasses.replace(
SandboxRestrictions.default,
invalid_module_members=SandboxRestrictions.invalid_module_members_default.with_child_unrestricted(
"os"
),
)
),
)


def activity_worker(client: Client) -> Worker:
"""
Create a basic activity worker
"""
return Worker(
client,
task_queue=ACTIVITY_TASK_QUEUE,
activities=[echo_pid_activity],
)


if __name__ == "__main__":
main()