Skip to content

Commit 090b96d

Browse files
authored
Convert hello activities to sync (#173)
Converted activities in hello samples from async to sync
1 parent 5d64065 commit 090b96d

16 files changed

+125
-28
lines changed

hello/hello_activity.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
from concurrent.futures import ThreadPoolExecutor
23
from dataclasses import dataclass
34
from datetime import timedelta
45

@@ -18,7 +19,7 @@ class ComposeGreetingInput:
1819

1920
# Basic activity that logs and does string concatenation
2021
@activity.defn
21-
async def compose_greeting(input: ComposeGreetingInput) -> str:
22+
def compose_greeting(input: ComposeGreetingInput) -> str:
2223
activity.logger.info("Running activity with parameter %s" % input)
2324
return f"{input.greeting}, {input.name}!"
2425

@@ -50,6 +51,10 @@ async def main():
5051
task_queue="hello-activity-task-queue",
5152
workflows=[GreetingWorkflow],
5253
activities=[compose_greeting],
54+
# Non-async activities require an executor;
55+
# a thread pool executor is recommended.
56+
# This same thread pool could be passed to multiple workers if desired.
57+
activity_executor=ThreadPoolExecutor(5),
5358
):
5459

5560
# While the worker is running, use the client to run the workflow and

hello/hello_activity_async.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import asyncio
2+
from dataclasses import dataclass
3+
from datetime import timedelta
4+
5+
from temporalio import activity, workflow
6+
from temporalio.client import Client
7+
from temporalio.worker import Worker
8+
9+
10+
# While we could use multiple parameters in the activity, Temporal strongly
11+
# encourages using a single dataclass instead which can have fields added to it
12+
# in a backwards-compatible way.
13+
@dataclass
14+
class ComposeGreetingInput:
15+
greeting: str
16+
name: str
17+
18+
19+
# Basic activity that logs and does string concatenation
20+
@activity.defn
21+
async def compose_greeting(input: ComposeGreetingInput) -> str:
22+
activity.logger.info("Running activity with parameter %s" % input)
23+
return f"{input.greeting}, {input.name}!"
24+
25+
26+
# Basic workflow that logs and invokes an activity
27+
@workflow.defn
28+
class GreetingWorkflow:
29+
@workflow.run
30+
async def run(self, name: str) -> str:
31+
workflow.logger.info("Running workflow with parameter %s" % name)
32+
return await workflow.execute_activity(
33+
compose_greeting,
34+
ComposeGreetingInput("Hello", name),
35+
start_to_close_timeout=timedelta(seconds=10),
36+
)
37+
38+
39+
async def main():
40+
# Uncomment the lines below to see logging output
41+
# import logging
42+
# logging.basicConfig(level=logging.INFO)
43+
44+
# Start client
45+
client = await Client.connect("localhost:7233")
46+
47+
# Run a worker for the workflow
48+
async with Worker(
49+
client,
50+
task_queue="hello-activity-task-queue",
51+
workflows=[GreetingWorkflow],
52+
activities=[compose_greeting],
53+
# If the worker is only running async activities, you don't need
54+
# to supply an activity executor because they run in
55+
# the worker's event loop.
56+
):
57+
58+
# While the worker is running, use the client to run the workflow and
59+
# print out its result. Note, in many production setups, the client
60+
# would be in a completely separate process from the worker.
61+
result = await client.execute_workflow(
62+
GreetingWorkflow.run,
63+
"World",
64+
id="hello-activity-workflow-id",
65+
task_queue="hello-activity-task-queue",
66+
)
67+
print(f"Result: {result}")
68+
69+
70+
if __name__ == "__main__":
71+
asyncio.run(main())

hello/hello_activity_choice.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
from concurrent.futures import ThreadPoolExecutor
23
from dataclasses import dataclass
34
from datetime import timedelta
45
from enum import IntEnum
@@ -12,22 +13,22 @@
1213

1314

1415
@activity.defn
15-
async def order_apples(amount: int) -> str:
16+
def order_apples(amount: int) -> str:
1617
return f"Ordered {amount} Apples..."
1718

1819

1920
@activity.defn
20-
async def order_bananas(amount: int) -> str:
21+
def order_bananas(amount: int) -> str:
2122
return f"Ordered {amount} Bananas..."
2223

2324

2425
@activity.defn
25-
async def order_cherries(amount: int) -> str:
26+
def order_cherries(amount: int) -> str:
2627
return f"Ordered {amount} Cherries..."
2728

2829

2930
@activity.defn
30-
async def order_oranges(amount: int) -> str:
31+
def order_oranges(amount: int) -> str:
3132
return f"Ordered {amount} Oranges..."
3233

3334

@@ -88,6 +89,7 @@ async def main():
8889
task_queue="hello-activity-choice-task-queue",
8990
workflows=[PurchaseFruitsWorkflow],
9091
activities=[order_apples, order_bananas, order_cherries, order_oranges],
92+
activity_executor=ThreadPoolExecutor(5),
9193
):
9294

9395
# While the worker is running, use the client to run the workflow and

hello/hello_activity_threaded.py renamed to hello/hello_activity_heartbeat.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import asyncio
2-
import threading
32
import time
43
from concurrent.futures import ThreadPoolExecutor
54
from dataclasses import dataclass
@@ -21,7 +20,7 @@ def compose_greeting(input: ComposeGreetingInput) -> str:
2120
# We'll wait for 3 seconds, heartbeating in between (like all long-running
2221
# activities should do), then return the greeting
2322
for _ in range(0, 3):
24-
print(f"Heartbeating activity on thread {threading.get_ident()}")
23+
print(f"Heartbeating activity")
2524
activity.heartbeat()
2625
time.sleep(1)
2726
return f"{input.greeting}, {input.name}!"
@@ -47,12 +46,9 @@ async def main():
4746
# Run a worker for the workflow
4847
async with Worker(
4948
client,
50-
task_queue="hello-activity-threaded-task-queue",
49+
task_queue="hello-activity-heartbeating-task-queue",
5150
workflows=[GreetingWorkflow],
5251
activities=[compose_greeting],
53-
# Synchronous activities are not allowed unless we provide some kind of
54-
# executor. This same thread pool could be passed to multiple workers if
55-
# desired.
5652
activity_executor=ThreadPoolExecutor(5),
5753
):
5854

@@ -62,10 +58,10 @@ async def main():
6258
result = await client.execute_workflow(
6359
GreetingWorkflow.run,
6460
"World",
65-
id="hello-activity-threaded-workflow-id",
66-
task_queue="hello-activity-threaded-task-queue",
61+
id="hello-activity-heartbeating-workflow-id",
62+
task_queue="hello-activity-heartbeating-task-queue",
6763
)
68-
print(f"Result on thread {threading.get_ident()}: {result}")
64+
print(f"Result: {result}")
6965

7066

7167
if __name__ == "__main__":

hello/hello_activity_retry.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
from concurrent.futures import ThreadPoolExecutor
23
from dataclasses import dataclass
34
from datetime import timedelta
45

@@ -15,7 +16,7 @@ class ComposeGreetingInput:
1516

1617

1718
@activity.defn
18-
async def compose_greeting(input: ComposeGreetingInput) -> str:
19+
def compose_greeting(input: ComposeGreetingInput) -> str:
1920
print(f"Invoking activity, attempt number {activity.info().attempt}")
2021
# Fail the first 3 attempts, succeed the 4th
2122
if activity.info().attempt < 4:
@@ -52,6 +53,7 @@ async def main():
5253
task_queue="hello-activity-retry-task-queue",
5354
workflows=[GreetingWorkflow],
5455
activities=[compose_greeting],
56+
activity_executor=ThreadPoolExecutor(5),
5557
):
5658

5759
# While the worker is running, use the client to run the workflow and

hello/hello_cancellation.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,32 @@
11
import asyncio
2+
import time
23
import traceback
4+
from concurrent.futures import ThreadPoolExecutor
35
from datetime import timedelta
46
from typing import NoReturn
57

68
from temporalio import activity, workflow
79
from temporalio.client import Client, WorkflowFailureError
10+
from temporalio.exceptions import CancelledError
811
from temporalio.worker import Worker
912

1013

1114
@activity.defn
12-
async def never_complete_activity() -> NoReturn:
15+
def never_complete_activity() -> NoReturn:
1316
# All long-running activities should heartbeat. Heartbeat is how
1417
# cancellation is delivered from the server.
1518
try:
1619
while True:
1720
print("Heartbeating activity")
1821
activity.heartbeat()
19-
await asyncio.sleep(1)
20-
except asyncio.CancelledError:
22+
time.sleep(1)
23+
except CancelledError:
2124
print("Activity cancelled")
2225
raise
2326

