diff --git a/message_passing/message_handler_waiting_compensation_cleanup/activities.py b/message_passing/message_handler_waiting_compensation_cleanup/activities.py deleted file mode 100644 index 1a4a16d7..00000000 --- a/message_passing/message_handler_waiting_compensation_cleanup/activities.py +++ /dev/null @@ -1,8 +0,0 @@ -import asyncio - -from temporalio import activity - - -@activity.defn -async def my_activity(): - await asyncio.sleep(1) diff --git a/message_passing/message_handler_waiting_compensation_cleanup/workflows.py b/message_passing/message_handler_waiting_compensation_cleanup/workflows.py deleted file mode 100644 index 528575bb..00000000 --- a/message_passing/message_handler_waiting_compensation_cleanup/workflows.py +++ /dev/null @@ -1,124 +0,0 @@ -import asyncio -from datetime import timedelta - -from temporalio import exceptions, workflow - -from message_passing.message_handler_waiting_compensation_cleanup import ( - OnWorkflowExitAction, - UpdateInput, - WorkflowExitType, - WorkflowInput, -) -from message_passing.message_handler_waiting_compensation_cleanup.activities import ( - my_activity, -) - - -@workflow.defn -class MyWorkflow: - """ - This Workflow upholds the following recommended practices: - - 1. The main workflow method ensures that all signal and update handlers are - finished before a successful return, and on failure, cancellation, and - continue-as-new. - 2. The update handler performs any necessary compensation/cleanup when the - workflow is cancelled, fails, or continues-as-new. - """ - - def __init__(self) -> None: - self.workflow_exit_exception: asyncio.Future[BaseException] = asyncio.Future() - - @workflow.run - async def run(self, input: WorkflowInput) -> str: - try: - # 👉 Use this `try...except` style, instead of waiting for message - # handlers to finish in a `finally` block. The reason is that other - # exception types will cause a Workflow Task failure, in which case - # we do *not* want to wait for message handlers to finish. - - # self._run would contain your actual workflow business logic. In - # this sample, its actual implementation contains nothing relevant. - result = await self._run(input) - await workflow.wait_condition(workflow.all_handlers_finished) - return result - except ( - asyncio.CancelledError, - workflow.ContinueAsNewError, - exceptions.FailureError, - ) as exc: - self.workflow_exit_exception.set_result(exc) - await workflow.wait_condition(workflow.all_handlers_finished) - raise exc - - @workflow.update - async def my_update(self, input: UpdateInput) -> str: - """ - This update handler demonstrates how to handle the situation where the - main Workflow method exits prematurely. In that case we perform - compensation/cleanup, and fail the Update. The Update caller will get a - WorkflowUpdateFailedError. - """ - # Coroutines must be wrapped in tasks in order to use workflow.wait. - update_task = asyncio.create_task(self._my_update()) - # 👉 Always use `workflow.wait` instead of `asyncio.wait` in Workflow - # code: asyncio's version is non-deterministic. - first_completed, _ = await workflow.wait( # type: ignore - [update_task, self.workflow_exit_exception], - return_when=asyncio.FIRST_COMPLETED, - ) - # 👉 It's possible that the update completed and the workflow exited - # prematurely in the same tick of the event loop. If the Update has - # completed, return the Update result to the caller, whether or not the - # Workflow is exiting. - if ( - update_task in first_completed - or input.on_premature_workflow_exit == OnWorkflowExitAction.CONTINUE - ): - return await update_task - else: - await self._my_update_compensation_and_cleanup() - raise exceptions.ApplicationError( - f"The update failed because the workflow run exited: {await self.workflow_exit_exception}" - ) - - async def _my_update(self) -> str: - """ - This handler calls a slow activity, so - - (1) In the case where the workflow finishes successfully, the worker - would get an UnfinishedUpdateHandlersWarning (TMPRL1102) if the main - workflow task didn't wait for it to finish. - - (2) In the other cases (failure, cancellation, and continue-as-new), the - premature workflow exit will occur before the update is finished. - """ - # Ignore: implementation detail specific to this sample - self._update_started = True - - await workflow.execute_activity( - my_activity, start_to_close_timeout=timedelta(seconds=10) - ) - return "update-result" - - async def _my_update_compensation_and_cleanup(self): - workflow.logger.info( - "Performing update handler compensation and cleanup operations" - ) - - async def _run(self, input: WorkflowInput) -> str: - # Ignore this method unless you are interested in the implementation - # details of this sample. - - # Wait until handlers started, so that we are demonstrating that we wait for them to finish. - await workflow.wait_condition(lambda: getattr(self, "_update_started", False)) - if input.exit_type == WorkflowExitType.SUCCESS: - return "workflow-result" - elif input.exit_type == WorkflowExitType.CONTINUE_AS_NEW: - workflow.continue_as_new(WorkflowInput(exit_type=WorkflowExitType.SUCCESS)) - elif input.exit_type == WorkflowExitType.FAILURE: - raise exceptions.ApplicationError("deliberately failing workflow") - elif input.exit_type == WorkflowExitType.CANCELLATION: - # Block forever; the starter will send a workflow cancellation request. - await asyncio.Future() - raise AssertionError("unreachable") diff --git a/message_passing/message_handler_waiting_compensation_cleanup/README.md b/message_passing/waiting_for_handlers_and_compensation/README.md similarity index 100% rename from message_passing/message_handler_waiting_compensation_cleanup/README.md rename to message_passing/waiting_for_handlers_and_compensation/README.md diff --git a/message_passing/message_handler_waiting_compensation_cleanup/__init__.py b/message_passing/waiting_for_handlers_and_compensation/__init__.py similarity index 63% rename from message_passing/message_handler_waiting_compensation_cleanup/__init__.py rename to message_passing/waiting_for_handlers_and_compensation/__init__.py index b3a49b4f..35eb78c2 100644 --- a/message_passing/message_handler_waiting_compensation_cleanup/__init__.py +++ b/message_passing/waiting_for_handlers_and_compensation/__init__.py @@ -15,13 +15,3 @@ class WorkflowExitType(IntEnum): @dataclass class WorkflowInput: exit_type: WorkflowExitType - - -class OnWorkflowExitAction(IntEnum): - CONTINUE = 0 - ABORT_WITH_COMPENSATION = 1 - - -@dataclass -class UpdateInput: - on_premature_workflow_exit: OnWorkflowExitAction diff --git a/message_passing/waiting_for_handlers_and_compensation/activities.py b/message_passing/waiting_for_handlers_and_compensation/activities.py new file mode 100644 index 00000000..cec6372f --- /dev/null +++ b/message_passing/waiting_for_handlers_and_compensation/activities.py @@ -0,0 +1,13 @@ +import asyncio + +from temporalio import activity + + +@activity.defn +async def activity_executed_by_update_handler(): + await asyncio.sleep(1) + + +@activity.defn +async def activity_executed_by_update_handler_to_perform_compensation(): + await asyncio.sleep(1) diff --git a/message_passing/message_handler_waiting_compensation_cleanup/starter.py b/message_passing/waiting_for_handlers_and_compensation/starter.py similarity index 60% rename from message_passing/message_handler_waiting_compensation_cleanup/starter.py rename to message_passing/waiting_for_handlers_and_compensation/starter.py index ebf48033..aa74e6d1 100644 --- a/message_passing/message_handler_waiting_compensation_cleanup/starter.py +++ b/message_passing/waiting_for_handlers_and_compensation/starter.py @@ -1,40 +1,37 @@ import asyncio -from temporalio import client +from temporalio import client, common -from message_passing.message_handler_waiting_compensation_cleanup import ( +from message_passing.waiting_for_handlers_and_compensation import ( TASK_QUEUE, WORKFLOW_ID, - OnWorkflowExitAction, - UpdateInput, WorkflowExitType, WorkflowInput, ) -from message_passing.message_handler_waiting_compensation_cleanup.workflows import ( - MyWorkflow, +from message_passing.waiting_for_handlers_and_compensation.workflows import ( + WaitingForHandlersAndCompensationWorkflow, ) -async def starter(exit_type: WorkflowExitType, update_action: OnWorkflowExitAction): +async def starter(exit_type: WorkflowExitType): cl = await client.Client.connect("localhost:7233") wf_handle = await cl.start_workflow( - MyWorkflow.run, + WaitingForHandlersAndCompensationWorkflow.run, WorkflowInput(exit_type=exit_type), id=WORKFLOW_ID, task_queue=TASK_QUEUE, + id_conflict_policy=common.WorkflowIDConflictPolicy.TERMINATE_EXISTING, ) - await _check_run(wf_handle, exit_type, update_action) + await _check_run(wf_handle, exit_type) async def _check_run( wf_handle: client.WorkflowHandle, exit_type: WorkflowExitType, - update_action: OnWorkflowExitAction, ): try: up_handle = await wf_handle.start_update( - MyWorkflow.my_update, - UpdateInput(on_premature_workflow_exit=update_action), + WaitingForHandlersAndCompensationWorkflow.my_update, wait_for_stage=client.WorkflowUpdateStage.ACCEPTED, ) except Exception as e: @@ -54,7 +51,7 @@ async def _check_run( ) if exit_type == WorkflowExitType.CONTINUE_AS_NEW: - await _check_run(wf_handle, WorkflowExitType.SUCCESS, update_action) + await _check_run(wf_handle, WorkflowExitType.SUCCESS) else: try: await wf_handle.result() @@ -66,14 +63,14 @@ async def _check_run( async def main(): - for exit_type in WorkflowExitType: + for exit_type in [ + WorkflowExitType.SUCCESS, + WorkflowExitType.FAILURE, + WorkflowExitType.CANCELLATION, + WorkflowExitType.CONTINUE_AS_NEW, + ]: print(f"\n\nworkflow exit type: {exit_type.name}") - for update_action in [ - OnWorkflowExitAction.CONTINUE, - OnWorkflowExitAction.ABORT_WITH_COMPENSATION, - ]: - print(f" update action on premature workflow exit: {update_action}") - await starter(exit_type, update_action) + await starter(exit_type) if __name__ == "__main__": diff --git a/message_passing/message_handler_waiting_compensation_cleanup/worker.py b/message_passing/waiting_for_handlers_and_compensation/worker.py similarity index 53% rename from message_passing/message_handler_waiting_compensation_cleanup/worker.py rename to message_passing/waiting_for_handlers_and_compensation/worker.py index c53cc441..64020693 100644 --- a/message_passing/message_handler_waiting_compensation_cleanup/worker.py +++ b/message_passing/waiting_for_handlers_and_compensation/worker.py @@ -4,12 +4,13 @@ from temporalio.client import Client from temporalio.worker import Worker -from message_passing.message_handler_waiting_compensation_cleanup import TASK_QUEUE -from message_passing.message_handler_waiting_compensation_cleanup.activities import ( - my_activity, +from message_passing.waiting_for_handlers_and_compensation import TASK_QUEUE +from message_passing.waiting_for_handlers_and_compensation.activities import ( + activity_executed_by_update_handler, + activity_executed_by_update_handler_to_perform_compensation, ) -from message_passing.message_handler_waiting_compensation_cleanup.workflows import ( - MyWorkflow, +from message_passing.waiting_for_handlers_and_compensation.workflows import ( + WaitingForHandlersAndCompensationWorkflow, ) interrupt_event = asyncio.Event() @@ -23,8 +24,11 @@ async def main(): async with Worker( client, task_queue=TASK_QUEUE, - workflows=[MyWorkflow], - activities=[my_activity], + workflows=[WaitingForHandlersAndCompensationWorkflow], + activities=[ + activity_executed_by_update_handler, + activity_executed_by_update_handler_to_perform_compensation, + ], ): logging.info("Worker started, ctrl+c to exit") await interrupt_event.wait() diff --git a/message_passing/waiting_for_handlers_and_compensation/workflows.py b/message_passing/waiting_for_handlers_and_compensation/workflows.py new file mode 100644 index 00000000..8b37989b --- /dev/null +++ b/message_passing/waiting_for_handlers_and_compensation/workflows.py @@ -0,0 +1,160 @@ +import asyncio +from datetime import timedelta +from typing import cast + +from temporalio import exceptions, workflow + +from message_passing.waiting_for_handlers_and_compensation import ( + WorkflowExitType, + WorkflowInput, +) +from message_passing.waiting_for_handlers_and_compensation.activities import ( + activity_executed_by_update_handler, + activity_executed_by_update_handler_to_perform_compensation, +) + + +@workflow.defn +class WaitingForHandlersAndCompensationWorkflow: + """ + This Workflow demonstrates how to wait for message handlers to finish and + perform compensation/cleanup: + + 1. It ensures that all signal and update handlers have finished before a + successful return, and on failure, cancellation, and continue-as-new. + 2. The update handler performs any necessary compensation/cleanup when the + workflow is cancelled, fails, or continues-as-new. + """ + + def __init__(self) -> None: + # 👉 If the workflow exits prematurely, this future will be completed + # with the associated exception as its value. Message handlers can then + # "race" this future against a task performing the message handler's own + # application logic; if this future completes before the message handler + # task then the handler should abort and perform compensation. + self.workflow_exit = asyncio.Future() + + @workflow.run + async def run(self, input: WorkflowInput) -> str: + try: + # 👉 Use this `try...except` style, instead of waiting for message + # handlers to finish in a `finally` block. The reason is that some + # exception types cause a workflow task failure as opposed to + # workflow exit, in which case we do *not* want to wait for message + # handlers to finish. + + # 👉 self._run contains your actual application logic. This is + # implemented in a separate method in order to separate + # "platform-level" concerns (waiting for handlers to finish and + # ensuring that compensation is performed when appropriate) from + # application logic. In this sample, its actual implementation is + # below but contains nothing relevant. + result = await self._run(input) + self.workflow_exit.set_result(None) + await workflow.wait_condition(workflow.all_handlers_finished) + return result + # 👉 Catch BaseException since asyncio.CancelledError does not inherit + # from Exception. + except BaseException as e: + if is_workflow_exit_exception(e): + self.workflow_exit.set_exception(e) + await workflow.wait_condition(workflow.all_handlers_finished) + raise + + @workflow.update + async def my_update(self) -> str: + """ + An update handler that handles exceptions in itself and in the main + workflow method. + + It ensures that: + - Compensation/cleanup is always performed when appropriate + - The update caller gets the update result, or WorkflowUpdateFailedError + """ + # 👉 As with the main workflow method, the update application logic is + # implemented in a separate method in order to separate "platform-level" + # error-handling and compensation concerns from application logic. Note + # that coroutines must be wrapped in tasks in order to use + # workflow.wait. + update_task = asyncio.create_task(self._my_update()) + + # 👉 "Race" the workflow_exit future against the handler's own application + # logic. Always use `workflow.wait` instead of `asyncio.wait` in + # Workflow code: asyncio's version is non-deterministic. + await workflow.wait( + [update_task, self.workflow_exit], return_when=asyncio.FIRST_EXCEPTION + ) + try: + if update_task.done(): + # 👉 The update has finished (whether successfully or not). + # Regardless of whether the main workflow method is about to + # exit or not, the update caller should receive a response + # informing them of the outcome of the update. So return the + # result, or raise the exception that caused the update handler + # to exit. + return await update_task + else: + # 👉 The main workflow method exited prematurely due to an + # error, and this happened before the update finished. Fail the + # update with the workflow exception as cause. + raise exceptions.ApplicationError( + "The update failed because the workflow run exited" + ) from cast(BaseException, self.workflow_exit.exception()) + # 👉 Catch BaseException since asyncio.CancelledError does not inherit + # from Exception. + except BaseException as e: + if is_workflow_exit_exception(e): + try: + await self.my_update_compensation() + except BaseException as e: + raise exceptions.ApplicationError( + "Update compensation failed" + ) from e + raise + + async def my_update_compensation(self): + await workflow.execute_activity( + activity_executed_by_update_handler_to_perform_compensation, + start_to_close_timeout=timedelta(seconds=10), + ) + + # The following two methods are placeholders for the actual application + # logic that you would perform in your main workflow method or update + # handler. Their implementation can be ignored. + + async def _my_update(self) -> str: + # Ignore this method unless you are interested in the implementation + # details of this sample. + self._update_started = True + await workflow.execute_activity( + activity_executed_by_update_handler, + start_to_close_timeout=timedelta(seconds=10), + ) + return "update-result" + + async def _run(self, input: WorkflowInput) -> str: + # Ignore this method unless you are interested in the implementation + # details of this sample. + + # Wait until handlers have started, so that we are demonstrating that we + # wait for them to finish. + await workflow.wait_condition(lambda: getattr(self, "_update_started", False)) + if input.exit_type == WorkflowExitType.SUCCESS: + return "workflow-result" + elif input.exit_type == WorkflowExitType.CONTINUE_AS_NEW: + workflow.continue_as_new(WorkflowInput(exit_type=WorkflowExitType.SUCCESS)) + elif input.exit_type == WorkflowExitType.FAILURE: + raise exceptions.ApplicationError("deliberately failing workflow") + elif input.exit_type == WorkflowExitType.CANCELLATION: + # Block forever; the starter will send a workflow cancellation request. + await asyncio.Future() + raise AssertionError("unreachable") + + +def is_workflow_exit_exception(e: BaseException) -> bool: + # 👉 If you have set additional failure_exception_types you should also + # check for these here. + return isinstance( + e, + (asyncio.CancelledError, workflow.ContinueAsNewError, exceptions.FailureError), + ) diff --git a/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py b/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py new file mode 100644 index 00000000..db92dbe9 --- /dev/null +++ b/tests/message_passing/waiting_for_handlers_and_compensation/workflow_test.py @@ -0,0 +1,82 @@ +import uuid + +import pytest +from temporalio.client import Client, WorkflowHandle, WorkflowUpdateStage +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from message_passing.waiting_for_handlers_and_compensation import ( + WorkflowExitType, + WorkflowInput, +) +from message_passing.waiting_for_handlers_and_compensation.starter import ( + TASK_QUEUE, +) +from message_passing.waiting_for_handlers_and_compensation.workflows import ( + WaitingForHandlersAndCompensationWorkflow, +) + + +async def test_waiting_for_handlers_and_compensation( + client: Client, env: WorkflowEnvironment +): + if env.supports_time_skipping: + pytest.skip( + "Java test server: https://github.com/temporalio/sdk-java/issues/1903" + ) + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[WaitingForHandlersAndCompensationWorkflow], + ): + await starter( + WorkflowExitType.SUCCESS, + client, + ) + + +async def starter(exit_type: WorkflowExitType, cl: Client): + wf_handle = await cl.start_workflow( + WaitingForHandlersAndCompensationWorkflow.run, + WorkflowInput(exit_type=exit_type), + id=str(uuid.uuid4()), + task_queue=TASK_QUEUE, + ) + await _check_run(wf_handle, exit_type) + + +async def _check_run( + wf_handle: WorkflowHandle, + exit_type: WorkflowExitType, +): + try: + up_handle = await wf_handle.start_update( + WaitingForHandlersAndCompensationWorkflow.my_update, + wait_for_stage=WorkflowUpdateStage.ACCEPTED, + ) + except Exception as e: + print( + f" 🔴 caught exception while starting update: {e}: {e.__cause__ or ''}" + ) + + if exit_type == WorkflowExitType.CANCELLATION: + await wf_handle.cancel() + + try: + await up_handle.result() + print(" 🟢 caller received update result") + except Exception as e: + print( + f" 🔴 caught exception while waiting for update result: {e}: {e.__cause__ or ''}" + ) + + if exit_type == WorkflowExitType.CONTINUE_AS_NEW: + await _check_run(wf_handle, WorkflowExitType.SUCCESS) + else: + try: + await wf_handle.result() + print(" 🟢 caller received workflow result") + except Exception as e: + print( + f" 🔴 caught exception while waiting for workflow result: {e}: {e.__cause__ or ''}" + )