diff --git a/airflow-core/src/airflow/sensors/__init__.py b/airflow-core/src/airflow/sensors/__init__.py index 62a4037df2f66..db378f4550324 100644 --- a/airflow-core/src/airflow/sensors/__init__.py +++ b/airflow-core/src/airflow/sensors/__init__.py @@ -26,8 +26,12 @@ from airflow.utils.deprecation_tools import add_deprecated_classes -# TODO: Add definition from Task SDK here and remove `base.py` file __deprecated_classes = { + "base": { + "BaseSensorOperator": "airflow.sdk.bases.sensor.BaseSensorOperator", + "PokeReturnValue": "airflow.sdk.bases.sensor.PokeReturnValue", + "poke_mode_only": "airflow.sdk.bases.sensor.poke_mode_only", + }, "python":{ "PythonSensor": "airflow.providers.standard.sensors.python.PythonSensor", }, diff --git a/airflow-core/src/airflow/sensors/base.py b/airflow-core/src/airflow/sensors/base.py deleted file mode 100644 index 71ae006f53437..0000000000000 --- a/airflow-core/src/airflow/sensors/base.py +++ /dev/null @@ -1,24 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from airflow.sdk.bases.sensor import ( - BaseSensorOperator as BaseSensorOperator, - PokeReturnValue as PokeReturnValue, - poke_mode_only as poke_mode_only, -) diff --git a/providers/databricks/src/airflow/providers/databricks/sensors/databricks.py b/providers/databricks/src/airflow/providers/databricks/sensors/databricks.py index f417291f2c9e2..a10c5bdad4b3d 100644 --- a/providers/databricks/src/airflow/providers/databricks/sensors/databricks.py +++ b/providers/databricks/src/airflow/providers/databricks/sensors/databricks.py @@ -30,9 +30,9 @@ from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: - from airflow.sdk import BaseSensorOperator + from airflow.sdk.bases.sensor import BaseSensorOperator else: - from airflow.sensors.base import BaseSensorOperator + from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/sensors/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/sensors/dbt.py index b628b77c22ce5..0e6ebeb6292d9 100644 --- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/sensors/dbt.py +++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/sensors/dbt.py @@ -25,7 +25,12 @@ from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunException, DbtCloudJobRunStatus from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger from airflow.providers.dbt.cloud.utils.openlineage import generate_openlineage_events_from_dbt_cloud_run -from airflow.sensors.base import BaseSensorOperator +from airflow.providers.dbt.cloud.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.bases.sensor import BaseSensorOperator +else: + from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] if TYPE_CHECKING: from airflow.providers.openlineage.extractors import OperatorLineage diff --git a/providers/http/tests/unit/http/sensors/test_http.py b/providers/http/tests/unit/http/sensors/test_http.py index 55c4f5ca5fb66..b2b74c1dc5bfa 100644 --- a/providers/http/tests/unit/http/sensors/test_http.py +++ b/providers/http/tests/unit/http/sensors/test_http.py @@ -387,7 +387,11 @@ def test_execute_is_deferred(self, mock_poke): assert isinstance(exc.value.trigger, HttpSensorTrigger), "Trigger is not a HttpTrigger" @mock.patch("airflow.providers.http.sensors.http.HttpSensor.defer") - @mock.patch("airflow.sensors.base.BaseSensorOperator.execute") + @mock.patch( + "airflow.sdk.bases.sensor.BaseSensorOperator.execute" + if AIRFLOW_V_3_0_PLUS + else "airflow.sensors.base.BaseSensorOperator.execute" + ) def test_execute_not_defer_when_response_check_is_not_none(self, mock_execute, mock_defer): task = HttpSensor( task_id="run_now", diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/data_factory.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/data_factory.py index 30f55c01d27e1..441df722214b5 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/data_factory.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/data_factory.py @@ -29,7 +29,12 @@ AzureDataFactoryPipelineRunStatus, ) from airflow.providers.microsoft.azure.triggers.data_factory import ADFPipelineRunStatusSensorTrigger -from airflow.sensors.base import BaseSensorOperator +from airflow.providers.microsoft.azure.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.bases.sensor import BaseSensorOperator +else: + from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/providers/standard/src/airflow/providers/standard/sensors/bash.py b/providers/standard/src/airflow/providers/standard/sensors/bash.py index 64a3220e48502..5e8dd0b24650f 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/bash.py +++ b/providers/standard/src/airflow/providers/standard/sensors/bash.py @@ -24,7 +24,12 @@ from typing import TYPE_CHECKING from airflow.exceptions import AirflowFailException -from airflow.sensors.base import BaseSensorOperator +from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.bases.sensor import BaseSensorOperator +else: + from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] if TYPE_CHECKING: try: diff --git a/providers/standard/src/airflow/providers/standard/sensors/date_time.py b/providers/standard/src/airflow/providers/standard/sensors/date_time.py index b1c5b5da2976d..a2e2478109f59 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/date_time.py +++ b/providers/standard/src/airflow/providers/standard/sensors/date_time.py @@ -24,9 +24,13 @@ from airflow.providers.standard.triggers.temporal import DateTimeTrigger from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS -from airflow.sensors.base import BaseSensorOperator from airflow.utils import timezone +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.bases.sensor import BaseSensorOperator +else: + from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] + try: from airflow.triggers.base import StartTriggerArgs except ImportError: diff --git a/providers/standard/src/airflow/providers/standard/sensors/external_task.py b/providers/standard/src/airflow/providers/standard/sensors/external_task.py index cc27679773d4b..aa8a971a526b8 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py +++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py @@ -46,7 +46,7 @@ if AIRFLOW_V_3_0_PLUS: from airflow.sdk.bases.sensor import BaseSensorOperator else: - from airflow.sensors.base import BaseSensorOperator + from airflow.sensors.base import BaseSensorOperator # type:ignore[no-redef] from airflow.utils.session import NEW_SESSION, provide_session if TYPE_CHECKING: diff --git a/providers/standard/src/airflow/providers/standard/sensors/filesystem.py b/providers/standard/src/airflow/providers/standard/sensors/filesystem.py index 5f6f9e5a0fc61..10315dc88a97e 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/filesystem.py +++ b/providers/standard/src/airflow/providers/standard/sensors/filesystem.py @@ -29,7 +29,12 @@ from airflow.exceptions import AirflowException from airflow.providers.standard.hooks.filesystem import FSHook from airflow.providers.standard.triggers.file import FileTrigger -from airflow.sensors.base import BaseSensorOperator +from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.bases.sensor import BaseSensorOperator +else: + from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] try: from airflow.triggers.base import StartTriggerArgs diff --git a/providers/standard/src/airflow/providers/standard/sensors/python.py b/providers/standard/src/airflow/providers/standard/sensors/python.py index 33d59b903ce83..37c7244ba2b6a 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/python.py +++ b/providers/standard/src/airflow/providers/standard/sensors/python.py @@ -20,10 +20,15 @@ from collections.abc import Mapping, Sequence from typing import TYPE_CHECKING, Any, Callable -from airflow.sensors.base import BaseSensorOperator, PokeReturnValue +from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils.context import context_merge from airflow.utils.operator_helpers import determine_kwargs +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.bases.sensor import BaseSensorOperator, PokeReturnValue +else: + from airflow.sensors.base import BaseSensorOperator, PokeReturnValue # type: ignore[no-redef] + if TYPE_CHECKING: try: from airflow.sdk.definitions.context import Context diff --git a/providers/standard/src/airflow/providers/standard/sensors/time.py b/providers/standard/src/airflow/providers/standard/sensors/time.py index a694ee3c5f924..ad02dc4dc82a9 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/time.py +++ b/providers/standard/src/airflow/providers/standard/sensors/time.py @@ -25,7 +25,12 @@ from airflow.configuration import conf from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.providers.standard.triggers.temporal import DateTimeTrigger -from airflow.sensors.base import BaseSensorOperator +from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.bases.sensor import BaseSensorOperator +else: + from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] try: from airflow.triggers.base import StartTriggerArgs diff --git a/providers/standard/src/airflow/providers/standard/sensors/time_delta.py b/providers/standard/src/airflow/providers/standard/sensors/time_delta.py index 40ad809343bb7..5b38a2b078e85 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/time_delta.py +++ b/providers/standard/src/airflow/providers/standard/sensors/time_delta.py @@ -20,7 +20,7 @@ import warnings from datetime import datetime, timedelta from time import sleep -from typing import TYPE_CHECKING, Any, NoReturn +from typing import TYPE_CHECKING, Any from deprecated.classic import deprecated from packaging.version import Version @@ -29,9 +29,13 @@ from airflow.exceptions import AirflowProviderDeprecationWarning, AirflowSkipException from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS -from airflow.sensors.base import BaseSensorOperator from airflow.utils import timezone +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.bases.sensor import BaseSensorOperator +else: + from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] + if TYPE_CHECKING: try: from airflow.sdk.definitions.context import Context @@ -106,7 +110,7 @@ def poke(self, context: Context) -> bool: Asynchronous execution """ - def execute(self, context: Context) -> bool | NoReturn: + def execute(self, context: Context) -> Any: """ Depending on the deferrable flag, either execute the sensor in a blocking way or defer it. diff --git a/providers/standard/src/airflow/providers/standard/sensors/weekday.py b/providers/standard/src/airflow/providers/standard/sensors/weekday.py index 5b2fce987095e..6f3af7a318241 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/weekday.py +++ b/providers/standard/src/airflow/providers/standard/sensors/weekday.py @@ -21,9 +21,14 @@ from typing import TYPE_CHECKING from airflow.providers.standard.utils.weekday import WeekDay -from airflow.sensors.base import BaseSensorOperator +from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils import timezone +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.bases.sensor import BaseSensorOperator +else: + from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] + if TYPE_CHECKING: try: from airflow.sdk.definitions.context import Context