From a166e8d3d12ef3ca3ddd9a70f6ec29fd788b04c5 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 25 Apr 2024 07:06:41 -0700 Subject: [PATCH 1/2] Determine fail_stop on client side when db isolated This is needed because we do not ser the dag on Operator objects. --- airflow/models/taskinstance.py | 15 +++++++++++++-- airflow/serialization/pydantic/taskinstance.py | 4 ++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 2fd5df910afd0..da4e1b54c94d3 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -878,6 +878,7 @@ def _handle_failure( test_mode: bool | None = None, context: Context | None = None, force_fail: bool = False, + fail_stop: bool = False, ) -> None: """ Handle Failure for a task instance. @@ -901,6 +902,7 @@ def _handle_failure( context=context, force_fail=force_fail, session=session, + fail_stop=fail_stop, ) _log_state(task_instance=task_instance, lead_msg="Immediate failure requested. " if force_fail else "") @@ -2964,8 +2966,13 @@ def fetch_handle_failure_context( context: Context | None = None, force_fail: bool = False, session: Session = NEW_SESSION, + fail_stop: bool = False, ): - """Handle Failure for the TaskInstance.""" + """ + Handle Failure for the TaskInstance. + + :param fail_stop: if true, stop remaining tasks in dag + """ get_listener_manager().hook.on_task_instance_failed( previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error, session=session ) @@ -3028,7 +3035,7 @@ def fetch_handle_failure_context( email_for_state = operator.attrgetter("email_on_failure") callbacks = task.on_failure_callback if task else None - if task and task.dag and task.dag.fail_stop: + if task and fail_stop: _stop_remaining_tasks(task_instance=ti, session=session) else: if ti.state == TaskInstanceState.QUEUED: @@ -3077,6 +3084,9 @@ def handle_failure( :param context: Jinja2 context :param force_fail: if True, task does not retry """ + if TYPE_CHECKING: + assert self.task + assert self.task.dag _handle_failure( task_instance=self, error=error, @@ -3084,6 +3094,7 @@ def handle_failure( test_mode=test_mode, context=context, force_fail=force_fail, + fail_stop=self.task.dag.fail_stop, ) def is_eligible_to_retry(self): diff --git a/airflow/serialization/pydantic/taskinstance.py b/airflow/serialization/pydantic/taskinstance.py index cc52aa9989f3d..1d09b37a8dadc 100644 --- a/airflow/serialization/pydantic/taskinstance.py +++ b/airflow/serialization/pydantic/taskinstance.py @@ -276,6 +276,9 @@ def handle_failure( """ from airflow.models.taskinstance import _handle_failure + if TYPE_CHECKING: + assert self.task + assert self.task.dag _handle_failure( task_instance=self, error=error, @@ -283,6 +286,7 @@ def handle_failure( test_mode=test_mode, context=context, force_fail=force_fail, + fail_stop=self.task.dag.fail_stop, ) def refresh_from_task(self, task: Operator, pool_override: str | None = None) -> None: From 9cfe9a5d8f82cbc117df8c02fac77b728491058f Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 26 Apr 2024 23:12:08 -0700 Subject: [PATCH 2/2] fix tests --- airflow/models/taskinstance.py | 6 +++++- airflow/serialization/pydantic/taskinstance.py | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index da4e1b54c94d3..ae4714d4b4e58 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -3087,6 +3087,10 @@ def handle_failure( if TYPE_CHECKING: assert self.task assert self.task.dag + try: + fail_stop = self.task.dag.fail_stop + except Exception: + fail_stop = False _handle_failure( task_instance=self, error=error, @@ -3094,7 +3098,7 @@ def handle_failure( test_mode=test_mode, context=context, force_fail=force_fail, - fail_stop=self.task.dag.fail_stop, + fail_stop=fail_stop, ) def is_eligible_to_retry(self): diff --git a/airflow/serialization/pydantic/taskinstance.py b/airflow/serialization/pydantic/taskinstance.py index 1d09b37a8dadc..2e01bf415a141 100644 --- a/airflow/serialization/pydantic/taskinstance.py +++ b/airflow/serialization/pydantic/taskinstance.py @@ -279,6 +279,10 @@ def handle_failure( if TYPE_CHECKING: assert self.task assert self.task.dag + try: + fail_stop = self.task.dag.fail_stop + except Exception: + fail_stop = False _handle_failure( task_instance=self, error=error, @@ -286,7 +290,7 @@ def handle_failure( test_mode=test_mode, context=context, force_fail=force_fail, - fail_stop=self.task.dag.fail_stop, + fail_stop=fail_stop, ) def refresh_from_task(self, task: Operator, pool_override: str | None = None) -> None: