diff --git a/providers/standard/src/airflow/providers/standard/operators/bash.py b/providers/standard/src/airflow/providers/standard/operators/bash.py index caee52072e10e..e10192a05b228 100644 --- a/providers/standard/src/airflow/providers/standard/operators/bash.py +++ b/providers/standard/src/airflow/providers/standard/operators/bash.py @@ -26,12 +26,7 @@ from airflow.exceptions import AirflowException, AirflowSkipException from airflow.providers.standard.hooks.subprocess import SubprocessHook, SubprocessResult, working_directory -from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS - -if AIRFLOW_V_3_1_PLUS: - from airflow.sdk import BaseOperator -else: - from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef] +from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator if AIRFLOW_V_3_0_PLUS: from airflow.sdk.execution_time.context import context_to_airflow_vars diff --git a/providers/standard/src/airflow/providers/standard/operators/branch.py b/providers/standard/src/airflow/providers/standard/operators/branch.py index 064bb52afe188..98c059ac38eac 100644 --- a/providers/standard/src/airflow/providers/standard/operators/branch.py +++ b/providers/standard/src/airflow/providers/standard/operators/branch.py @@ -22,12 +22,7 @@ from collections.abc import Iterable from typing import TYPE_CHECKING -from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS - -if AIRFLOW_V_3_1_PLUS: - from airflow.sdk import BaseOperator -else: - from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef] +from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator if AIRFLOW_V_3_0_PLUS: from airflow.providers.standard.utils.skipmixin import SkipMixin diff --git a/providers/standard/src/airflow/providers/standard/operators/empty.py b/providers/standard/src/airflow/providers/standard/operators/empty.py index cdfebf3a4139c..b9a480bc86ea9 100644 --- a/providers/standard/src/airflow/providers/standard/operators/empty.py +++ b/providers/standard/src/airflow/providers/standard/operators/empty.py @@ -18,12 +18,7 @@ from typing import TYPE_CHECKING -from airflow.providers.standard.version_compat import AIRFLOW_V_3_1_PLUS - -if AIRFLOW_V_3_1_PLUS: - from airflow.sdk import BaseOperator -else: - from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef] +from airflow.providers.standard.version_compat import BaseOperator if TYPE_CHECKING: from airflow.sdk.definitions.context import Context diff --git a/providers/standard/src/airflow/providers/standard/operators/python.py b/providers/standard/src/airflow/providers/standard/operators/python.py index 92387d28cbffd..f6efcf3bf0d26 100644 --- a/providers/standard/src/airflow/providers/standard/operators/python.py +++ b/providers/standard/src/airflow/providers/standard/operators/python.py @@ -50,18 +50,13 @@ ) from airflow.models.variable import Variable from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv, write_python_script -from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS +from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator from airflow.utils import hashlib_wrapper from airflow.utils.context import context_copy_partial, context_merge from airflow.utils.file import get_unique_dag_module_name from airflow.utils.operator_helpers import KeywordParameters from airflow.utils.process_utils import execute_in_subprocess -if AIRFLOW_V_3_1_PLUS: - from airflow.sdk import BaseOperator -else: - from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef] - if AIRFLOW_V_3_0_PLUS: from airflow.providers.standard.operators.branch import BaseBranchOperator from airflow.providers.standard.utils.skipmixin import SkipMixin diff --git a/providers/standard/src/airflow/providers/standard/operators/smooth.py b/providers/standard/src/airflow/providers/standard/operators/smooth.py index 6799e07a5ef05..9b3d1ff181e26 100644 --- a/providers/standard/src/airflow/providers/standard/operators/smooth.py +++ b/providers/standard/src/airflow/providers/standard/operators/smooth.py @@ -19,12 +19,7 @@ from typing import TYPE_CHECKING -from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.sdk import BaseOperator -else: - from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef] +from airflow.providers.standard.version_compat import BaseOperator if TYPE_CHECKING: from airflow.sdk.definitions.context import Context diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index 4f474c3e6996e..998fd4c8d7e55 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -34,12 +34,11 @@ DagNotFound, DagRunAlreadyExists, ) -from airflow.models import BaseOperator from airflow.models.dag import DagModel from airflow.models.dagbag import DagBag from airflow.models.dagrun import DagRun from airflow.providers.standard.triggers.external_task import DagStateTrigger -from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS +from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator, BaseOperatorLink from airflow.utils import timezone from airflow.utils.state import DagRunState from airflow.utils.types import NOTSET, ArgNotSet, DagRunType @@ -60,11 +59,9 @@ from airflow.utils.context import Context if AIRFLOW_V_3_0_PLUS: - from airflow.sdk import BaseOperatorLink from airflow.sdk.execution_time.xcom import XCom else: from airflow.models import XCom # type: ignore[no-redef] - from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] class DagIsPaused(AirflowException): diff --git a/providers/standard/src/airflow/providers/standard/sensors/bash.py b/providers/standard/src/airflow/providers/standard/sensors/bash.py index 5e8dd0b24650f..9b83b4af187a3 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/bash.py +++ b/providers/standard/src/airflow/providers/standard/sensors/bash.py @@ -24,12 +24,7 @@ from typing import TYPE_CHECKING from airflow.exceptions import AirflowFailException -from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.bases.sensor import BaseSensorOperator -else: - from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] +from airflow.providers.standard.version_compat import BaseSensorOperator if TYPE_CHECKING: try: diff --git a/providers/standard/src/airflow/providers/standard/sensors/date_time.py b/providers/standard/src/airflow/providers/standard/sensors/date_time.py index a2e2478109f59..85758210a05f6 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/date_time.py +++ b/providers/standard/src/airflow/providers/standard/sensors/date_time.py @@ -23,14 +23,9 @@ from typing import TYPE_CHECKING, Any, NoReturn from airflow.providers.standard.triggers.temporal import DateTimeTrigger -from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS +from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseSensorOperator from airflow.utils import timezone -if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.bases.sensor import BaseSensorOperator -else: - from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] - try: from airflow.triggers.base import StartTriggerArgs except ImportError: diff --git a/providers/standard/src/airflow/providers/standard/sensors/external_task.py b/providers/standard/src/airflow/providers/standard/sensors/external_task.py index 974608bf714f4..32ad0111295bd 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py +++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py @@ -39,14 +39,16 @@ from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.triggers.external_task import WorkflowTrigger from airflow.providers.standard.utils.sensor_helper import _get_count, _get_external_task_group_task_ids -from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS +from airflow.providers.standard.version_compat import ( + AIRFLOW_V_3_0_PLUS, + BaseOperator, + BaseOperatorLink, + BaseSensorOperator, +) from airflow.utils.file import correct_maybe_zipped from airflow.utils.state import State, TaskInstanceState -if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.bases.sensor import BaseSensorOperator -else: - from airflow.sensors.base import BaseSensorOperator # type:ignore[no-redef] +if not AIRFLOW_V_3_0_PLUS: from airflow.utils.session import NEW_SESSION, provide_session if TYPE_CHECKING: @@ -55,19 +57,11 @@ from airflow.models.taskinstancekey import TaskInstanceKey if AIRFLOW_V_3_0_PLUS: - from airflow.sdk import BaseOperator from airflow.sdk.definitions.context import Context else: - from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef] from airflow.utils.context import Context # type: ignore[no-redef] -if AIRFLOW_V_3_0_PLUS: - from airflow.sdk import BaseOperatorLink -else: - from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] - - class ExternalDagLink(BaseOperatorLink): """ Operator link for ExternalTaskSensor and ExternalTaskMarker. diff --git a/providers/standard/src/airflow/providers/standard/sensors/filesystem.py b/providers/standard/src/airflow/providers/standard/sensors/filesystem.py index 10315dc88a97e..32681cb0f2716 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/filesystem.py +++ b/providers/standard/src/airflow/providers/standard/sensors/filesystem.py @@ -29,12 +29,7 @@ from airflow.exceptions import AirflowException from airflow.providers.standard.hooks.filesystem import FSHook from airflow.providers.standard.triggers.file import FileTrigger -from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.bases.sensor import BaseSensorOperator -else: - from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] +from airflow.providers.standard.version_compat import BaseSensorOperator try: from airflow.triggers.base import StartTriggerArgs diff --git a/providers/standard/src/airflow/providers/standard/sensors/python.py b/providers/standard/src/airflow/providers/standard/sensors/python.py index 512ebab4449d3..d6fc454018cc2 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/python.py +++ b/providers/standard/src/airflow/providers/standard/sensors/python.py @@ -20,15 +20,10 @@ from collections.abc import Callable, Mapping, Sequence from typing import TYPE_CHECKING, Any -from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS +from airflow.providers.standard.version_compat import BaseSensorOperator, PokeReturnValue from airflow.utils.context import context_merge from airflow.utils.operator_helpers import determine_kwargs -if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.bases.sensor import BaseSensorOperator, PokeReturnValue -else: - from airflow.sensors.base import BaseSensorOperator, PokeReturnValue # type: ignore[no-redef] - if TYPE_CHECKING: try: from airflow.sdk.definitions.context import Context diff --git a/providers/standard/src/airflow/providers/standard/sensors/time.py b/providers/standard/src/airflow/providers/standard/sensors/time.py index ad02dc4dc82a9..6d72c29fb4a81 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/time.py +++ b/providers/standard/src/airflow/providers/standard/sensors/time.py @@ -25,12 +25,7 @@ from airflow.configuration import conf from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.providers.standard.triggers.temporal import DateTimeTrigger -from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.bases.sensor import BaseSensorOperator -else: - from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] +from airflow.providers.standard.version_compat import BaseSensorOperator try: from airflow.triggers.base import StartTriggerArgs diff --git a/providers/standard/src/airflow/providers/standard/sensors/time_delta.py b/providers/standard/src/airflow/providers/standard/sensors/time_delta.py index 5b38a2b078e85..492e9aafa1da6 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/time_delta.py +++ b/providers/standard/src/airflow/providers/standard/sensors/time_delta.py @@ -28,14 +28,9 @@ from airflow.configuration import conf from airflow.exceptions import AirflowProviderDeprecationWarning, AirflowSkipException from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger -from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS +from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseSensorOperator from airflow.utils import timezone -if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.bases.sensor import BaseSensorOperator -else: - from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] - if TYPE_CHECKING: try: from airflow.sdk.definitions.context import Context diff --git a/providers/standard/src/airflow/providers/standard/sensors/weekday.py b/providers/standard/src/airflow/providers/standard/sensors/weekday.py index 6f3af7a318241..ac23e1ac9cb6c 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/weekday.py +++ b/providers/standard/src/airflow/providers/standard/sensors/weekday.py @@ -21,14 +21,9 @@ from typing import TYPE_CHECKING from airflow.providers.standard.utils.weekday import WeekDay -from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS +from airflow.providers.standard.version_compat import BaseSensorOperator from airflow.utils import timezone -if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.bases.sensor import BaseSensorOperator -else: - from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] - if TYPE_CHECKING: try: from airflow.sdk.definitions.context import Context diff --git a/providers/standard/src/airflow/providers/standard/version_compat.py b/providers/standard/src/airflow/providers/standard/version_compat.py index 42bdcc7da03df..e630ace142f2b 100644 --- a/providers/standard/src/airflow/providers/standard/version_compat.py +++ b/providers/standard/src/airflow/providers/standard/version_compat.py @@ -32,5 +32,29 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: return airflow_version.major, airflow_version.minor, airflow_version.micro -AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) -AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0) +AIRFLOW_V_3_0_PLUS: bool = get_base_airflow_version_tuple() >= (3, 0, 0) +AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0) + +# BaseOperator is not imported from SDK from 3.0 (and only done from 3.1) due to a bug with +# DecoratedOperator -- where `DecoratedOperator._handle_output` needed `xcom_push` to exist on `BaseOperator` +# even though it wasn't used. +if AIRFLOW_V_3_1_PLUS: + from airflow.sdk import BaseOperator +else: + from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef] + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperatorLink + from airflow.sdk.bases.sensor import BaseSensorOperator, PokeReturnValue +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + from airflow.sensors.base import BaseSensorOperator, PokeReturnValue # type: ignore[no-redef] + +__all__ = [ + "AIRFLOW_V_3_0_PLUS", + "AIRFLOW_V_3_1_PLUS", + "BaseOperator", + "BaseOperatorLink", + "BaseSensorOperator", + "PokeReturnValue", +] diff --git a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py index 190973b477aaa..ad63b23096fb9 100644 --- a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py +++ b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py @@ -182,9 +182,9 @@ def test_trigger_dagrun_operator_templated_invalid_conf(self, dag_maker): ) dag_maker.sync_dagbag_to_db() parse_and_sync_to_db(self.f_name) - dag_maker.create_dagrun() + dr = dag_maker.create_dagrun() with pytest.raises(ValueError, match="^conf parameter should be JSON Serializable$"): - task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + dag_maker.run_ti(task.task_id, dr) def test_trigger_dagrun_with_no_failed_state(self, dag_maker): task = TriggerDagRunOperator(