Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 9 additions & 25 deletions airflow-core/docs/authoring-and-scheduling/deferring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ When writing a deferrable operators these are the main points to consider:
from typing import Any

from airflow.configuration import conf
from airflow.sdk import BaseSensorOperator
from airflow.sdk import BaseSensorOperator, Context
from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context


class WaitOneHourSensor(BaseSensorOperator):
Expand Down Expand Up @@ -175,14 +174,11 @@ Here's a basic example of how a sensor might trigger deferral:
from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING, Any
from typing import Any

from airflow.sdk import BaseSensorOperator
from airflow.sdk import BaseSensorOperator, Context
from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger

if TYPE_CHECKING:
from airflow.utils.context import Context


class WaitOneHourSensor(BaseSensorOperator):
def execute(self, context: Context) -> None:
Expand Down Expand Up @@ -288,13 +284,9 @@ In the sensor part, we'll need to provide the path to ``TimeDeltaTrigger`` as ``
from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING, Any

from airflow.sdk import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs
from typing import Any

if TYPE_CHECKING:
from airflow.utils.context import Context
from airflow.sdk import BaseSensorOperator, Context, StartTriggerArgs


class WaitOneHourSensor(BaseSensorOperator):
Expand All @@ -319,13 +311,9 @@ In the sensor part, we'll need to provide the path to ``TimeDeltaTrigger`` as ``
from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING, Any

from airflow.sdk import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs
from typing import Any

if TYPE_CHECKING:
from airflow.utils.context import Context
from airflow.sdk import BaseSensorOperator, Context, StartTriggerArgs


class WaitHoursSensor(BaseSensorOperator):
Expand Down Expand Up @@ -358,13 +346,9 @@ After the trigger has finished executing, the task may be sent back to the worke
from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING, Any

from airflow.sdk import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs
from typing import Any

if TYPE_CHECKING:
from airflow.utils.context import Context
from airflow.sdk import BaseSensorOperator, Context, StartTriggerArgs


class WaitHoursSensor(BaseSensorOperator):
Expand Down
8 changes: 3 additions & 5 deletions airflow-core/src/airflow/models/mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,11 @@
from airflow.models import TaskInstance
from airflow.models.dag import DAG as SchedulerDAG
from airflow.models.expandinput import SchedulerExpandInput
from airflow.sdk import BaseOperatorLink
from airflow.sdk.definitions.context import Context
from airflow.sdk import BaseOperatorLink, Context, StartTriggerArgs
from airflow.sdk.definitions.operator_resources import Resources
from airflow.sdk.definitions.param import ParamsDict
from airflow.task.trigger_rule import TriggerRule
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.triggers.base import StartTriggerArgs
from airflow.utils.operator_resources import Resources
from airflow.utils.trigger_rule import TriggerRule

Operator: TypeAlias = "SerializedBaseOperator | MappedOperator"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,32 @@

from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseSensorOperator
from airflow.utils import timezone

try:
from airflow.triggers.base import StartTriggerArgs
except ImportError:
# TODO: Remove this when min airflow version is 2.10.0 for standard provider
@dataclass
class StartTriggerArgs: # type: ignore[no-redef]
"""Arguments required for start task execution from triggerer."""
from airflow.sdk import timezone
except ImportError: # TODO: Remove this when min airflow version is 3.1.0 for standard provider
from airflow.utils import timezone # type: ignore[attr-defined,no-redef]

trigger_cls: str
next_method: str
trigger_kwargs: dict[str, Any] | None = None
next_kwargs: dict[str, Any] | None = None
timeout: datetime.timedelta | None = None
try:
from airflow.sdk import StartTriggerArgs
except ImportError: # TODO: Remove this when min airflow version is 3.1.0 for standard provider
try:
from airflow.triggers.base import StartTriggerArgs # type: ignore[no-redef]
except ImportError: # TODO: Remove this when min airflow version is 2.10.0 for standard provider

@dataclass
class StartTriggerArgs: # type: ignore[no-redef]
"""Arguments required for start task execution from triggerer."""

trigger_cls: str
next_method: str
trigger_kwargs: dict[str, Any] | None = None
next_kwargs: dict[str, Any] | None = None
timeout: datetime.timedelta | None = None


if TYPE_CHECKING:
try:
from airflow.sdk.definitions.context import Context
except ImportError:
# TODO: Remove once provider drops support for Airflow 2
from airflow.utils.context import Context
from airflow.sdk import Context


class DateTimeSensor(BaseSensorOperator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,26 +32,25 @@
from airflow.providers.standard.version_compat import BaseSensorOperator

try:
from airflow.triggers.base import StartTriggerArgs
except ImportError:
# TODO: Remove this when min airflow version is 2.10.0 for standard provider
@dataclass
class StartTriggerArgs: # type: ignore[no-redef]
"""Arguments required for start task execution from triggerer."""
from airflow.sdk import StartTriggerArgs
except ImportError: # TODO: Remove this when min airflow version is 3.1.0 for standard provider
try:
from airflow.triggers.base import StartTriggerArgs # type: ignore[no-redef]
except ImportError: # TODO: Remove this when min airflow version is 2.10.0 for standard provider

@dataclass
class StartTriggerArgs: # type: ignore[no-redef]
"""Arguments required for start task execution from triggerer."""

trigger_cls: str
next_method: str
trigger_kwargs: dict[str, Any] | None = None
next_kwargs: dict[str, Any] | None = None
timeout: datetime.timedelta | None = None
trigger_cls: str
next_method: str
trigger_kwargs: dict[str, Any] | None = None
next_kwargs: dict[str, Any] | None = None
timeout: datetime.timedelta | None = None


if TYPE_CHECKING:
try:
from airflow.sdk.definitions.context import Context
except ImportError:
# TODO: Remove once provider drops support for Airflow 2
from airflow.utils.context import Context
from airflow.sdk import Context


class FileSensor(BaseSensorOperator):
Expand Down
31 changes: 15 additions & 16 deletions providers/standard/src/airflow/providers/standard/sensors/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,21 @@
from airflow.providers.standard.version_compat import BaseSensorOperator

try:
from airflow.triggers.base import StartTriggerArgs
except ImportError:
# TODO: Remove this when min airflow version is 2.10.0 for standard provider
@dataclass
class StartTriggerArgs: # type: ignore[no-redef]
"""Arguments required for start task execution from triggerer."""
from airflow.sdk import StartTriggerArgs
except ImportError: # TODO: Remove this when min airflow version is 3.1.0 for standard provider
try:
from airflow.triggers.base import StartTriggerArgs # type: ignore[no-redef]
except ImportError: # TODO: Remove this when min airflow version is 2.10.0 for standard provider

trigger_cls: str
next_method: str
trigger_kwargs: dict[str, Any] | None = None
next_kwargs: dict[str, Any] | None = None
timeout: datetime.timedelta | None = None
@dataclass
class StartTriggerArgs: # type: ignore[no-redef]
"""Arguments required for start task execution from triggerer."""

trigger_cls: str
next_method: str
trigger_kwargs: dict[str, Any] | None = None
next_kwargs: dict[str, Any] | None = None
timeout: datetime.timedelta | None = None


try:
Expand All @@ -48,11 +51,7 @@ class StartTriggerArgs: # type: ignore[no-redef]
from airflow.utils import timezone # type: ignore[attr-defined,no-redef]

if TYPE_CHECKING:
try:
from airflow.sdk.definitions.context import Context
except ImportError:
# TODO: Remove once provider drops support for Airflow 2
from airflow.utils.context import Context
from airflow.sdk import Context


class TimeSensor(BaseSensorOperator):
Expand Down
Loading