Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion exercises/querying-workflows/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ the complete version in the `solution` subdirectory.

In this part of the exercise, you will define your Query.

1. Edit the `workflow.py` file. The Workflow runs a blocking `while True` loop that will cause it to wait for either the `exit()` Signal or a `submit_greeting()` Signal before doing anything. A variable called `self._current_state` that provides information about the Workflow Execution is also updated in several places. You will add a Query that returns the status of this variable.
1. Edit the `workflow.py` file.

The Workflow runs a blocking `while True` loop that will cause it to wait for either the `exit()` Signal or a `submit_greeting()` Signal before doing anything. A variable called `self._current_state` that provides information about the Workflow Execution is also updated in several places. You will add a Query that returns the status of this variable.

Comment on lines -19 to +22
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One time Tom suggested to me to try to separate my instructions from the prose/explanation. That's what I was going for here. But totally up to you 👍

2. Anywhere within the Workflow definition (for example, just before your Signal functions), add a Query function called `current_state_query()`. It should be annotated with `@workflow.query` and `return self._current_state`.
3. Save the file.

Expand Down Expand Up @@ -74,4 +77,11 @@ In the terminal you ran your query in, run `python signalclient.py`. It will sen
Result: ['Hello, User 1']
```

Notice that because `workflow.py` shuts down the Worker when
the Workflow finishes, you cannot Query the Workflow after you
have Signalled it to exit. In contrast, if you ran the Worker
in one file/terminal and the Workflow in another, the Workflow
would shut down and the Worker would keep running, and you could
Query it even after it has exited.

Comment on lines +80 to +86
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can include this if we want. Up to you 👍

### This is the end of the exercise.
7 changes: 2 additions & 5 deletions exercises/querying-workflows/practice/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ async def run(self) -> List[str]:

# TODO Part A: Define a query function called `current_state_query()`.
# It should be annotated with @workflow.query and return self._current_state.
@workflow.query
def current_state_query(self) -> str:
return self._current_state
Comment on lines -36 to -38
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was in the file, but I don't think it should be. I think putting it here is part of the exercise


@workflow.signal
async def submit_greeting(self, name: str) -> None:
Expand All @@ -53,13 +50,13 @@ async def main():
# Run a worker for the workflow
async with Worker(
client,
task_queue="queries",
task_queue="queries-queue",
workflows=[MyWorkflow],
):
result = await client.execute_workflow(
MyWorkflow.run,
id="queries",
task_queue="queries",
task_queue="queries-queue",
Comment on lines -62 to +59
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the workflow ID and the task queue were both called queries, so I just changed the queue to queries-queue to disambiguate. Not necessary -- up to you

)
print(f"Result: {result}")

Expand Down
4 changes: 2 additions & 2 deletions exercises/querying-workflows/solution/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ async def main():
# Run a worker for the workflow
async with Worker(
client,
task_queue="queries",
task_queue="queries-queue",
workflows=[MyWorkflow],
):
result = await client.execute_workflow(
MyWorkflow.run,
id="queries",
task_queue="queries",
task_queue="queries-queue",
)
print(f"Result: {result}")

Expand Down
4 changes: 2 additions & 2 deletions exercises/sending-signals-client/practice/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def __init__(self) -> None:
async def run(self) -> List[str]:
greetings: List[str] = []
workflow.logger.info("Running workflow.")
print("Running workflow.")

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it isn't production code, but I figure we don't want print statements in workflow code

while True:
await workflow.wait_condition(
lambda: not self._pending_greetings.empty() or self._exit
Expand All @@ -28,7 +28,7 @@ async def run(self) -> List[str]:
if self._exit:
return greetings

# TODO Part A: Define a Signal function, annoted with @workflow.signal.
# TODO Part A: Define a Signal function named submit_greeting, annoted with @workflow.signal.
# It should take an additional string argument called `name`.
# When the signal is received, it should call await self._pending_greetings.put(name).
# You can use the `exit()` Signal function below as a reference.
Expand Down
2 changes: 1 addition & 1 deletion exercises/sending-signals-client/solution/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def __init__(self) -> None:
async def run(self) -> List[str]:
greetings: List[str] = []
workflow.logger.info("Running workflow.")
print("Running workflow.")

while True:
await workflow.wait_condition(
lambda: not self._pending_greetings.empty() or self._exit
Expand Down
21 changes: 12 additions & 9 deletions exercises/sending-signals-external/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,19 @@ the complete version in the `solution` subdirectory.

## Part A: Defining and Handling the Signal

1. This exercise contains one Client that runs two different Workflows
— `PizzaWorkflow.order_pizza` and `FulfillOrderWorkflow.fulfill_order`. Both
Workflows are defined in `workflow.py`. `PizzaWorkflow.order_pizza` is
designed not to complete its final activity — `send_bill` — until it receives
a Signal from `FulfillOrderWorkflow.fulfill_order`. You'll start by defining
that Signal. Edit `workflow.py`. At the bottom of the `PizzaWorkflow`
This exercise contains one Client that runs two different Workflows
— `PizzaWorkflow.order_pizza` and `FulfillOrderWorkflow.fulfill_order`. Both
Workflows are defined in `workflow.py`. `PizzaWorkflow.order_pizza` is
designed not to complete its final activity — `send_bill` — until it receives
a Signal from `FulfillOrderWorkflow.fulfill_order`. You'll start by defining
that Signal.

1. Edit `workflow.py`. At the bottom of the `PizzaWorkflow`
Comment on lines -17 to +24
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above. just separating prose from instruction. totally up to you 👍

definition, add a Signal function for `fulfill_order_signal()`. It should be
decorated with `@workflow.signal` and take an additional boolean argument
called `success`. When the signal is received, if `success==True`, it should
call `self._pending_confirmation.put(True)`, enabling the `PizzaWorkflow` to
compelete.
set self._signal_received to `True` and call `await self._pending_confirmation.put(success)`,
enabling the `PizzaWorkflow` to compelete.
2. Save the file.

## Part B: Getting a Handle on your Workflow
Expand All @@ -47,7 +49,8 @@ Now you will add the `handle.signal()` call itself.
the `PizzaWorkflow`. Directly after the `client.start_workflow()` call for
the `PizzaWorkflow`, add another call that starts the `FulfillOrderWorkflow`.
You can use the call that starts the `PizzaWorkflow` as a reference. It can
use the same Task Queue, but needs to use a different Workflow ID.
use the same Task Queue, but needs to use a different Workflow ID. Also, you
don't need the handle because you don't need to await it or use its return value.
2. Save and close the file.

## Part E: Running both Workflows
Expand Down
2 changes: 2 additions & 0 deletions exercises/sending-signals-external/practice/starter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ async def main():
# TODO Part D: Start the `FulfillOrderWorkflow`.
# You can use the `PizzaOrderWorkflow` above as a reference.
# It can use the same Task Queue, but needs to use a different Workflow ID.
# Also, you don't need the handle because you don't
# need to await it or use its return value.

result = await handle.result()

Expand Down
43 changes: 22 additions & 21 deletions exercises/sending-signals-external/practice/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
@workflow.defn
class PizzaOrderWorkflow:
def __init__(self) -> None:
self._pending_confirmation: asyncio.Queue[str] = asyncio.Queue()
self._pending_confirmation: asyncio.Queue[bool] = asyncio.Queue()
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was a copy-paste thing. The queue is bools

self._signal_received: bool = False

@workflow.run
async def order_pizza(self, order: PizzaOrder) -> OrderConfirmation:
async def order_pizza(self, order: PizzaOrder) -> OrderConfirmation | None:
Comment on lines -18 to +19
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

support if the order was not fulfilled and we don't return a confirmation

workflow.logger.info(f"order_pizza workflow invoked")

address = order.address
Expand All @@ -37,30 +38,30 @@ async def order_pizza(self, order: PizzaOrder) -> OrderConfirmation:

workflow.logger.info(f"distance is {distance.kilometers}")

while True:
await workflow.wait_condition(
lambda: not self._pending_confirmation.empty()
)
await workflow.wait_condition(
lambda: self._signal_received,
timeout=3
)
Comment on lines -40 to +44
Copy link
Author

@GSmithApps GSmithApps Jan 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a feeling I'm missing something, but I don't think we need to loop. Like, if we want to just get the signal and if it's yes or no, bill the customer accordingly, then I don't think we need it. But if we want to sort of wait for multiple signals, then maybe we'd need a loop. Curious what you think

Or maybe there's something in the readme or course content that I missed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is kind of a weird loop implementation, but based on earlier conversations with the Python SDK team and trying to align with our samples, Python really likes to use loops and queues for Signal detection. I'd like to take everything in this PR except these last two changes -- yes they're a bit weird, but I think they illustrate some expandable patterns.

Copy link

@dandavison dandavison Apr 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting discussion!

Regarding the differences between main and @GSmithApps's PR, there is a flaw in the Python SDK (temporalio/sdk-python#618) which means that the outer while loop is indeed technically necessary. However, I am not seeing a need for the inner while loop: if we enter it then we return or throw and hence never re-enter it.

At a higher level though, my take is that the current code (in main) is handling the signal in a Go-like style: pushing the work into a queue which is processed in the main workflow body. And I would say that that is not idiomatic for the Python SDK (or any SDK other than Go): the purpose of handler functions is that the implementation details of handling can be isolated in the handler function. (There are advantages other than modularity and implementation-hiding, e.g. the handling can more easily be instrumented with observability emissions if it is done in a handler function). Effectively, distinct handler invocations form their own queue.

So, my question is: is there a reason why these materials are encouraging the queue style? If not, then I think we should go with neither the code in main, nor the adjustments made in @GSmithApps's PR, but rather switch to the handler-based implementation.

@drewhoskins-temporal addressed this topic in a recent blog post, as you probably know: https://temporal.io/blog/robust-message-handlers#the-battle-between-handlers-and-queues


while not self._pending_confirmation.empty():
bill = Bill(
customer_id=order.customer.customer_id,
order_number=order.order_number,
description="Pizza order",
amount=total_price,
)
confirmation = await workflow.execute_activity_method(
PizzaOrderActivities.send_bill,
bill,
start_to_close_timeout=timedelta(seconds=5),
)
return confirmation
if self._pending_confirmation.get_nowait():
bill = Bill(
customer_id=order.customer.customer_id,
order_number=order.order_number,
description="Pizza order",
amount=total_price,
)
confirmation = await workflow.execute_activity_method(
PizzaOrderActivities.send_bill,
bill,
start_to_close_timeout=timedelta(seconds=5),
)
return confirmation
Comment on lines -45 to +58
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my understanding is that we want to check the signal for true or false, then bill the customer accordingly

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above comment. And thanks for this!


# TODO Part A: Add a `fulfill_order_signal()` function to receive the Signal.
# It should be decorated with `@workflow.signal` and accept an
# additional boolean argument called `success`. When the signal is received,
# if `success==True`, it should call
# `self._pending_confirmation.put(True)
# it should set self._signal_received to `True` and call
# `self._pending_confirmation.put(success)


