diff --git a/airflow-core/docs/authoring-and-scheduling/deferring.rst b/airflow-core/docs/authoring-and-scheduling/deferring.rst index b34b33295ea62..c2922a1ca4c9c 100644 --- a/airflow-core/docs/authoring-and-scheduling/deferring.rst +++ b/airflow-core/docs/authoring-and-scheduling/deferring.rst @@ -67,9 +67,8 @@ When writing a deferrable operators these are the main points to consider: from typing import Any from airflow.configuration import conf - from airflow.sdk import BaseSensorOperator + from airflow.sdk import BaseSensorOperator, Context from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger - from airflow.utils.context import Context class WaitOneHourSensor(BaseSensorOperator): @@ -175,14 +174,11 @@ Here's a basic example of how a sensor might trigger deferral: from __future__ import annotations from datetime import timedelta - from typing import TYPE_CHECKING, Any + from typing import Any - from airflow.sdk import BaseSensorOperator + from airflow.sdk import BaseSensorOperator, Context from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger - if TYPE_CHECKING: - from airflow.utils.context import Context - class WaitOneHourSensor(BaseSensorOperator): def execute(self, context: Context) -> None: @@ -288,13 +284,9 @@ In the sensor part, we'll need to provide the path to ``TimeDeltaTrigger`` as `` from __future__ import annotations from datetime import timedelta - from typing import TYPE_CHECKING, Any - - from airflow.sdk import BaseSensorOperator - from airflow.triggers.base import StartTriggerArgs + from typing import Any - if TYPE_CHECKING: - from airflow.utils.context import Context + from airflow.sdk import BaseSensorOperator, Context, StartTriggerArgs class WaitOneHourSensor(BaseSensorOperator): @@ -319,13 +311,9 @@ In the sensor part, we'll need to provide the path to ``TimeDeltaTrigger`` as `` from __future__ import annotations from datetime import timedelta - from typing import TYPE_CHECKING, Any - - from airflow.sdk import BaseSensorOperator - from airflow.triggers.base import StartTriggerArgs + from typing import Any - if TYPE_CHECKING: - from airflow.utils.context import Context + from airflow.sdk import BaseSensorOperator, Context, StartTriggerArgs class WaitHoursSensor(BaseSensorOperator): @@ -358,13 +346,9 @@ After the trigger has finished executing, the task may be sent back to the worke from __future__ import annotations from datetime import timedelta - from typing import TYPE_CHECKING, Any - - from airflow.sdk import BaseSensorOperator - from airflow.triggers.base import StartTriggerArgs + from typing import Any - if TYPE_CHECKING: - from airflow.utils.context import Context + from airflow.sdk import BaseSensorOperator, Context, StartTriggerArgs class WaitHoursSensor(BaseSensorOperator): diff --git a/airflow-core/src/airflow/models/mappedoperator.py b/airflow-core/src/airflow/models/mappedoperator.py index 4b187153ffccc..b7df995291eb7 100644 --- a/airflow-core/src/airflow/models/mappedoperator.py +++ b/airflow-core/src/airflow/models/mappedoperator.py @@ -59,13 +59,11 @@ from airflow.models import TaskInstance from airflow.models.dag import DAG as SchedulerDAG from airflow.models.expandinput import SchedulerExpandInput - from airflow.sdk import BaseOperatorLink - from airflow.sdk.definitions.context import Context + from airflow.sdk import BaseOperatorLink, Context, StartTriggerArgs + from airflow.sdk.definitions.operator_resources import Resources from airflow.sdk.definitions.param import ParamsDict + from airflow.task.trigger_rule import TriggerRule from airflow.ti_deps.deps.base_ti_dep import BaseTIDep - from airflow.triggers.base import StartTriggerArgs - from airflow.utils.operator_resources import Resources - from airflow.utils.trigger_rule import TriggerRule Operator: TypeAlias = "SerializedBaseOperator | MappedOperator" diff --git a/providers/standard/src/airflow/providers/standard/sensors/date_time.py b/providers/standard/src/airflow/providers/standard/sensors/date_time.py index 17f8aa47f4dfe..fc6ff69c88ace 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/date_time.py +++ b/providers/standard/src/airflow/providers/standard/sensors/date_time.py @@ -24,29 +24,32 @@ from airflow.providers.standard.triggers.temporal import DateTimeTrigger from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseSensorOperator -from airflow.utils import timezone try: - from airflow.triggers.base import StartTriggerArgs -except ImportError: - # TODO: Remove this when min airflow version is 2.10.0 for standard provider - @dataclass - class StartTriggerArgs: # type: ignore[no-redef] - """Arguments required for start task execution from triggerer.""" + from airflow.sdk import timezone +except ImportError: # TODO: Remove this when min airflow version is 3.1.0 for standard provider + from airflow.utils import timezone # type: ignore[attr-defined,no-redef] - trigger_cls: str - next_method: str - trigger_kwargs: dict[str, Any] | None = None - next_kwargs: dict[str, Any] | None = None - timeout: datetime.timedelta | None = None +try: + from airflow.sdk import StartTriggerArgs +except ImportError: # TODO: Remove this when min airflow version is 3.1.0 for standard provider + try: + from airflow.triggers.base import StartTriggerArgs # type: ignore[no-redef] + except ImportError: # TODO: Remove this when min airflow version is 2.10.0 for standard provider + + @dataclass + class StartTriggerArgs: # type: ignore[no-redef] + """Arguments required for start task execution from triggerer.""" + + trigger_cls: str + next_method: str + trigger_kwargs: dict[str, Any] | None = None + next_kwargs: dict[str, Any] | None = None + timeout: datetime.timedelta | None = None if TYPE_CHECKING: - try: - from airflow.sdk.definitions.context import Context - except ImportError: - # TODO: Remove once provider drops support for Airflow 2 - from airflow.utils.context import Context + from airflow.sdk import Context class DateTimeSensor(BaseSensorOperator): diff --git a/providers/standard/src/airflow/providers/standard/sensors/filesystem.py b/providers/standard/src/airflow/providers/standard/sensors/filesystem.py index 32681cb0f2716..86e9e3133b029 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/filesystem.py +++ b/providers/standard/src/airflow/providers/standard/sensors/filesystem.py @@ -32,26 +32,25 @@ from airflow.providers.standard.version_compat import BaseSensorOperator try: - from airflow.triggers.base import StartTriggerArgs -except ImportError: - # TODO: Remove this when min airflow version is 2.10.0 for standard provider - @dataclass - class StartTriggerArgs: # type: ignore[no-redef] - """Arguments required for start task execution from triggerer.""" + from airflow.sdk import StartTriggerArgs +except ImportError: # TODO: Remove this when min airflow version is 3.1.0 for standard provider + try: + from airflow.triggers.base import StartTriggerArgs # type: ignore[no-redef] + except ImportError: # TODO: Remove this when min airflow version is 2.10.0 for standard provider + + @dataclass + class StartTriggerArgs: # type: ignore[no-redef] + """Arguments required for start task execution from triggerer.""" - trigger_cls: str - next_method: str - trigger_kwargs: dict[str, Any] | None = None - next_kwargs: dict[str, Any] | None = None - timeout: datetime.timedelta | None = None + trigger_cls: str + next_method: str + trigger_kwargs: dict[str, Any] | None = None + next_kwargs: dict[str, Any] | None = None + timeout: datetime.timedelta | None = None if TYPE_CHECKING: - try: - from airflow.sdk.definitions.context import Context - except ImportError: - # TODO: Remove once provider drops support for Airflow 2 - from airflow.utils.context import Context + from airflow.sdk import Context class FileSensor(BaseSensorOperator): diff --git a/providers/standard/src/airflow/providers/standard/sensors/time.py b/providers/standard/src/airflow/providers/standard/sensors/time.py index 6973dc7b4cc66..988b4b85843f4 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/time.py +++ b/providers/standard/src/airflow/providers/standard/sensors/time.py @@ -28,18 +28,21 @@ from airflow.providers.standard.version_compat import BaseSensorOperator try: - from airflow.triggers.base import StartTriggerArgs -except ImportError: - # TODO: Remove this when min airflow version is 2.10.0 for standard provider - @dataclass - class StartTriggerArgs: # type: ignore[no-redef] - """Arguments required for start task execution from triggerer.""" + from airflow.sdk import StartTriggerArgs +except ImportError: # TODO: Remove this when min airflow version is 3.1.0 for standard provider + try: + from airflow.triggers.base import StartTriggerArgs # type: ignore[no-redef] + except ImportError: # TODO: Remove this when min airflow version is 2.10.0 for standard provider - trigger_cls: str - next_method: str - trigger_kwargs: dict[str, Any] | None = None - next_kwargs: dict[str, Any] | None = None - timeout: datetime.timedelta | None = None + @dataclass + class StartTriggerArgs: # type: ignore[no-redef] + """Arguments required for start task execution from triggerer.""" + + trigger_cls: str + next_method: str + trigger_kwargs: dict[str, Any] | None = None + next_kwargs: dict[str, Any] | None = None + timeout: datetime.timedelta | None = None try: @@ -48,11 +51,7 @@ class StartTriggerArgs: # type: ignore[no-redef] from airflow.utils import timezone # type: ignore[attr-defined,no-redef] if TYPE_CHECKING: - try: - from airflow.sdk.definitions.context import Context - except ImportError: - # TODO: Remove once provider drops support for Airflow 2 - from airflow.utils.context import Context + from airflow.sdk import Context class TimeSensor(BaseSensorOperator):