introduce AirflowPokeFailException #40923

Closed (wants to merge 1 commit)
4 changes: 4 additions & 0 deletions airflow/exceptions.py
@@ -64,6 +64,10 @@ class AirflowSensorTimeout(AirflowException):
"""Raise when there is a timeout on sensor polling."""


class AirflowPokeFailException(AirflowException):
"""Raise when a sensor must not try to poke again."""


class AirflowRescheduleException(AirflowException):
"""
Raise when the task should be re-scheduled at a later time.
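Aside: a minimal sketch of how a custom sensor might raise the new exception; the sensor class and its helper are hypothetical, not part of this PR.

from airflow.exceptions import AirflowPokeFailException
from airflow.sensors.base import BaseSensorOperator


class PartitionSensor(BaseSensorOperator):
    """Hypothetical sensor illustrating AirflowPokeFailException."""

    def poke(self, context) -> bool:
        state = fetch_partition_state()  # hypothetical helper
        if state == "dropped":
            # The condition can never become true, so re-poking is pointless.
            raise AirflowPokeFailException("Partition was dropped upstream.")
        return state == "ready"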
11 changes: 1 addition & 10 deletions airflow/providers/amazon/aws/sensors/s3.py
@@ -33,7 +33,7 @@
if TYPE_CHECKING:
from airflow.utils.context import Context

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.triggers.s3 import S3KeysUnchangedTrigger, S3KeyTrigger
from airflow.sensors.base import BaseSensorOperator, poke_mode_only
@@ -219,9 +219,6 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
if not found_keys:
self._defer()
elif event["status"] == "error":
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
if self.soft_fail:
raise AirflowSkipException(event["message"])
raise AirflowException(event["message"])

@deprecated(reason="use `hook` property instead.", category=AirflowProviderDeprecationWarning)
@@ -342,13 +339,10 @@ def is_keys_unchanged(self, current_objects: set[str]) -> bool:
)
return False

# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = (
f"Illegal behavior: objects were deleted in"
f" {os.path.join(self.bucket_name, self.prefix)} between pokes."
)
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)

if self.last_activity_time:
@@ -411,8 +405,5 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None
event = validate_execute_complete_event(event)

if event and event["status"] == "error":
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
if self.soft_fail:
raise AirflowSkipException(event["message"])
raise AirflowException(event["message"])
return None
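With the soft_fail shims removed here, skip-on-failure is requested through the sensor-level skip policy instead. A usage sketch (bucket and key are illustrative; it assumes skip_policy is exposed as a BaseSensorOperator keyword, which the base.py changes below suggest but this diff does not show):

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_key = S3KeySensor(
    task_id="wait_for_key",
    bucket_name="example-bucket",            # hypothetical bucket
    bucket_key="exports/{{ ds }}/data.csv",  # hypothetical key
    deferrable=True,
    skip_policy="skip_on_soft_error",        # assumed kwarg from this PR
)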
3 changes: 1 addition & 2 deletions airflow/providers/amazon/provider.yaml
@@ -88,8 +88,7 @@ versions:
- 1.0.0

dependencies:
- apache-airflow>=2.7.0
- apache-airflow-providers-common-compat>=1.1.0
- apache-airflow>=2.7.1
- apache-airflow-providers-common-sql>=1.3.1
- apache-airflow-providers-http
- apache-airflow-providers-common-compat>=1.1.0
2 changes: 1 addition & 1 deletion airflow/providers/ftp/provider.yaml
@@ -52,7 +52,7 @@ versions:
- 1.0.0

dependencies:
- apache-airflow>=2.7.0
- apache-airflow>=2.7.1

integrations:
- integration-name: File Transfer Protocol (FTP)
7 changes: 3 additions & 4 deletions airflow/providers/ftp/sensors/ftp.py
@@ -21,7 +21,7 @@
import re
from typing import TYPE_CHECKING, Sequence

from airflow.exceptions import AirflowSkipException
from airflow.exceptions import AirflowSensorTimeout
from airflow.providers.ftp.hooks.ftp import FTPHook, FTPSHook
from airflow.sensors.base import BaseSensorOperator

@@ -83,9 +83,8 @@ def poke(self, context: Context) -> bool:
if (error_code != 550) and (
self.fail_on_transient_errors or (error_code not in self.transient_errors)
):
if self.soft_fail:
raise AirflowSkipException from e
raise e
# TODO: replace by AirflowPokeFailException when min_airflow_version is set to at least 2.10.0
raise AirflowSensorTimeout from e

return False

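The branch above compresses to the following predicate; this is a restatement for clarity, not code from the PR (550 is ftplib's "file unavailable" reply):

def should_fail_poke(error_code: int, fail_on_transient_errors: bool, transient_errors: list[int]) -> bool:
    # 550 means the file is not there yet: a normal poke miss, keep waiting.
    if error_code == 550:
        return False
    # Any other code fails the poke, unless it is a known transient error
    # and the sensor was configured to tolerate those.
    return fail_on_transient_errors or error_code not in transient_errors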
2 changes: 1 addition & 1 deletion airflow/providers/http/provider.yaml
@@ -59,7 +59,7 @@ versions:
- 1.0.0

dependencies:
- apache-airflow>=2.7.0
- apache-airflow>=2.7.1
# The 2.26.0 release of requests got rid of the chardet LGPL mandatory dependency, allowing us to
# release it as a requirement for airflow
- requests>=2.27.0,<3
6 changes: 1 addition & 5 deletions airflow/providers/http/sensors/http.py
@@ -21,7 +21,7 @@
from typing import TYPE_CHECKING, Any, Callable, Sequence

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.http.triggers.http import HttpSensorTrigger
from airflow.sensors.base import BaseSensorOperator
@@ -151,10 +151,6 @@ def poke(self, context: Context) -> bool:
except AirflowException as exc:
if str(exc).startswith(self.response_error_codes_allowlist):
return False
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
if self.soft_fail:
raise AirflowSkipException from exc

raise exc

return True
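A usage sketch of the surviving allowlist path (connection id and endpoint are illustrative): codes on response_error_codes_allowlist keep the sensor poking, while any other HTTP error now propagates instead of being converted into a skip by the removed shim.

from airflow.providers.http.sensors.http import HttpSensor

wait_for_report = HttpSensor(
    task_id="wait_for_report",
    http_conn_id="reports_api",              # hypothetical connection
    endpoint="v1/report/{{ ds }}",           # hypothetical endpoint
    response_error_codes_allowlist=["404"],  # 404 means: not ready, re-poke
)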
2 changes: 1 addition & 1 deletion airflow/providers/sftp/provider.yaml
@@ -64,7 +64,7 @@ versions:
- 1.0.0

dependencies:
- apache-airflow>=2.7.0
- apache-airflow>=2.7.1
- apache-airflow-providers-ssh>=2.1.0
- paramiko>=2.9.0
- asyncssh>=2.12.0
8 changes: 3 additions & 5 deletions airflow/providers/sftp/sensors/sftp.py
@@ -26,7 +26,7 @@
from paramiko.sftp import SFTP_NO_SUCH_FILE

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.exceptions import AirflowException, AirflowSensorTimeout
from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.providers.sftp.triggers.sftp import SFTPTrigger
from airflow.sensors.base import BaseSensorOperator, PokeReturnValue
@@ -98,10 +98,8 @@ def poke(self, context: Context) -> PokeReturnValue | bool:
self.log.info("Found File %s last modified: %s", actual_file_to_check, mod_time)
except OSError as e:
if e.errno != SFTP_NO_SUCH_FILE:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
if self.soft_fail:
raise AirflowSkipException from e
raise e
# TODO: replace by AirflowPokeFailException when min_airflow_version is set to at least 2.10.0
raise AirflowSensorTimeout from e
continue

if self.newer_than:
6 changes: 4 additions & 2 deletions airflow/sensors/base.py
@@ -35,6 +35,7 @@
from airflow.exceptions import (
AirflowException,
AirflowFailException,
AirflowPokeFailException,
AirflowRescheduleException,
AirflowSensorTimeout,
AirflowSkipException,
@@ -128,11 +129,11 @@ class SkipPolicy(str, enum.Enum):
SKIP_ON_ANY_ERROR = "skip_on_any_error"

# If poke method raises AirflowSensorTimeout, AirflowTaskTimeout, AirflowFailException
# sensor will be skipped on.
# or AirflowPokeFailException, the sensor will be skipped.
SKIP_ON_SOFT_ERROR = "skip_on_soft_error"

# If poke method raises an exception different from AirflowSensorTimeout, AirflowTaskTimeout,
# AirflowSkipException, sensor will ignore exception and re-poke until timeout.
# AirflowSkipException, or AirflowPokeFailException, the sensor will ignore the exception and re-poke until timeout.
IGNORE_ERROR = "ignore_error"


@@ -343,6 +344,7 @@ def run_duration() -> float:
AirflowSensorTimeout,
AirflowTaskTimeout,
AirflowFailException,
AirflowPokeFailException,
) as e:
if self.skip_policy == SkipPolicy.SKIP_ON_SOFT_ERROR:
raise AirflowSkipException("Skipping due skip_policy set to skip_on_soft_error.") from e
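A sketch of the intended mapping, written as a pytest case; it assumes skip_policy is accepted as a constructor keyword and that execute() runs poke() synchronously, which this diff implies but does not show:

import pytest

from airflow.exceptions import AirflowPokeFailException, AirflowSkipException
from airflow.sensors.base import BaseSensorOperator, SkipPolicy


class AlwaysPokeFailSensor(BaseSensorOperator):
    """Hypothetical sensor whose poke always hits a permanent failure."""

    def poke(self, context) -> bool:
        raise AirflowPokeFailException("Condition can never be met.")


def test_skip_on_soft_error_turns_poke_fail_into_skip():
    sensor = AlwaysPokeFailSensor(
        task_id="always_fails",
        poke_interval=1,
        timeout=10,
        skip_policy=SkipPolicy.SKIP_ON_SOFT_ERROR,  # assumed kwarg
    )
    with pytest.raises(AirflowSkipException):
        sensor.execute(context={})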
45 changes: 14 additions & 31 deletions airflow/sensors/external_task.py
@@ -25,7 +25,7 @@
import attr

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowSkipException, RemovedInAirflow3Warning
from airflow.exceptions import AirflowPokeFailException, AirflowSkipException, RemovedInAirflow3Warning
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagBag
@@ -177,7 +177,7 @@ def __init__(
total_states = set(self.allowed_states + self.skipped_states + self.failed_states)

if len(total_states) != len(self.allowed_states) + len(self.skipped_states) + len(self.failed_states):
raise AirflowException(
raise ValueError(
"Duplicate values provided across allowed_states, skipped_states and failed_states."
)

@@ -288,32 +288,18 @@ def poke(self, context: Context, session: Session = NEW_SESSION) -> bool:
# Fail if anything in the list has failed.
if count_failed > 0:
if self.external_task_ids:
if self.soft_fail:
raise AirflowSkipException(
f"Some of the external tasks {self.external_task_ids} "
f"in DAG {self.external_dag_id} failed. Skipping due to soft_fail."
)
raise AirflowException(
raise AirflowPokeFailException(
f"Some of the external tasks {self.external_task_ids} "
f"in DAG {self.external_dag_id} failed."
)
elif self.external_task_group_id:
if self.soft_fail:
raise AirflowSkipException(
f"The external task_group '{self.external_task_group_id}' "
f"in DAG '{self.external_dag_id}' failed. Skipping due to soft_fail."
)
raise AirflowException(
raise AirflowPokeFailException(
f"The external task_group '{self.external_task_group_id}' "
f"in DAG '{self.external_dag_id}' failed."
)

else:
if self.soft_fail:
raise AirflowSkipException(
f"The external DAG {self.external_dag_id} failed. Skipping due to soft_fail."
)
raise AirflowException(f"The external DAG {self.external_dag_id} failed.")
raise AirflowPokeFailException(f"The external DAG {self.external_dag_id} failed.")

count_skipped = -1
if self.skipped_states:
@@ -366,38 +352,35 @@ def execute_complete(self, context, event=None):
if event["status"] == "success":
self.log.info("External tasks %s has executed successfully.", self.external_task_ids)
elif event["status"] == "skipped":
raise AirflowSkipException("External job has skipped skipping.")
raise AirflowPokeFailException("External job has skipped skipping.")
else:
if self.soft_fail:
raise AirflowSkipException("External job has failed skipping.")
else:
raise AirflowException(
"Error occurred while trying to retrieve task status. Please, check the "
"name of executed task and Dag."
)
raise AirflowPokeFailException(
"Error occurred while trying to retrieve task status. Please, check the "
"name of executed task and Dag."
)

def _check_for_existence(self, session) -> None:
dag_to_wait = DagModel.get_current(self.external_dag_id, session)

if not dag_to_wait:
raise AirflowException(f"The external DAG {self.external_dag_id} does not exist.")
raise AirflowPokeFailException(f"The external DAG {self.external_dag_id} does not exist.")

if not os.path.exists(correct_maybe_zipped(dag_to_wait.fileloc)):
raise AirflowException(f"The external DAG {self.external_dag_id} was deleted.")
raise AirflowPokeFailException(f"The external DAG {self.external_dag_id} was deleted.")

if self.external_task_ids:
refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
for external_task_id in self.external_task_ids:
if not refreshed_dag_info.has_task(external_task_id):
raise AirflowException(
raise AirflowPokeFailException(
f"The external task {external_task_id} in "
f"DAG {self.external_dag_id} does not exist."
)

if self.external_task_group_id:
refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
if not refreshed_dag_info.has_task_group(self.external_task_group_id):
raise AirflowException(
raise AirflowPokeFailException(
f"The external task group '{self.external_task_group_id}' in "
f"DAG '{self.external_dag_id}' does not exist."
)
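A usage sketch tying this to the new skip policy (DAG id and states are illustrative; skip_policy as a keyword is an assumption based on the base.py changes): a failed upstream DAG now raises AirflowPokeFailException, so under skip_on_soft_error the waiting task is skipped rather than failed hard.

from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.state import TaskInstanceState

wait_for_etl = ExternalTaskSensor(
    task_id="wait_for_etl",
    external_dag_id="nightly_etl",               # hypothetical DAG id
    allowed_states=[TaskInstanceState.SUCCESS],
    failed_states=[TaskInstanceState.FAILED],
    skip_policy="skip_on_soft_error",            # assumed kwarg from this PR
)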
4 changes: 2 additions & 2 deletions airflow/sensors/filesystem.py
@@ -24,7 +24,7 @@
from typing import TYPE_CHECKING, Sequence

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowPokeFailException
from airflow.hooks.filesystem import FSHook
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.file import FileTrigger
@@ -110,5 +110,5 @@ def execute(self, context: Context) -> None:

def execute_complete(self, context: Context, event: bool | None = None) -> None:
if not event:
raise AirflowException("%s task failed as %s not found.", self.task_id, self.filepath)
raise AirflowPokeFailException("%s task failed as %s not found.", self.task_id, self.filepath)
self.log.info("%s completed successfully as %s found.", self.task_id, self.filepath)
9 changes: 2 additions & 7 deletions airflow/sensors/time_delta.py
@@ -19,7 +19,6 @@

from typing import TYPE_CHECKING, Any, NoReturn

from airflow.exceptions import AirflowSkipException
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone
@@ -77,12 +76,8 @@ def execute(self, context: Context) -> bool | NoReturn:
if timezone.utcnow() > target_dttm:
# If the target datetime is in the past, return immediately
return True
try:
trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger)
except (TypeError, ValueError) as e:
if self.soft_fail:
raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e
raise

trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger)

self.defer(trigger=trigger, method_name="execute_complete")

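With the shim gone, DateTimeTrigger's own constructor validation propagates unchanged. For reference, a sketch of the inputs it accepts and rejects (assuming the trigger's usual TypeError/ValueError checks for non-datetime and naive moments):

from datetime import datetime, timedelta

from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone

aware = timezone.utcnow() + timedelta(hours=1)
trigger = DateTimeTrigger(moment=aware, end_from_trigger=False)  # accepted

naive = datetime.utcnow() + timedelta(hours=1)
DateTimeTrigger(moment=naive)  # raises ValueError: naive datetimes are rejected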
4 changes: 2 additions & 2 deletions dev/breeze/tests/test_packages.py
@@ -431,7 +431,7 @@ def test_validate_provider_info_with_schema():
@pytest.mark.parametrize(
"provider_id, min_version",
[
("amazon", "2.7.0"),
("amazon", "2.7.1"),
("common.io", "2.8.0"),
],
)
@@ -496,7 +496,7 @@ def test_provider_jinja_context():
"CHANGELOG_RELATIVE_PATH": "../../airflow/providers/amazon",
"SUPPORTED_PYTHON_VERSIONS": ["3.8", "3.9", "3.10", "3.11", "3.12"],
"PLUGINS": [],
"MIN_AIRFLOW_VERSION": "2.7.0",
"MIN_AIRFLOW_VERSION": "2.7.1",
"PROVIDER_REMOVED": False,
"PROVIDER_INFO": provider_info,
}
9 changes: 4 additions & 5 deletions generated/provider_dependencies.json
@@ -29,10 +29,9 @@
"deps": [
"PyAthena>=3.0.10",
"apache-airflow-providers-common-compat>=1.1.0",
"apache-airflow-providers-common-compat>=1.1.0",
"apache-airflow-providers-common-sql>=1.3.1",
"apache-airflow-providers-http",
"apache-airflow>=2.7.0",
"apache-airflow>=2.7.1",
"asgiref>=2.3.0",
"boto3>=1.34.90",
"botocore>=1.34.90",
@@ -575,7 +574,7 @@
},
"ftp": {
"deps": [
"apache-airflow>=2.7.0"
"apache-airflow>=2.7.1"
],
"devel-deps": [],
"plugins": [],
@@ -719,7 +718,7 @@
"http": {
"deps": [
"aiohttp>=3.9.2",
"apache-airflow>=2.7.0",
"apache-airflow>=2.7.1",
"asgiref",
"requests>=2.27.0,<3",
"requests_toolbelt"
@@ -1149,7 +1148,7 @@
"sftp": {
"deps": [
"apache-airflow-providers-ssh>=2.1.0",
"apache-airflow>=2.7.0",
"apache-airflow>=2.7.1",
"asyncssh>=2.12.0",
"paramiko>=2.9.0"
],