Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions providers/standard/src/airflow/providers/standard/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Exceptions used by Standard Provider."""

from __future__ import annotations

from airflow.exceptions import AirflowException


class AirflowExternalTaskSensorException(AirflowException):
    """
    Base exception for all ExternalTaskSensor related errors.

    All more specific ExternalTaskSensor errors in this module subclass this,
    so callers can catch the whole family with a single except clause.
    """


class ExternalDagNotFoundError(AirflowExternalTaskSensorException):
    """Raised when the external DAG does not exist or could not be loaded."""


class ExternalDagDeletedError(AirflowExternalTaskSensorException):
    """Raised when the external DAG was deleted (its source file no longer exists on disk)."""


class ExternalTaskNotFoundError(AirflowExternalTaskSensorException):
    """Raised when the external task does not exist in the external DAG, or its status cannot be retrieved."""


class ExternalTaskGroupNotFoundError(AirflowExternalTaskSensorException):
    """Raised when the external task group does not exist in the external DAG."""


class ExternalTaskFailedError(AirflowExternalTaskSensorException):
    """Raised when some of the monitored external tasks reached one of the configured failed states."""


class ExternalTaskGroupFailedError(AirflowExternalTaskSensorException):
    """Raised when the monitored external task group reached one of the configured failed states."""


class ExternalDagFailedError(AirflowExternalTaskSensorException):
    """Raised when the external DAG failed (including a 'failed' event reported by the deferrable trigger)."""


class DuplicateStateError(AirflowExternalTaskSensorException):
    """Raised when the same state appears in more than one of allowed_states, skipped_states and failed_states."""
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand All @@ -24,9 +23,19 @@
from typing import TYPE_CHECKING, Any, Callable, ClassVar

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.exceptions import AirflowSkipException
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagBag
from airflow.providers.standard.exceptions import (
DuplicateStateError,
ExternalDagDeletedError,
ExternalDagFailedError,
ExternalDagNotFoundError,
ExternalTaskFailedError,
ExternalTaskGroupFailedError,
ExternalTaskGroupNotFoundError,
ExternalTaskNotFoundError,
)
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.triggers.external_task import WorkflowTrigger
from airflow.providers.standard.utils.sensor_helper import _get_count, _get_external_task_group_task_ids
Expand Down Expand Up @@ -190,7 +199,7 @@ def __init__(
total_states = set(self.allowed_states + self.skipped_states + self.failed_states)

if len(total_states) != len(self.allowed_states) + len(self.skipped_states) + len(self.failed_states):
raise AirflowException(
raise DuplicateStateError(
"Duplicate values provided across allowed_states, skipped_states and failed_states."
)

Expand Down Expand Up @@ -356,7 +365,7 @@ def _handle_failed_states(self, count_failed: float | int) -> None:
f"Some of the external tasks {self.external_task_ids} "
f"in DAG {self.external_dag_id} failed. Skipping due to soft_fail."
)
raise AirflowException(
raise ExternalTaskFailedError(
f"Some of the external tasks {self.external_task_ids} "
f"in DAG {self.external_dag_id} failed."
)
Expand All @@ -366,15 +375,15 @@ def _handle_failed_states(self, count_failed: float | int) -> None:
f"The external task_group '{self.external_task_group_id}' "
f"in DAG '{self.external_dag_id}' failed. Skipping due to soft_fail."
)
raise AirflowException(
raise ExternalTaskGroupFailedError(
f"The external task_group '{self.external_task_group_id}' "
f"in DAG '{self.external_dag_id}' failed."
)
if self.soft_fail:
raise AirflowSkipException(
f"The external DAG {self.external_dag_id} failed. Skipping due to soft_fail."
)
raise AirflowException(f"The external DAG {self.external_dag_id} failed.")
raise ExternalDagFailedError(f"The external DAG {self.external_dag_id} failed.")

def _handle_skipped_states(self, count_skipped: float | int) -> None:
"""Handle skipped states and raise appropriate exceptions."""
Expand Down Expand Up @@ -443,10 +452,14 @@ def execute_complete(self, context, event=None):
self.log.info("External tasks %s has executed successfully.", self.external_task_ids)
elif event["status"] == "skipped":
raise AirflowSkipException("External job has skipped skipping.")
elif event["status"] == "failed":
if self.soft_fail:
raise AirflowSkipException("External job has failed skipping.")
raise ExternalDagFailedError("External job has failed.")
else:
if self.soft_fail:
raise AirflowSkipException("External job has failed skipping.")
raise AirflowException(
raise ExternalTaskNotFoundError(
"Error occurred while trying to retrieve task status. Please, check the "
"name of executed task and Dag."
)
Expand All @@ -455,23 +468,31 @@ def _check_for_existence(self, session) -> None:
dag_to_wait = DagModel.get_current(self.external_dag_id, session)

if not dag_to_wait:
raise AirflowException(f"The external DAG {self.external_dag_id} does not exist.")
raise ExternalDagNotFoundError(f"The external DAG {self.external_dag_id} does not exist.")

if not os.path.exists(correct_maybe_zipped(dag_to_wait.fileloc)):
raise AirflowException(f"The external DAG {self.external_dag_id} was deleted.")
raise ExternalDagDeletedError(f"The external DAG {self.external_dag_id} was deleted.")

if self.external_task_ids:
refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
if not refreshed_dag_info:
raise ExternalDagNotFoundError(
f"The external DAG {self.external_dag_id} could not be loaded."
)
for external_task_id in self.external_task_ids:
if not refreshed_dag_info.has_task(external_task_id):
raise AirflowException(
raise ExternalTaskNotFoundError(
f"The external task {external_task_id} in DAG {self.external_dag_id} does not exist."
)

if self.external_task_group_id:
refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
if not refreshed_dag_info:
raise ExternalDagNotFoundError(
f"The external DAG {self.external_dag_id} could not be loaded."
)
if not refreshed_dag_info.has_task_group(self.external_task_group_id):
raise AirflowException(
raise ExternalTaskGroupNotFoundError(
f"The external task group '{self.external_task_group_id}' in "
f"DAG '{self.external_dag_id}' does not exist."
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@ async def run(self) -> typing.AsyncIterator[TriggerEvent]:
if failed_count > 0:
yield TriggerEvent({"status": "failed"})
return
else:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused — why was this else removed?
What if no allowed_states are provided and only failed_states are given? I believe it would go into an infinite loop.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing out.

The else statement was removed to ensure that even if no tasks are found in a failed state (failed_count == 0), the trigger proceeds to check the allowed_states. Previously, it would immediately yield a "success" event and exit, bypassing the allowed_states check.

But in the case you've mentioned it would get into an infinite loop — I originally considered that case too. I think "success" in this context means "the specified failure condition was not met," not really "success". However, it does cause an issue where users get stuck in the infinite loop.

To address the continuous polling when only failed_states are specified and no failure occurs, maybe we could add the check below to prevent getting into the loop. wdyt?

elif not self.allowed_states and not self.skipped_states:
      yield TriggerEvent({"status": "success"}) # Or a new status like "no_failure_detected"
      return

yield TriggerEvent({"status": "success"})
return

if self.skipped_states:
skipped_count = await get_count_func(self.skipped_states)
if skipped_count > 0:
Expand Down
Loading