@workflow.defn
Expand Down
47 changes: 24 additions & 23 deletions exercises/sending-signals-external/solution/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@
# Import activity, passing it through the sandbox without reloading the module
Copy link
Author

@GSmithApps GSmithApps Jan 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These 👇 are the same changes as above, but in the solution instead of the practice

with workflow.unsafe.imports_passed_through():
from activities import PizzaOrderActivities
from shared import Bill, OrderConfirmation, PizzaOrder
from shared import Bill, OrderConfirmation, PizzaOrder, WORKFLOW_ID_PREFIX

@workflow.defn
class PizzaOrderWorkflow:
def __init__(self) -> None:
self._pending_confirmation: asyncio.Queue[str] = asyncio.Queue()
self._pending_confirmation: asyncio.Queue[bool] = asyncio.Queue()
self._signal_received: bool = False

@workflow.run
async def order_pizza(self, order: PizzaOrder) -> OrderConfirmation:
async def order_pizza(self, order: PizzaOrder) -> OrderConfirmation | None:
workflow.logger.info(f"order_pizza workflow invoked")

address = order.address
Expand All @@ -37,29 +38,29 @@ async def order_pizza(self, order: PizzaOrder) -> OrderConfirmation:

workflow.logger.info(f"distance is {distance.kilometers}")