2427

2528
@activity.defn
26-
async def cleanup_activity() -> None:
29+
def cleanup_activity() -> None:
2730
print("Executing cleanup activity")
2831

2932

@@ -56,6 +59,7 @@ async def main():
5659
task_queue="hello-cancellation-task-queue",
5760
workflows=[CancellationWorkflow],
5861
activities=[never_complete_activity, cleanup_activity],
62+
activity_executor=ThreadPoolExecutor(5),
5963
):
6064

6165
# While the worker is running, use the client to start the workflow.

hello/hello_cron.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
from concurrent.futures import ThreadPoolExecutor
23
from dataclasses import dataclass
34
from datetime import timedelta
45

@@ -14,7 +15,7 @@ class ComposeGreetingInput:
1415

1516

1617
@activity.defn
17-
async def compose_greeting(input: ComposeGreetingInput) -> str:
18+
def compose_greeting(input: ComposeGreetingInput) -> str:
1819
return f"{input.greeting}, {input.name}!"
1920

2021

@@ -40,6 +41,7 @@ async def main():
4041
task_queue="hello-cron-task-queue",
4142
workflows=[GreetingWorkflow],
4243
activities=[compose_greeting],
44+
activity_executor=ThreadPoolExecutor(5),
4345
):
4446

4547
print("Running workflow once a minute")

hello/hello_exception.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import logging
3+
from concurrent.futures import ThreadPoolExecutor
34
from dataclasses import dataclass
45
from datetime import timedelta
56
from typing import NoReturn, Optional
@@ -18,7 +19,7 @@ class ComposeGreetingInput:
1819

1920

2021
@activity.defn
21-
async def compose_greeting(input: ComposeGreetingInput) -> NoReturn:
22+
def compose_greeting(input: ComposeGreetingInput) -> NoReturn:
2223
# Always raise exception
2324
raise RuntimeError(f"Greeting exception: {input.greeting}, {input.name}!")
2425

@@ -46,6 +47,7 @@ async def main():
4647
task_queue="hello-exception-task-queue",
4748
workflows=[GreetingWorkflow],
4849
activities=[compose_greeting],
50+
activity_executor=ThreadPoolExecutor(5),
4951
):
5052

5153
# While the worker is running, use the client to run the workflow and

hello/hello_local_activity.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
from concurrent.futures import ThreadPoolExecutor
23
from dataclasses import dataclass
34
from datetime import timedelta
45

@@ -14,7 +15,7 @@ class ComposeGreetingInput:
1415

1516

1617
@activity.defn
17-
async def compose_greeting(input: ComposeGreetingInput) -> str:
18+
def compose_greeting(input: ComposeGreetingInput) -> str:
1819
return f"{input.greeting}, {input.name}!"
1920

2021

@@ -39,6 +40,7 @@ async def main():
3940
task_queue="hello-local-activity-task-queue",
4041
workflows=[GreetingWorkflow],
4142
activities=[compose_greeting],
43+
activity_executor=ThreadPoolExecutor(5),
4244
):
4345

4446
# While the worker is running, use the client to run the workflow and

hello/hello_mtls.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import argparse
22
import asyncio
3+
from concurrent.futures import ThreadPoolExecutor
34
from dataclasses import dataclass
45
from datetime import timedelta
56
from typing import Optional
@@ -18,7 +19,7 @@ class ComposeGreetingInput:
1819

1920
# Basic activity that logs and does string concatenation
2021
@activity.defn
21-
async def compose_greeting(input: ComposeGreetingInput) -> str:
22+
def compose_greeting(input: ComposeGreetingInput) -> str:
2223
return f"{input.greeting}, {input.name}!"
2324

2425

@@ -79,6 +80,7 @@ async def main():
7980
task_queue="hello-mtls-task-queue",
8081
workflows=[GreetingWorkflow],
8182
activities=[compose_greeting],
83+
activity_executor=ThreadPoolExecutor(5),
8284
):
8385

8486
# While the worker is running, use the client to run the workflow and

0 commit comments

Comments
 (0)