diff --git a/airflow/exceptions.py b/airflow/exceptions.py index dc59f91841133..702a8381718d4 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -64,6 +64,10 @@ class AirflowSensorTimeout(AirflowException): """Raise when there is a timeout on sensor polling.""" +class AirflowPokeFailException(AirflowException): + """Raise when a sensor must not try to poke again.""" + + class AirflowRescheduleException(AirflowException): """ Raise when the task should be re-scheduled at a later time. diff --git a/airflow/providers/amazon/aws/sensors/s3.py b/airflow/providers/amazon/aws/sensors/s3.py index 9c524494cdeb5..cc5aff014588d 100644 --- a/airflow/providers/amazon/aws/sensors/s3.py +++ b/airflow/providers/amazon/aws/sensors/s3.py @@ -33,7 +33,7 @@ if TYPE_CHECKING: from airflow.utils.context import Context -from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.triggers.s3 import S3KeysUnchangedTrigger, S3KeyTrigger from airflow.sensors.base import BaseSensorOperator, poke_mode_only @@ -219,9 +219,6 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None: if not found_keys: self._defer() elif event["status"] == "error": - # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1 - if self.soft_fail: - raise AirflowSkipException(event["message"]) raise AirflowException(event["message"]) @deprecated(reason="use `hook` property instead.", category=AirflowProviderDeprecationWarning) @@ -342,13 +339,10 @@ def is_keys_unchanged(self, current_objects: set[str]) -> bool: ) return False - # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1 message = ( f"Illegal behavior: objects were deleted in" f" {os.path.join(self.bucket_name, self.prefix)} between pokes." 
) - if self.soft_fail: - raise AirflowSkipException(message) raise AirflowException(message) if self.last_activity_time: @@ -411,8 +405,5 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None event = validate_execute_complete_event(event) if event and event["status"] == "error": - # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1 - if self.soft_fail: - raise AirflowSkipException(event["message"]) raise AirflowException(event["message"]) return None diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index 3aa3f40059599..b0ef2eefd8415 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -88,8 +88,7 @@ versions: - 1.0.0 dependencies: - - apache-airflow>=2.7.0 - - apache-airflow-providers-common-compat>=1.1.0 + - apache-airflow>=2.7.1 - apache-airflow-providers-common-sql>=1.3.1 - apache-airflow-providers-http - apache-airflow-providers-common-compat>=1.1.0 diff --git a/airflow/providers/ftp/provider.yaml b/airflow/providers/ftp/provider.yaml index c766fcf237b02..89702c1c73617 100644 --- a/airflow/providers/ftp/provider.yaml +++ b/airflow/providers/ftp/provider.yaml @@ -52,7 +52,7 @@ versions: - 1.0.0 dependencies: - - apache-airflow>=2.7.0 + - apache-airflow>=2.7.1 integrations: - integration-name: File Transfer Protocol (FTP) diff --git a/airflow/providers/ftp/sensors/ftp.py b/airflow/providers/ftp/sensors/ftp.py index 847cf763537dd..d957b06a8391e 100644 --- a/airflow/providers/ftp/sensors/ftp.py +++ b/airflow/providers/ftp/sensors/ftp.py @@ -21,7 +21,7 @@ import re from typing import TYPE_CHECKING, Sequence -from airflow.exceptions import AirflowSkipException +from airflow.exceptions import AirflowSensorTimeout from airflow.providers.ftp.hooks.ftp import FTPHook, FTPSHook from airflow.sensors.base import BaseSensorOperator @@ -83,9 +83,8 @@ def poke(self, context: Context) -> bool: if (error_code != 550) and ( self.fail_on_transient_errors or (error_code not in self.transient_errors) ): - if self.soft_fail: - raise AirflowSkipException from e - raise e + # TODO: replace by AirflowPokeFailException when min_airflow_version is set to at least 2.10.0 + raise AirflowSensorTimeout from e return False diff --git a/airflow/providers/http/provider.yaml b/airflow/providers/http/provider.yaml index 647002c62a78b..7c85c69249be8 100644 --- a/airflow/providers/http/provider.yaml +++ b/airflow/providers/http/provider.yaml @@ -59,7 +59,7 @@ versions: - 1.0.0 dependencies: - - apache-airflow>=2.7.0 + - apache-airflow>=2.7.1 # The 2.26.0 release of requests got rid of the chardet LGPL mandatory dependency, allowing us to # release it as a requirement for airflow - requests>=2.27.0,<3 diff --git a/airflow/providers/http/sensors/http.py b/airflow/providers/http/sensors/http.py index 3691764333b64..33b5e1d4defb4 100644 --- a/airflow/providers/http/sensors/http.py +++ b/airflow/providers/http/sensors/http.py @@ -21,7 +21,7 @@ from typing import TYPE_CHECKING, Any, Callable, Sequence from airflow.configuration import conf -from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.exceptions import AirflowException from airflow.providers.http.hooks.http import HttpHook from airflow.providers.http.triggers.http import HttpSensorTrigger from airflow.sensors.base import BaseSensorOperator @@ -151,10 +151,6 @@ def poke(self, context: Context) -> bool: except AirflowException as exc: if str(exc).startswith(self.response_error_codes_allowlist): 
return False - # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1 - if self.soft_fail: - raise AirflowSkipException from exc - raise exc return True diff --git a/airflow/providers/sftp/provider.yaml b/airflow/providers/sftp/provider.yaml index b2e8f64992df4..fedb97b92a223 100644 --- a/airflow/providers/sftp/provider.yaml +++ b/airflow/providers/sftp/provider.yaml @@ -64,7 +64,7 @@ versions: - 1.0.0 dependencies: - - apache-airflow>=2.7.0 + - apache-airflow>=2.7.1 - apache-airflow-providers-ssh>=2.1.0 - paramiko>=2.9.0 - asyncssh>=2.12.0 diff --git a/airflow/providers/sftp/sensors/sftp.py b/airflow/providers/sftp/sensors/sftp.py index f56ad9341001d..58f3269945661 100644 --- a/airflow/providers/sftp/sensors/sftp.py +++ b/airflow/providers/sftp/sensors/sftp.py @@ -26,7 +26,7 @@ from paramiko.sftp import SFTP_NO_SUCH_FILE from airflow.configuration import conf -from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.exceptions import AirflowException, AirflowSensorTimeout from airflow.providers.sftp.hooks.sftp import SFTPHook from airflow.providers.sftp.triggers.sftp import SFTPTrigger from airflow.sensors.base import BaseSensorOperator, PokeReturnValue @@ -98,10 +98,8 @@ def poke(self, context: Context) -> PokeReturnValue | bool: self.log.info("Found File %s last modified: %s", actual_file_to_check, mod_time) except OSError as e: if e.errno != SFTP_NO_SUCH_FILE: - # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1 - if self.soft_fail: - raise AirflowSkipException from e - raise e + # TODO: replace by AirflowPokeFailException when min_airflow_version is set to at least 2.10.0 + raise AirflowSensorTimeout from e continue if self.newer_than: diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py index a20aeaabc083a..f9bb609ff0cbe 100644 --- a/airflow/sensors/base.py +++ b/airflow/sensors/base.py @@ -35,6 +35,7 @@ from airflow.exceptions import ( AirflowException, AirflowFailException, + AirflowPokeFailException, AirflowRescheduleException, AirflowSensorTimeout, AirflowSkipException, @@ -128,11 +129,11 @@ class SkipPolicy(str, enum.Enum): SKIP_ON_ANY_ERROR = "skip_on_any_error" # If poke method raises AirflowSensorTimeout, AirflowTaskTimeout, AirflowFailException - # sensor will be skipped on. + # or AirflowPokeFailException sensor will be skipped on. SKIP_ON_SOFT_ERROR = "skip_on_soft_error" # If poke method raises an exception different from AirflowSensorTimeout, AirflowTaskTimeout, - # AirflowSkipException, sensor will ignore exception and re-poke until timeout. + # AirflowSkipException or AirflowPokeFailException sensor will ignore exception and re-poke until timeout. 
IGNORE_ERROR = "ignore_error" @@ -343,6 +344,7 @@ def run_duration() -> float: AirflowSensorTimeout, AirflowTaskTimeout, AirflowFailException, + AirflowPokeFailException, ) as e: if self.skip_policy == SkipPolicy.SKIP_ON_SOFT_ERROR: raise AirflowSkipException("Skipping due skip_policy set to skip_on_soft_error.") from e diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py index 339ad790564e0..cc84c84ef5661 100644 --- a/airflow/sensors/external_task.py +++ b/airflow/sensors/external_task.py @@ -25,7 +25,7 @@ import attr from airflow.configuration import conf -from airflow.exceptions import AirflowException, AirflowSkipException, RemovedInAirflow3Warning +from airflow.exceptions import AirflowPokeFailException, AirflowSkipException, RemovedInAirflow3Warning from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DagModel from airflow.models.dagbag import DagBag @@ -177,7 +177,7 @@ def __init__( total_states = set(self.allowed_states + self.skipped_states + self.failed_states) if len(total_states) != len(self.allowed_states) + len(self.skipped_states) + len(self.failed_states): - raise AirflowException( + raise ValueError( "Duplicate values provided across allowed_states, skipped_states and failed_states." ) @@ -288,32 +288,18 @@ def poke(self, context: Context, session: Session = NEW_SESSION) -> bool: # Fail if anything in the list has failed. if count_failed > 0: if self.external_task_ids: - if self.soft_fail: - raise AirflowSkipException( - f"Some of the external tasks {self.external_task_ids} " - f"in DAG {self.external_dag_id} failed. Skipping due to soft_fail." - ) - raise AirflowException( + raise AirflowPokeFailException( f"Some of the external tasks {self.external_task_ids} " f"in DAG {self.external_dag_id} failed." ) elif self.external_task_group_id: - if self.soft_fail: - raise AirflowSkipException( - f"The external task_group '{self.external_task_group_id}' " - f"in DAG '{self.external_dag_id}' failed. Skipping due to soft_fail." - ) - raise AirflowException( + raise AirflowPokeFailException( f"The external task_group '{self.external_task_group_id}' " f"in DAG '{self.external_dag_id}' failed." ) else: - if self.soft_fail: - raise AirflowSkipException( - f"The external DAG {self.external_dag_id} failed. Skipping due to soft_fail." - ) - raise AirflowException(f"The external DAG {self.external_dag_id} failed.") + raise AirflowPokeFailException(f"The external DAG {self.external_dag_id} failed.") count_skipped = -1 if self.skipped_states: @@ -366,30 +352,27 @@ def execute_complete(self, context, event=None): if event["status"] == "success": self.log.info("External tasks %s has executed successfully.", self.external_task_ids) elif event["status"] == "skipped": - raise AirflowSkipException("External job has skipped skipping.") + raise AirflowPokeFailException("External job has skipped skipping.") else: - if self.soft_fail: - raise AirflowSkipException("External job has failed skipping.") - else: - raise AirflowException( - "Error occurred while trying to retrieve task status. Please, check the " - "name of executed task and Dag." - ) + raise AirflowPokeFailException( + "Error occurred while trying to retrieve task status. Please, check the " + "name of executed task and Dag." 
+ ) def _check_for_existence(self, session) -> None: dag_to_wait = DagModel.get_current(self.external_dag_id, session) if not dag_to_wait: - raise AirflowException(f"The external DAG {self.external_dag_id} does not exist.") + raise AirflowPokeFailException(f"The external DAG {self.external_dag_id} does not exist.") if not os.path.exists(correct_maybe_zipped(dag_to_wait.fileloc)): - raise AirflowException(f"The external DAG {self.external_dag_id} was deleted.") + raise AirflowPokeFailException(f"The external DAG {self.external_dag_id} was deleted.") if self.external_task_ids: refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id) for external_task_id in self.external_task_ids: if not refreshed_dag_info.has_task(external_task_id): - raise AirflowException( + raise AirflowPokeFailException( f"The external task {external_task_id} in " f"DAG {self.external_dag_id} does not exist." ) @@ -397,7 +380,7 @@ def _check_for_existence(self, session) -> None: if self.external_task_group_id: refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id) if not refreshed_dag_info.has_task_group(self.external_task_group_id): - raise AirflowException( + raise AirflowPokeFailException( f"The external task group '{self.external_task_group_id}' in " f"DAG '{self.external_dag_id}' does not exist." ) diff --git a/airflow/sensors/filesystem.py b/airflow/sensors/filesystem.py index 2c803d01f5cb0..a44fed5a2b851 100644 --- a/airflow/sensors/filesystem.py +++ b/airflow/sensors/filesystem.py @@ -24,7 +24,7 @@ from typing import TYPE_CHECKING, Sequence from airflow.configuration import conf -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowPokeFailException from airflow.hooks.filesystem import FSHook from airflow.sensors.base import BaseSensorOperator from airflow.triggers.file import FileTrigger @@ -110,5 +110,5 @@ def execute(self, context: Context) -> None: def execute_complete(self, context: Context, event: bool | None = None) -> None: if not event: - raise AirflowException("%s task failed as %s not found.", self.task_id, self.filepath) + raise AirflowPokeFailException("%s task failed as %s not found.", self.task_id, self.filepath) self.log.info("%s completed successfully as %s found.", self.task_id, self.filepath) diff --git a/airflow/sensors/time_delta.py b/airflow/sensors/time_delta.py index d068fad9bf5a5..5d8ba9329c25f 100644 --- a/airflow/sensors/time_delta.py +++ b/airflow/sensors/time_delta.py @@ -19,7 +19,6 @@ from typing import TYPE_CHECKING, Any, NoReturn -from airflow.exceptions import AirflowSkipException from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -77,12 +76,8 @@ def execute(self, context: Context) -> bool | NoReturn: if timezone.utcnow() > target_dttm: # If the target datetime is in the past, return immediately return True - try: - trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger) - except (TypeError, ValueError) as e: - if self.soft_fail: - raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e - raise + + trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger) self.defer(trigger=trigger, method_name="execute_complete") diff --git a/dev/breeze/tests/test_packages.py b/dev/breeze/tests/test_packages.py index 228a1ca0dc5ed..af2b6e0b5bbb3 100644 --- a/dev/breeze/tests/test_packages.py +++ b/dev/breeze/tests/test_packages.py @@ -431,7 +431,7 
@@ def test_validate_provider_info_with_schema(): @pytest.mark.parametrize( "provider_id, min_version", [ - ("amazon", "2.7.0"), + ("amazon", "2.7.1"), ("common.io", "2.8.0"), ], ) @@ -496,7 +496,7 @@ def test_provider_jinja_context(): "CHANGELOG_RELATIVE_PATH": "../../airflow/providers/amazon", "SUPPORTED_PYTHON_VERSIONS": ["3.8", "3.9", "3.10", "3.11", "3.12"], "PLUGINS": [], - "MIN_AIRFLOW_VERSION": "2.7.0", + "MIN_AIRFLOW_VERSION": "2.7.1", "PROVIDER_REMOVED": False, "PROVIDER_INFO": provider_info, } diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 85dbd405e8f87..bbaa3008023d5 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -29,10 +29,9 @@ "deps": [ "PyAthena>=3.0.10", "apache-airflow-providers-common-compat>=1.1.0", - "apache-airflow-providers-common-compat>=1.1.0", "apache-airflow-providers-common-sql>=1.3.1", "apache-airflow-providers-http", - "apache-airflow>=2.7.0", + "apache-airflow>=2.7.1", "asgiref>=2.3.0", "boto3>=1.34.90", "botocore>=1.34.90", @@ -575,7 +574,7 @@ }, "ftp": { "deps": [ - "apache-airflow>=2.7.0" + "apache-airflow>=2.7.1" ], "devel-deps": [], "plugins": [], @@ -719,7 +718,7 @@ "http": { "deps": [ "aiohttp>=3.9.2", - "apache-airflow>=2.7.0", + "apache-airflow>=2.7.1", "asgiref", "requests>=2.27.0,<3", "requests_toolbelt" @@ -1149,7 +1148,7 @@ "sftp": { "deps": [ "apache-airflow-providers-ssh>=2.1.0", - "apache-airflow>=2.7.0", + "apache-airflow>=2.7.1", "asyncssh>=2.12.0", "paramiko>=2.9.0" ], diff --git a/tests/providers/amazon/aws/sensors/test_s3.py b/tests/providers/amazon/aws/sensors/test_s3.py index fd70f7134a7e3..170be0d7517f1 100644 --- a/tests/providers/amazon/aws/sensors/test_s3.py +++ b/tests/providers/amazon/aws/sensors/test_s3.py @@ -24,7 +24,7 @@ import time_machine from moto import mock_aws -from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.exceptions import AirflowException from airflow.models import DAG, DagRun, TaskInstance from airflow.models.variable import Variable from airflow.providers.amazon.aws.hooks.s3 import S3Hook @@ -274,7 +274,7 @@ def check_fn(files: list) -> bool: ) @pytest.mark.parametrize( - "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException)) + "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowException)) ) def test_fail_execute_complete(self, soft_fail, expected_exception): op = S3KeySensor( @@ -516,7 +516,7 @@ def test_poke_succeeds_on_upload_complete(self, mock_hook, time_machine): assert self.sensor.poke(dict()) @pytest.mark.parametrize( - "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException)) + "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowException)) ) def test_fail_is_keys_unchanged(self, soft_fail, expected_exception): op = S3KeysUnchangedSensor(task_id="sensor", bucket_name="test-bucket", prefix="test-prefix/path") @@ -529,7 +529,7 @@ def test_fail_is_keys_unchanged(self, soft_fail, expected_exception): op.is_keys_unchanged(current_objects=current_objects) @pytest.mark.parametrize( - "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException)) + "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowException)) ) def test_fail_execute_complete(self, soft_fail, expected_exception): op = S3KeysUnchangedSensor(task_id="sensor", bucket_name="test-bucket", prefix="test-prefix/path") diff --git 
a/tests/providers/ftp/sensors/test_ftp.py b/tests/providers/ftp/sensors/test_ftp.py index 107b2323717bc..797dbb6f71ca3 100644 --- a/tests/providers/ftp/sensors/test_ftp.py +++ b/tests/providers/ftp/sensors/test_ftp.py @@ -22,7 +22,7 @@ import pytest -from airflow.exceptions import AirflowSkipException +from airflow.exceptions import AirflowSensorTimeout, AirflowSkipException from airflow.providers.ftp.hooks.ftp import FTPHook from airflow.providers.ftp.sensors.ftp import FTPSensor @@ -52,10 +52,10 @@ def test_poke_fails_due_error(self, mock_hook): "530: Login authentication failed" ) - with pytest.raises(error_perm) as ctx: + with pytest.raises(AirflowSensorTimeout) as ctx: op.execute(None) - assert "530" in str(ctx.value) + assert "530" in str(ctx.value.__cause__) @mock.patch("airflow.providers.ftp.sensors.ftp.FTPHook", spec=FTPHook) def test_poke_fail_on_transient_error(self, mock_hook): @@ -65,10 +65,10 @@ def test_poke_fail_on_transient_error(self, mock_hook): "434: Host unavailable" ) - with pytest.raises(error_perm) as ctx: + with pytest.raises(AirflowSensorTimeout) as ctx: op.execute(None) - assert "434" in str(ctx.value) + assert "434" in str(ctx.value.__cause__) @mock.patch("airflow.providers.ftp.sensors.ftp.FTPHook", spec=FTPHook) def test_poke_fail_on_transient_error_and_skip(self, mock_hook): diff --git a/tests/providers/http/sensors/test_http.py b/tests/providers/http/sensors/test_http.py index 4e95c844058fa..ecb209605f71e 100644 --- a/tests/providers/http/sensors/test_http.py +++ b/tests/providers/http/sensors/test_http.py @@ -23,7 +23,12 @@ import pytest import requests -from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException, TaskDeferred +from airflow.exceptions import ( + AirflowException, + AirflowSensorTimeout, + AirflowSkipException, + TaskDeferred, +) from airflow.models.dag import DAG from airflow.providers.http.operators.http import HttpOperator from airflow.providers.http.sensors.http import HttpSensor @@ -75,7 +80,7 @@ def test_poke_exception_with_soft_fail(self, mock_session_send, create_task_of_o mock_session_send.return_value = response def resp_check(_): - raise AirflowException("AirflowException raised here!") + raise AirflowSensorTimeout("AirflowSensorTimeout raised here!") task = create_task_of_operator( HttpSensor, diff --git a/tests/providers/sftp/sensors/test_sftp.py b/tests/providers/sftp/sensors/test_sftp.py index 25add45e153fb..b556412cfd52f 100644 --- a/tests/providers/sftp/sensors/test_sftp.py +++ b/tests/providers/sftp/sensors/test_sftp.py @@ -25,7 +25,7 @@ from paramiko.sftp import SFTP_FAILURE, SFTP_NO_SUCH_FILE from pendulum import datetime as pendulum_datetime, timezone -from airflow.exceptions import AirflowSkipException +from airflow.exceptions import AirflowSensorTimeout from airflow.providers.sftp.sensors.sftp import SFTPSensor from airflow.sensors.base import PokeReturnValue @@ -53,7 +53,7 @@ def test_file_absent(self, sftp_hook_mock): assert not output @pytest.mark.parametrize( - "soft_fail, expected_exception", ((False, OSError), (True, AirflowSkipException)) + "soft_fail, expected_exception", ((False, AirflowSensorTimeout), (True, AirflowSensorTimeout)) ) @patch("airflow.providers.sftp.sensors.sftp.SFTPHook") def test_sftp_failure(self, sftp_hook_mock, soft_fail: bool, expected_exception): diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py index 7660effc7cb68..78e2f40f04ce3 100644 --- a/tests/sensors/test_base.py +++ b/tests/sensors/test_base.py @@ -27,6 +27,7 @@ from 
airflow.exceptions import ( AirflowException, AirflowFailException, + AirflowPokeFailException, AirflowRescheduleException, AirflowSensorTimeout, AirflowSkipException, @@ -214,6 +215,7 @@ def test_skip_on_soft_error_with_exception(self, make_sensor, exception_cls): AirflowSensorTimeout, AirflowTaskTimeout, AirflowFailException, + AirflowPokeFailException, ), ) def test_skip_on_soft_error_with_skip_exception(self, make_sensor, exception_cls): @@ -231,7 +233,7 @@ def test_skip_on_soft_error_with_skip_exception(self, make_sensor, exception_cls @pytest.mark.parametrize( "exception_cls", - (AirflowSensorTimeout, AirflowTaskTimeout, AirflowFailException, Exception), + (AirflowSensorTimeout, AirflowTaskTimeout, AirflowFailException, AirflowPokeFailException, Exception), ) def test_skip_on_any_error_with_skip_exception(self, make_sensor, exception_cls): sensor, dr = make_sensor(False, skip_policy=SkipPolicy.SKIP_ON_ANY_ERROR) diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index d933a4c2266ef..97870b40a6e8a 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -30,7 +30,13 @@ from airflow import exceptions, settings from airflow.decorators import task as task_deco -from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException, TaskDeferred +from airflow.exceptions import ( + AirflowException, + AirflowPokeFailException, + AirflowSensorTimeout, + AirflowSkipException, + TaskDeferred, +) from airflow.models import DagBag, DagRun, TaskInstance from airflow.models.dag import DAG from airflow.models.serialized_dag import SerializedDagModel @@ -247,7 +253,7 @@ def test_external_task_group_not_exists_without_check_existence(self): dag=self.dag, poke_interval=0.1, ) - with pytest.raises(AirflowException, match="Sensor has timed out"): + with pytest.raises(AirflowSensorTimeout, match="Sensor has timed out"): op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) def test_external_task_group_sensor_success(self): @@ -274,13 +280,13 @@ def test_external_task_group_sensor_failed_states(self): dag=self.dag, ) with pytest.raises( - AirflowException, + AirflowPokeFailException, match=f"The external task_group '{TEST_TASK_GROUP_ID}' in DAG '{TEST_DAG_ID}' failed.", ): op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) def test_catch_overlap_allowed_failed_state(self): - with pytest.raises(AirflowException): + with pytest.raises(ValueError): ExternalTaskSensor( task_id="test_external_task_sensor_check", external_dag_id=TEST_DAG_ID, @@ -324,7 +330,7 @@ def test_external_task_sensor_failed_states_as_success(self, caplog): error_message = rf"Some of the external tasks \['{TEST_TASK_ID}'\] in DAG {TEST_DAG_ID} failed\." with caplog.at_level(logging.INFO, logger=op.log.name): caplog.clear() - with pytest.raises(AirflowException, match=error_message): + with pytest.raises(AirflowPokeFailException, match=error_message): op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) assert ( f"Poking for tasks ['{TEST_TASK_ID}'] in dag {TEST_DAG_ID} on {DEFAULT_DATE.isoformat()} ... 
" @@ -427,7 +433,7 @@ def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self, c ) with caplog.at_level(logging.INFO, logger=op.log.name): caplog.clear() - with pytest.raises(AirflowException, match=error_message): + with pytest.raises(AirflowPokeFailException, match=error_message): op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) assert ( f"Poking for tasks ['{TEST_TASK_ID}', '{TEST_TASK_ID_ALTERNATE}'] " @@ -590,12 +596,12 @@ def test_external_task_sensor_fn_multiple_execution_dates(self): dag=dag, ) - # We need to test for an AirflowException explicitly since + # We need to test for an AirflowPokeFailException explicitly since # AirflowSensorTimeout is a subclass that will be raised if this does # not execute properly. - with pytest.raises(AirflowException) as ex_ctx: + with pytest.raises(AirflowPokeFailException) as ex_ctx: task_chain_with_failure.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - assert type(ex_ctx.value) is AirflowException + assert type(ex_ctx.value) is AirflowPokeFailException def test_external_task_sensor_delta(self): self.add_time_sensor() @@ -830,7 +836,7 @@ def test_external_task_group_with_mapped_tasks_failed_states(self): dag=self.dag, ) with pytest.raises( - AirflowException, + AirflowPokeFailException, match=f"The external task_group '{TEST_TASK_GROUP_ID}' in DAG '{TEST_DAG_ID}' failed.", ): op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) @@ -885,7 +891,7 @@ def test_external_task_group_when_there_is_no_TIs(self): ( ( SkipPolicy.NONE, - AirflowException, + AirflowPokeFailException, ), ( SkipPolicy.SKIP_ON_SOFT_ERROR, @@ -909,6 +915,8 @@ def test_fail_poke( deferrable=False, **kwargs, ) + if skip_policy == SkipPolicy.SKIP_ON_SOFT_ERROR: + expected_message = "Skipping due skip_policy set to skip_on_soft_error." with pytest.raises(expected_exception, match=expected_message): op.execute(context={}) @@ -942,11 +950,11 @@ def test_fail_poke( ( ( SkipPolicy.NONE, - AirflowException, + AirflowPokeFailException, ), ( SkipPolicy.SKIP_ON_SOFT_ERROR, - AirflowException, + AirflowSkipException, ), ), ) @@ -983,6 +991,8 @@ def test_fail__check_for_existence( check_existence=True, **kwargs, ) + if skip_policy == SkipPolicy.SKIP_ON_SOFT_ERROR: + expected_message = "Skipping due skip_policy set to skip_on_soft_error." 
with pytest.raises(expected_exception, match=expected_message): op.execute(context={}) @@ -1011,7 +1021,7 @@ def test_defer_and_fire_task_state_trigger(self): assert isinstance(exc.value.trigger, WorkflowTrigger), "Trigger is not a WorkflowTrigger" def test_defer_and_fire_failed_state_trigger(self): - """Tests that an AirflowException is raised in case of error event""" + """Tests that an AirflowPokeFailException is raised in case of error event""" sensor = ExternalTaskSensor( task_id=TASK_ID, external_task_id=EXTERNAL_TASK_ID, @@ -1019,13 +1029,13 @@ def test_defer_and_fire_failed_state_trigger(self): deferrable=True, ) - with pytest.raises(AirflowException): + with pytest.raises(AirflowPokeFailException): sensor.execute_complete( context=mock.MagicMock(), event={"status": "error", "message": "test failure message"} ) def test_defer_and_fire_timeout_state_trigger(self): - """Tests that an AirflowException is raised in case of timeout event""" + """Tests that an AirflowPokeFailException is raised in case of timeout event""" sensor = ExternalTaskSensor( task_id=TASK_ID, external_task_id=EXTERNAL_TASK_ID, @@ -1033,7 +1043,7 @@ def test_defer_and_fire_timeout_state_trigger(self): deferrable=True, ) - with pytest.raises(AirflowException): + with pytest.raises(AirflowPokeFailException): sensor.execute_complete( context=mock.MagicMock(), event={"status": "timeout", "message": "Dag was not started within 1 minute, assuming fail."},
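
Usage sketch (illustrative, not part of the diff): the new AirflowPokeFailException, together with the SkipPolicy handling in airflow/sensors/base.py, replaces the per-sensor soft_fail branches removed above. The sketch below shows how a custom sensor might raise the new exception from poke(); RecordArrivedSensor, its _lookup_state helper, and the assumption that skip_policy is accepted as a BaseSensorOperator constructor argument are hypothetical and only meant to illustrate the mechanism.

    # Minimal sketch, assuming AirflowPokeFailException (airflow.exceptions) and
    # SkipPolicy (airflow.sensors.base) as introduced in this diff.
    from __future__ import annotations

    from airflow.exceptions import AirflowPokeFailException
    from airflow.sensors.base import BaseSensorOperator, SkipPolicy


    class RecordArrivedSensor(BaseSensorOperator):
        """Hypothetical sensor: waits for a record, stops poking on a fatal condition."""

        def __init__(self, record_id: str, **kwargs):
            super().__init__(**kwargs)
            self.record_id = record_id

        def poke(self, context) -> bool:
            state = self._lookup_state(self.record_id)  # hypothetical backing-store lookup
            if state == "deleted":
                # Non-retryable condition: re-poking can never succeed, so stop
                # polling instead of waiting for the sensor timeout.
                raise AirflowPokeFailException(f"Record {self.record_id} was deleted upstream.")
            return state == "arrived"

        def _lookup_state(self, record_id: str) -> str:
            ...  # query some backing store; stubbed out for the sketch


    # Hypothetical instantiation: with SKIP_ON_SOFT_ERROR the base class converts
    # AirflowPokeFailException (like AirflowSensorTimeout, AirflowTaskTimeout and
    # AirflowFailException) into an AirflowSkipException, so the task is skipped
    # rather than failed.
    wait_for_record = RecordArrivedSensor(
        task_id="wait_for_record",
        record_id="abc123",
        poke_interval=60,
        timeout=60 * 60,
        skip_policy=SkipPolicy.SKIP_ON_SOFT_ERROR,
    )

With the default policy the exception propagates and fails the task without further pokes, which is the behavior the provider sensors above now rely on instead of checking self.soft_fail themselves.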