while True:
await workflow.wait_condition(
lambda: not self._pending_confirmation.empty()
)
await workflow.wait_condition(
lambda: self._signal_received,
timeout=3
)

while not self._pending_confirmation.empty():
bill = Bill(
customer_id=order.customer.customer_id,
order_number=order.order_number,
description="Pizza order",
amount=total_price,
)
confirmation = await workflow.execute_activity_method(
PizzaOrderActivities.send_bill,
bill,
start_to_close_timeout=timedelta(seconds=5),
)
return confirmation
if not self._pending_confirmation.get_nowait():
bill = Bill(
customer_id=order.customer.customer_id,
order_number=order.order_number,
description="Pizza order",
amount=total_price,
)
confirmation = await workflow.execute_activity_method(
PizzaOrderActivities.send_bill,
bill,
start_to_close_timeout=timedelta(seconds=5),
)
return confirmation

@workflow.signal
async def fulfill_order_signal(self, success: bool) -> None:
if success == True:
await self._pending_confirmation.put(True)
self._signal_received = True
await self._pending_confirmation.put(success)


@workflow.defn
Expand All @@ -80,7 +81,7 @@ async def fulfill_order(self, order: PizzaOrder):
start_to_close_timeout=timedelta(seconds=5),
)

handle = workflow.get_external_workflow_handle("pizza-workflow-order-XD001")
handle = workflow.get_external_workflow_handle(WORKFLOW_ID_PREFIX + f"{order.order_number}")
await handle.signal("fulfill_order_signal", True)

return "orderFulfilled"
13 changes: 10 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
# I think the ones below that are commented out were giving
# me trouble, which is why I commented them out.

# But I think all we really need is temporalio.
# that's all I installed, and I could do everything
# that I needed.

aiosignal==1.3.1
async-timeout==4.0.2
attrs==23.1.0
blinker==1.6.2
charset-normalizer==3.1.0
click==8.1.3
docopt==0.6.2
frozenlist==1.3.3
#frozenlist==1.3.3
idna==3.4
iniconfig==2.0.0
itsdangerous==2.1.2
Jinja2==3.1.2
MarkupSafe==2.1.3
multidict==6.0.4
#multidict==6.0.4
packaging==23.1
pluggy==1.3.0
pytest==7.4.2
temporalio==1.6.0
types-protobuf==4.23.0.1
typing_extensions==4.6.3
Werkzeug==3.0.3
yarl==1.9.2
#yarl==1.9.2
Loading