Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,7 @@

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.providers.standard.hooks.subprocess import SubprocessHook, SubprocessResult, working_directory
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS

if AIRFLOW_V_3_1_PLUS:
from airflow.sdk import BaseOperator
else:
from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef]
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.execution_time.context import context_to_airflow_vars
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,7 @@
from collections.abc import Iterable
from typing import TYPE_CHECKING

from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS

if AIRFLOW_V_3_1_PLUS:
from airflow.sdk import BaseOperator
else:
from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef]
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator

if AIRFLOW_V_3_0_PLUS:
from airflow.providers.standard.utils.skipmixin import SkipMixin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,7 @@

from typing import TYPE_CHECKING

from airflow.providers.standard.version_compat import AIRFLOW_V_3_1_PLUS

if AIRFLOW_V_3_1_PLUS:
from airflow.sdk import BaseOperator
else:
from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef]
from airflow.providers.standard.version_compat import BaseOperator

if TYPE_CHECKING:
from airflow.sdk.definitions.context import Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,13 @@
)
from airflow.models.variable import Variable
from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv, write_python_script
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator
from airflow.utils import hashlib_wrapper
from airflow.utils.context import context_copy_partial, context_merge
from airflow.utils.file import get_unique_dag_module_name
from airflow.utils.operator_helpers import KeywordParameters
from airflow.utils.process_utils import execute_in_subprocess

if AIRFLOW_V_3_1_PLUS:
from airflow.sdk import BaseOperator
else:
from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef]

if AIRFLOW_V_3_0_PLUS:
from airflow.providers.standard.operators.branch import BaseBranchOperator
from airflow.providers.standard.utils.skipmixin import SkipMixin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@

from typing import TYPE_CHECKING

from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseOperator
else:
from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef]
from airflow.providers.standard.version_compat import BaseOperator

if TYPE_CHECKING:
from airflow.sdk.definitions.context import Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@
DagNotFound,
DagRunAlreadyExists,
)
from airflow.models import BaseOperator
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
from airflow.providers.standard.triggers.external_task import DagStateTrigger
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator, BaseOperatorLink
from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import NOTSET, ArgNotSet, DagRunType
Expand All @@ -60,11 +59,9 @@
from airflow.utils.context import Context

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseOperatorLink
from airflow.sdk.execution_time.xcom import XCom
else:
from airflow.models import XCom # type: ignore[no-redef]
from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef]


class DagIsPaused(AirflowException):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,7 @@
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowFailException
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]
from airflow.providers.standard.version_compat import BaseSensorOperator

if TYPE_CHECKING:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,9 @@
from typing import TYPE_CHECKING, Any, NoReturn

from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseSensorOperator
from airflow.utils import timezone

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

try:
from airflow.triggers.base import StartTriggerArgs
except ImportError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.triggers.external_task import WorkflowTrigger
from airflow.providers.standard.utils.sensor_helper import _get_count, _get_external_task_group_task_ids
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.standard.version_compat import (
AIRFLOW_V_3_0_PLUS,
BaseOperator,
BaseOperatorLink,
BaseSensorOperator,
)
from airflow.utils.file import correct_maybe_zipped
from airflow.utils.state import State, TaskInstanceState

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type:ignore[no-redef]
if not AIRFLOW_V_3_0_PLUS:
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
Expand All @@ -55,19 +57,11 @@
from airflow.models.taskinstancekey import TaskInstanceKey

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseOperator
from airflow.sdk.definitions.context import Context
else:
from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef]
from airflow.utils.context import Context # type: ignore[no-redef]


if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseOperatorLink
else:
from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef]


class ExternalDagLink(BaseOperatorLink):
"""
Operator link for ExternalTaskSensor and ExternalTaskMarker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,7 @@
from airflow.exceptions import AirflowException
from airflow.providers.standard.hooks.filesystem import FSHook
from airflow.providers.standard.triggers.file import FileTrigger
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]
from airflow.providers.standard.version_compat import BaseSensorOperator

try:
from airflow.triggers.base import StartTriggerArgs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,10 @@
from collections.abc import Callable, Mapping, Sequence
from typing import TYPE_CHECKING, Any

from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.standard.version_compat import BaseSensorOperator, PokeReturnValue
from airflow.utils.context import context_merge
from airflow.utils.operator_helpers import determine_kwargs

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator, PokeReturnValue
else:
from airflow.sensors.base import BaseSensorOperator, PokeReturnValue # type: ignore[no-redef]

if TYPE_CHECKING:
try:
from airflow.sdk.definitions.context import Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,7 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]
from airflow.providers.standard.version_compat import BaseSensorOperator

try:
from airflow.triggers.base import StartTriggerArgs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,9 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseSensorOperator
from airflow.utils import timezone

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

if TYPE_CHECKING:
try:
from airflow.sdk.definitions.context import Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,9 @@
from typing import TYPE_CHECKING

from airflow.providers.standard.utils.weekday import WeekDay
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.standard.version_compat import BaseSensorOperator
from airflow.utils import timezone

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

if TYPE_CHECKING:
try:
from airflow.sdk.definitions.context import Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,29 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro


AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0)
AIRFLOW_V_3_0_PLUS: bool = get_base_airflow_version_tuple() >= (3, 0, 0)
AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0)

# BaseOperator is not imported from SDK from 3.0 (and only done from 3.1) due to a bug with
# DecoratedOperator -- where `DecoratedOperator._handle_output` needed `xcom_push` to exist on `BaseOperator`
# even though it wasn't used.
if AIRFLOW_V_3_1_PLUS:
from airflow.sdk import BaseOperator
else:
from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef]

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseOperatorLink
from airflow.sdk.bases.sensor import BaseSensorOperator, PokeReturnValue
else:
from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef]
from airflow.sensors.base import BaseSensorOperator, PokeReturnValue # type: ignore[no-redef]

__all__ = [
"AIRFLOW_V_3_0_PLUS",
"AIRFLOW_V_3_1_PLUS",
"BaseOperator",
"BaseOperatorLink",
"BaseSensorOperator",
"PokeReturnValue",
]
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ def test_trigger_dagrun_operator_templated_invalid_conf(self, dag_maker):
)
dag_maker.sync_dagbag_to_db()
parse_and_sync_to_db(self.f_name)
dag_maker.create_dagrun()
dr = dag_maker.create_dagrun()
with pytest.raises(ValueError, match="^conf parameter should be JSON Serializable$"):
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
dag_maker.run_ti(task.task_id, dr)

def test_trigger_dagrun_with_no_failed_state(self, dag_maker):
task = TriggerDagRunOperator(
Expand Down