Skip to content

Commit 8903768

Browse files
authored
Samples for message passing docs (#133)
* Message passing introduction sample (used in docs)
1 parent 7e5aba8 commit 8903768

File tree

16 files changed

+389
-8
lines changed

16 files changed

+389
-8
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,13 @@ Some examples require extra dependencies. See each sample's directory for specif
6262
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
6363
* [gevent_async](gevent_async) - Combine gevent and Temporal.
6464
* [langchain](langchain) - Orchestrate workflows for LangChain.
65+
* [message-passing introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
6566
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
6667
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
6768
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
6869
* [prometheus](prometheus) - Configure Prometheus metrics on clients/workers.
6970
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
70-
* [safe_message_handlers](updates_and_signals/safe_message_handlers/) - Safely handling updates and signals.
71+
* [safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals.
7172
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
7273
* [sentry](sentry) - Report errors to Sentry.
7374
* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
File renamed without changes.
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Introduction to message-passing
2+
3+
This sample provides an introduction to using Query, Signal, and Update.
4+
5+
See https://docs.temporal.io/develop/python/message-passing.
6+
7+
To run, first see the main [README.md](../../README.md) for prerequisites.
8+
9+
Then create two terminals and `cd` to this directory.
10+
11+
Run the worker in one terminal:
12+
13+
poetry run python worker.py
14+
15+
And execute the workflow in the other terminal:
16+
17+
poetry run python starter.py
18+
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from enum import IntEnum
2+
3+
TASK_QUEUE = "message-passing-introduction-task-queue"
4+
5+
6+
class Language(IntEnum):
7+
ARABIC = 1
8+
CHINESE = 2
9+
ENGLISH = 3
10+
FRENCH = 4
11+
HINDI = 5
12+
PORTUGUESE = 6
13+
SPANISH = 7
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import asyncio
2+
from typing import Optional
3+
4+
from temporalio import activity
5+
6+
from message_passing.introduction import Language
7+
8+
9+
@activity.defn
10+
async def call_greeting_service(to_language: Language) -> Optional[str]:
11+
"""
12+
An Activity that simulates a call to a remote greeting service.
13+
The remote greeting service supports the full range of languages.
14+
"""
15+
greetings = {
16+
Language.ARABIC: "مرحبا بالعالم",
17+
Language.CHINESE: "你好,世界",
18+
Language.ENGLISH: "Hello, world",
19+
Language.FRENCH: "Bonjour, monde",
20+
Language.HINDI: "नमस्ते दुनिया",
21+
Language.PORTUGUESE: "Olá mundo",
22+
Language.SPANISH: "¡Hola mundo",
23+
}
24+
await asyncio.sleep(0.2) # Simulate a network call
25+
return greetings.get(to_language)
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import asyncio
2+
from typing import Optional
3+
4+
from temporalio.client import Client, WorkflowUpdateStage
5+
6+
from message_passing.introduction import TASK_QUEUE
7+
from message_passing.introduction.workflows import (
8+
ApproveInput,
9+
GetLanguagesInput,
10+
GreetingWorkflow,
11+
Language,
12+
)
13+
14+
15+
async def main(client: Optional[Client] = None):
16+
client = client or await Client.connect("localhost:7233")
17+
wf_handle = await client.start_workflow(
18+
GreetingWorkflow.run,
19+
id="greeting-workflow-1234",
20+
task_queue=TASK_QUEUE,
21+
)
22+
23+
# 👉 Send a Query
24+
supported_languages = await wf_handle.query(
25+
GreetingWorkflow.get_languages, GetLanguagesInput(include_unsupported=False)
26+
)
27+
print(f"supported languages: {supported_languages}")
28+
29+
# 👉 Execute an Update
30+
previous_language = await wf_handle.execute_update(
31+
GreetingWorkflow.set_language, Language.CHINESE
32+
)
33+
current_language = await wf_handle.query(GreetingWorkflow.get_language)
34+
print(f"language changed: {previous_language.name} -> {current_language.name}")
35+
36+
# 👉 Start an Update and then wait for it to complete
37+
update_handle = await wf_handle.start_update(
38+
GreetingWorkflow.set_language_using_activity,
39+
Language.ARABIC,
40+
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
41+
)
42+
previous_language = await update_handle.result()
43+
current_language = await wf_handle.query(GreetingWorkflow.get_language)
44+
print(f"language changed: {previous_language.name} -> {current_language.name}")
45+
46+
# 👉 Send a Signal
47+
await wf_handle.signal(GreetingWorkflow.approve, ApproveInput(name=""))
48+
print(await wf_handle.result())
49+
50+
51+
if __name__ == "__main__":
52+
asyncio.run(main())
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import asyncio
2+
import logging
3+
4+
from temporalio.client import Client
5+
from temporalio.worker import Worker
6+
7+
from message_passing.introduction import TASK_QUEUE
8+
from message_passing.introduction.activities import call_greeting_service
9+
from message_passing.introduction.workflows import GreetingWorkflow
10+
11+
interrupt_event = asyncio.Event()
12+
13+
14+
async def main():
15+
logging.basicConfig(level=logging.INFO)
16+
17+
client = await Client.connect("localhost:7233")
18+
19+
async with Worker(
20+
client,
21+
task_queue=TASK_QUEUE,
22+
workflows=[GreetingWorkflow],
23+
activities=[call_greeting_service],
24+
):
25+
logging.info("Worker started, ctrl+c to exit")
26+
await interrupt_event.wait()
27+
logging.info("Shutting down")
28+
29+
30+
if __name__ == "__main__":
31+
loop = asyncio.new_event_loop()
32+
try:
33+
loop.run_until_complete(main())
34+
except KeyboardInterrupt:
35+
interrupt_event.set()
36+
loop.run_until_complete(loop.shutdown_asyncgens())
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import asyncio
2+
from dataclasses import dataclass
3+
from datetime import timedelta
4+
from typing import List, Optional
5+
6+
from temporalio import workflow
7+
from temporalio.exceptions import ApplicationError
8+
9+
with workflow.unsafe.imports_passed_through():
10+
from message_passing.introduction import Language
11+
from message_passing.introduction.activities import call_greeting_service
12+
13+
14+
@dataclass
15+
class GetLanguagesInput:
16+
include_unsupported: bool
17+
18+
19+
@dataclass
20+
class ApproveInput:
21+
name: str
22+
23+
24+
@workflow.defn
25+
class GreetingWorkflow:
26+
"""
27+
A workflow that that returns a greeting in one of two languages.
28+
29+
It supports a Query to obtain the current language, an Update to change the
30+
current language and receive the previous language in response, and a Signal
31+
to approve the Workflow so that it is allowed to return its result.
32+
"""
33+
34+
# 👉 This Workflow does not use any async handlers and so cannot use any
35+
# Activities. It only supports two languages, whose greetings are hardcoded
36+
# in the Workflow definition. See GreetingWorkflowWithAsyncHandler below for
37+
# a Workflow that uses an async Update handler to call an Activity.
38+
39+
def __init__(self) -> None:
40+
self.approved_for_release = False
41+
self.approver_name: Optional[str] = None
42+
self.greetings = {
43+
Language.CHINESE: "你好,世界",
44+
Language.ENGLISH: "Hello, world",
45+
}
46+
self.language = Language.ENGLISH
47+
self.lock = asyncio.Lock() # used by the async handler below
48+
49+
@workflow.run
50+
async def run(self) -> str:
51+
# 👉 In addition to waiting for the `approve` Signal, we also wait for
52+
# all handlers to finish. Otherwise, the Workflow might return its
53+
# result while an async set_language_using_activity Update is in
54+
# progress.
55+
await workflow.wait_condition(
56+
lambda: self.approved_for_release and workflow.all_handlers_finished()
57+
)
58+
return self.greetings[self.language]
59+
60+
@workflow.query
61+
def get_languages(self, input: GetLanguagesInput) -> List[Language]:
62+
# 👉 A Query handler returns a value: it can inspect but must not mutate the Workflow state.
63+
if input.include_unsupported:
64+
return sorted(Language)
65+
else:
66+
return sorted(self.greetings)
67+
68+
@workflow.signal
69+
def approve(self, input: ApproveInput) -> None:
70+
# 👉 A Signal handler mutates the Workflow state but cannot return a value.
71+
self.approved_for_release = True
72+
self.approver_name = input.name
73+
74+
@workflow.update
75+
def set_language(self, language: Language) -> Language:
76+
# 👉 An Update handler can mutate the Workflow state and return a value.
77+
previous_language, self.language = self.language, language
78+
return previous_language
79+
80+
@set_language.validator
81+
def validate_language(self, language: Language) -> None:
82+
if language not in self.greetings:
83+
# 👉 In an Update validator you raise any exception to reject the Update.
84+
raise ValueError(f"{language.name} is not supported")
85+
86+
@workflow.update
87+
async def set_language_using_activity(self, language: Language) -> Language:
88+
# 👉 This update handler is async, so it can execute an activity.
89+
if language not in self.greetings:
90+
# 👉 We use a lock so that, if this handler is executed multiple
91+
# times, each execution can schedule the activity only when the
92+
# previously scheduled activity has completed. This ensures that
93+
# multiple calls to set_language are processed in order.
94+
async with self.lock:
95+
greeting = await workflow.execute_activity(
96+
call_greeting_service,
97+
language,
98+
start_to_close_timeout=timedelta(seconds=10),
99+
)
100+
if greeting is None:
101+
# 👉 An update validator cannot be async, so cannot be used
102+
# to check that the remote call_greeting_service supports
103+
# the requested language. Raising ApplicationError will fail
104+
# the Update, but the WorkflowExecutionUpdateAccepted event
105+
# will still be added to history.
106+
raise ApplicationError(
107+
f"Greeting service does not support {language.name}"
108+
)
109+
self.greetings[language] = greeting
110+
previous_language, self.language = self.language, language
111+
return previous_language
112+
113+
@workflow.query
114+
def get_language(self) -> Language:
115+
return self.language

updates_and_signals/safe_message_handlers/starter.py renamed to message_passing/safe_message_handlers/starter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from temporalio import common
88
from temporalio.client import Client, WorkflowHandle
99

10-
from updates_and_signals.safe_message_handlers.workflow import (
10+
from message_passing.safe_message_handlers.workflow import (
1111
ClusterManagerAssignNodesToJobInput,
1212
ClusterManagerDeleteJobInput,
1313
ClusterManagerInput,

updates_and_signals/safe_message_handlers/worker.py renamed to message_passing/safe_message_handlers/worker.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from temporalio.client import Client
55
from temporalio.worker import Worker
66

7-
from updates_and_signals.safe_message_handlers.workflow import (
7+
from message_passing.safe_message_handlers.workflow import (
88
ClusterManagerWorkflow,
99
assign_nodes_to_job,
1010
find_bad_nodes,
@@ -15,7 +15,6 @@
1515

1616

1717
async def main():
18-
# Connect client
1918
client = await Client.connect("localhost:7233")
2019

2120
async with Worker(
@@ -24,7 +23,6 @@ async def main():
2423
workflows=[ClusterManagerWorkflow],
2524
activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
2625
):
27-
# Wait until interrupted
2826
logging.info("ClusterManagerWorkflow worker started, ctrl+c to exit")
2927
await interrupt_event.wait()
3028
logging.info("Shutting down")

updates_and_signals/safe_message_handlers/workflow.py renamed to message_passing/safe_message_handlers/workflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from temporalio.common import RetryPolicy
99
from temporalio.exceptions import ApplicationError
1010

11-
from updates_and_signals.safe_message_handlers.activities import (
11+
from message_passing.safe_message_handlers.activities import (
1212
AssignNodesToJobInput,
1313
FindBadNodesInput,
1414
UnassignNodesForJobInput,

0 commit comments

Comments
 (0)