Commit ba5a87f

Rename Sticky Activities sample (#75)
Updated to worker_specific_task_queues
1 parent 092ac9c commit ba5a87f

File tree: 11 files changed (+72 −63 lines)


README.md

Lines changed: 1 addition & 1 deletion

```diff
@@ -51,7 +51,6 @@ Some examples require extra dependencies. See each sample's directory for specif
   while running.
 * [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
 <!-- Keep this list in alphabetical order -->
-* [activity_sticky_queue](activity_sticky_queues) - Uses unique task queues to ensure activities run on specific workers.
 * [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
 * [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
 * [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
@@ -65,6 +64,7 @@ Some examples require extra dependencies. See each sample's directory for specif
 * [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
 * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
 * [sentry](sentry) - Report errors to Sentry.
+* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
 * [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.
 
 ## Test
```

activity_sticky_queues/README.md

Lines changed: 0 additions & 51 deletions
This file was deleted.

tests/activity_sticky_queues/activity_sticky_queues_activity_test.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -1,7 +1,7 @@
 from pathlib import Path
 from unittest import mock
 
-from activity_sticky_queues import tasks
+from worker_specific_task_queues import tasks
 
 RETURNED_PATH = "valid/path"
 tasks._get_delay_secs = mock.MagicMock(return_value=0.0001)
```

tests/activity_sticky_queues/activity_sticky_worker_workflow_test.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -9,7 +9,7 @@
 from temporalio.testing import WorkflowEnvironment
 from temporalio.worker import Worker
 
-from activity_sticky_queues import tasks
+from worker_specific_task_queues import tasks
 
 CHECKSUM = "a checksum"
 RETURNED_PATH = "valid/path"
```

worker_specific_task_queues/README.md

Lines changed: 56 additions & 0 deletions
# Worker-Specific Task Queues

Use a unique Task Queue for each Worker in order to have certain Activities run on a specific Worker. In the Go SDK, this is explicitly supported via the Session option, but in other SDKs a different approach is required.

Typical use cases include tasks where interaction with a filesystem is required, such as data processing or interacting with legacy access structures. This example will write text files to folders corresponding to each worker, located in the `demo_fs` folder. In production, these folders would typically be independent machines in a worker cluster.

The strategy is as follows:

- Each Worker process runs two `Worker`s:
  - One `Worker` listens on the `worker_specific_task_queue-distribution-queue` Task Queue.
  - Another `Worker` listens on a uniquely generated Task Queue.
- The Workflow and the first Activity are run on `worker_specific_task_queue-distribution-queue`.
- The first Activity returns one of the uniquely generated Task Queues (that only one Worker is listening on, i.e. the **Worker-specific Task Queue**).
- The rest of the Activities do the file processing and are run on the Worker-specific Task Queue.
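The routing strategy above can be simulated without a Temporal server. This stdlib-only sketch (queue names and the in-memory `worker_queues` dict are illustrative, not part of the sample) shows the first activity choosing one worker's queue and all follow-up tasks landing on that same queue:

```python
import random
import uuid

# Three simulated Worker processes; each owns a unique queue (illustrative names).
worker_queues = {
    f"worker_specific_task_queue-host-{uuid.uuid4()}": [] for _ in range(3)
}

def get_available_task_queue() -> str:
    # First activity: runs on the shared queue and returns one worker's queue.
    return random.choice(list(worker_queues))

chosen = get_available_task_queue()

# The remaining activities are all dispatched to the same worker-specific queue,
# so they run on one worker and can share that worker's filesystem.
for step in ("download", "process", "cleanup"):
    worker_queues[chosen].append(step)
```

In the real sample the dispatching is done by Temporal: each activity execution passes the chosen queue name as its `task_queue`, and only the worker listening on that queue picks the tasks up.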
Check the Temporal Web UI to confirm that tasks stayed with their respective Workers.

It doesn't matter where the `get_available_task_queue` activity is run, so it can be executed on the shared Task Queue. In this demo, `unique_worker_task_queue` is simply a `uuid` initialized in the Worker, but you can inject smart logic here to uniquely identify the Worker, [as Netflix did](https://community.temporal.io/t/using-dynamic-task-queues-for-traffic-routing/3045).

Activities have been artificially slowed with `time.sleep(3)` to simulate doing more work.
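As a minimal stdlib-only sketch of the queue-naming approach (mirroring the seeded UUID generation in the sample's `worker.py`; the `alternative_queue` line is a hypothetical variation, not from the sample):

```python
import random
from uuid import UUID, uuid4

# The demo seeds the RNG only to make runs reproducible.
random.seed(667)
task_queue = f"worker_specific_task_queue-host-{UUID(int=random.getrandbits(128))}"
print(task_queue)

# In production you would derive a stable worker identity instead, e.g. from a
# hostname, or simply use an unseeded uuid4() (hypothetical alternative):
alternative_queue = f"worker_specific_task_queue-host-{uuid4()}"
```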
### Running This Sample

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the worker:

    poetry run python worker.py

This will start the worker. Then, in another terminal, run the following to execute the workflow:

    poetry run python starter.py
#### Example output:

```bash
(temporalio-samples-py3.10) user@machine:~/samples-python/activities_sticky_queues$ poetry run python starter.py
Output checksums:
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
```
<details>
<summary>Checking the history to see where activities are run</summary>

All activities for a given workflow run against the same task queue, which corresponds to a single worker:

![image](./static/all-activitites-on-same-task-queue.png)

</details>

activity_sticky_queues/starter.py renamed to worker_specific_task_queues/starter.py

Lines changed: 3 additions & 3 deletions

```diff
@@ -3,7 +3,7 @@
 
 from temporalio.client import Client
 
-from activity_sticky_queues.tasks import FileProcessing
+from worker_specific_task_queues.tasks import FileProcessing
 
 
 async def main():
@@ -15,8 +15,8 @@ async def main():
     for idx in range(10):
         result = client.execute_workflow(
             FileProcessing.run,
-            id=f"activity_sticky_queue-workflow-id-{idx}",
-            task_queue="activity_sticky_queue-distribution-queue",
+            id=f"worker_specific_task_queue-workflow-id-{idx}",
+            task_queue="worker_specific_task_queue-distribution-queue",
         )
         await asyncio.sleep(0.1)
         futures.append(result)
```

activity_sticky_queues/tasks.py renamed to worker_specific_task_queues/tasks.py

Lines changed: 5 additions & 3 deletions

```diff
@@ -102,9 +102,11 @@ class FileProcessing:
     async def run(self) -> str:
         """Workflow implementing the basic file processing example.
 
-        First, a worker is selected randomly. This is the "sticky worker" on which
-        the workflow runs. This consists of a file download and some processing task,
-        with a file cleanup if an error occurs.
+        First, a task queue is selected randomly. A single worker is listening on
+        this queue, so when we execute all the file processing activities on this
+        queue, they will all be run on the same worker, and all be able to access
+        the same file on disk. The activities download the file, do some processing
+        task on the file, and clean up the file.
         """
         workflow.logger.info("Searching for available worker")
         unique_worker_task_queue = await workflow.execute_activity(
```
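The download/process/cleanup flow that the new docstring describes can be sketched without Temporal. This stdlib-only illustration (the helper names `download`, `process`, and `cleanup` are hypothetical stand-ins for the sample's activities) shows why all three steps must run on the same machine: they all touch the same file on disk.

```python
import hashlib
import tempfile
from pathlib import Path

# Hypothetical stand-ins for the sample's activities; in the real sample each
# of these runs as a Temporal activity on the worker-specific task queue.
def download(workdir: Path) -> Path:
    path = workdir / "downloaded_file.bin"
    path.write_bytes(b"example payload")
    return path

def process(path: Path) -> str:
    # Checksum the file's contents, as the sample's output suggests it does.
    return hashlib.sha256(path.read_bytes()).hexdigest()

def cleanup(path: Path) -> None:
    path.unlink()

with tempfile.TemporaryDirectory() as tmp:
    workdir = Path(tmp)
    path = download(workdir)   # step 1: write the file to local disk
    checksum = process(path)   # step 2: read that same file back
    cleanup(path)              # step 3: remove it

print(checksum)  # a 64-character hex SHA-256 digest
```

If the three steps were dispatched to a shared queue instead, `process` could land on a worker that never ran `download` and fail to find the file.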

activity_sticky_queues/worker.py renamed to worker_specific_task_queues/worker.py

Lines changed: 5 additions & 3 deletions

```diff
@@ -8,7 +8,7 @@
 from temporalio.client import Client
 from temporalio.worker import Worker
 
-from activity_sticky_queues import tasks
+from worker_specific_task_queues import tasks
 
 interrupt_event = asyncio.Event()
 
@@ -21,7 +21,9 @@ async def main():
     random.seed(667)
 
     # Create random task queues and build task queue selection function
-    task_queue: str = f"activity_sticky_queue-host-{UUID(int=random.getrandbits(128))}"
+    task_queue: str = (
+        f"worker_specific_task_queue-host-{UUID(int=random.getrandbits(128))}"
+    )
 
     @activity.defn(name="get_available_task_queue")
     async def select_task_queue() -> str:
@@ -35,7 +37,7 @@ async def select_task_queue() -> str:
     run_futures = []
     handle = Worker(
         client,
-        task_queue="activity_sticky_queue-distribution-queue",
+        task_queue="worker_specific_task_queue-distribution-queue",
         workflows=[tasks.FileProcessing],
         activities=[select_task_queue],
     )
```
