Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion airflow-core/src/airflow/sensors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@

from airflow.utils.deprecation_tools import add_deprecated_classes

# TODO: Add definition from Task SDK here and remove `base.py` file
__deprecated_classes = {
"base": {
"BaseSensorOperator": "airflow.sdk.bases.sensor.BaseSensorOperator",
"PokeReturnValue": "airflow.sdk.bases.sensor.PokeReturnValue",
"poke_mode_only": "airflow.sdk.bases.sensor.poke_mode_only",
},
"python":{
"PythonSensor": "airflow.providers.standard.sensors.python.PythonSensor",
},
Expand Down
24 changes: 0 additions & 24 deletions airflow-core/src/airflow/sensors/base.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseSensorOperator
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunException, DbtCloudJobRunStatus
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
from airflow.providers.dbt.cloud.utils.openlineage import generate_openlineage_events_from_dbt_cloud_run
from airflow.sensors.base import BaseSensorOperator
from airflow.providers.dbt.cloud.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

if TYPE_CHECKING:
from airflow.providers.openlineage.extractors import OperatorLineage
Expand Down
6 changes: 5 additions & 1 deletion providers/http/tests/unit/http/sensors/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,11 @@ def test_execute_is_deferred(self, mock_poke):
assert isinstance(exc.value.trigger, HttpSensorTrigger), "Trigger is not a HttpTrigger"

@mock.patch("airflow.providers.http.sensors.http.HttpSensor.defer")
@mock.patch("airflow.sensors.base.BaseSensorOperator.execute")
@mock.patch(
"airflow.sdk.bases.sensor.BaseSensorOperator.execute"
if AIRFLOW_V_3_0_PLUS
else "airflow.sensors.base.BaseSensorOperator.execute"
)
def test_execute_not_defer_when_response_check_is_not_none(self, mock_execute, mock_defer):
task = HttpSensor(
task_id="run_now",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@
AzureDataFactoryPipelineRunStatus,
)
from airflow.providers.microsoft.azure.triggers.data_factory import ADFPipelineRunStatusSensorTrigger
from airflow.sensors.base import BaseSensorOperator
from airflow.providers.microsoft.azure.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowFailException
from airflow.sensors.base import BaseSensorOperator
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

if TYPE_CHECKING:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@

from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.sensors.base import BaseSensorOperator
from airflow.utils import timezone

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

try:
from airflow.triggers.base import StartTriggerArgs
except ImportError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator
from airflow.sensors.base import BaseSensorOperator # type:ignore[no-redef]
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@
from airflow.exceptions import AirflowException
from airflow.providers.standard.hooks.filesystem import FSHook
from airflow.providers.standard.triggers.file import FileTrigger
from airflow.sensors.base import BaseSensorOperator
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

try:
from airflow.triggers.base import StartTriggerArgs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any, Callable

from airflow.sensors.base import BaseSensorOperator, PokeReturnValue
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.utils.context import context_merge
from airflow.utils.operator_helpers import determine_kwargs

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator, PokeReturnValue
else:
from airflow.sensors.base import BaseSensorOperator, PokeReturnValue # type: ignore[no-redef]

if TYPE_CHECKING:
try:
from airflow.sdk.definitions.context import Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sensors.base import BaseSensorOperator
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

try:
from airflow.triggers.base import StartTriggerArgs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import warnings
from datetime import datetime, timedelta
from time import sleep
from typing import TYPE_CHECKING, Any, NoReturn
from typing import TYPE_CHECKING, Any

from deprecated.classic import deprecated
from packaging.version import Version
Expand All @@ -29,9 +29,13 @@
from airflow.exceptions import AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.sensors.base import BaseSensorOperator
from airflow.utils import timezone

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

if TYPE_CHECKING:
try:
from airflow.sdk.definitions.context import Context
Expand Down Expand Up @@ -106,7 +110,7 @@ def poke(self, context: Context) -> bool:
Asynchronous execution
"""

def execute(self, context: Context) -> bool | NoReturn:
def execute(self, context: Context) -> Any:
"""
Depending on the deferrable flag, either execute the sensor in a blocking way or defer it.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@
from typing import TYPE_CHECKING

from airflow.providers.standard.utils.weekday import WeekDay
from airflow.sensors.base import BaseSensorOperator
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.utils import timezone

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

if TYPE_CHECKING:
try:
from airflow.sdk.definitions.context import Context
Expand Down