Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/decorators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
ListOfDictsExpandInput,
is_mappable,
)
from airflow.sdk.bases.baseoperator import BaseOperator as TaskSDKBaseOperator
from airflow.sdk.definitions._internal.contextmanager import DagContext, TaskGroupContext
from airflow.sdk.definitions.asset import Asset
from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator
from airflow.sdk.definitions.mappedoperator import MappedOperator, ensure_xcomarg_return_value
from airflow.sdk.definitions.xcom_arg import XComArg
from airflow.typing_compat import ParamSpec
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/decorators/condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
if TYPE_CHECKING:
from typing_extensions import TypeAlias

from airflow.sdk.definitions.baseoperator import TaskPreExecuteHook
from airflow.sdk.bases.baseoperator import TaskPreExecuteHook
from airflow.sdk.definitions.context import Context

BoolConditionFunc: TypeAlias = Callable[[Context], bool]
Expand Down
11 changes: 7 additions & 4 deletions airflow-core/src/airflow/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"Base",
"BaseOperator",
"BaseOperatorLink",
"BaseXCom",
"Connection",
"DagBag",
"DagWarning",
Expand All @@ -44,7 +45,7 @@
"TaskReschedule",
"Trigger",
"Variable",
"XComModel",
"XCom",
"clear_task_instances",
]

Expand All @@ -65,6 +66,7 @@ def import_all_models():
import airflow.models.serialized_dag
import airflow.models.taskinstancehistory
import airflow.models.tasklog
import airflow.models.xcom


def __getattr__(name):
Expand All @@ -88,7 +90,8 @@ def __getattr__(name):
"ID_LEN": "airflow.models.base",
"Base": "airflow.models.base",
"BaseOperator": "airflow.models.baseoperator",
"BaseOperatorLink": "airflow.sdk.definitions.baseoperatorlink",
"BaseOperatorLink": "airflow.sdk.bases.operatorlink",
"BaseXCom": "airflow.sdk.bases.xcom",
"Connection": "airflow.models.connection",
"DagBag": "airflow.models.dagbag",
"DagModel": "airflow.models.dag",
Expand All @@ -114,7 +117,6 @@ def __getattr__(name):
if TYPE_CHECKING:
# I was unable to get mypy to respect a airflow/models/__init__.pyi, so
# having to resort back to this hacky method
from airflow.jobs.job import Job
from airflow.models.base import ID_LEN, Base
from airflow.models.baseoperator import BaseOperator
from airflow.models.connection import Connection
Expand All @@ -135,6 +137,7 @@ def __getattr__(name):
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.trigger import Trigger
from airflow.models.variable import Variable
from airflow.sdk import BaseOperatorLink
from airflow.sdk.bases.operatorlink import BaseOperatorLink
from airflow.sdk.bases.xcom import BaseXCom
from airflow.sdk.definitions.param import Param
from airflow.sdk.execution_time.xcom import XCom
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.sdk.definitions.baseoperator import BaseOperator
from airflow.sdk.bases.baseoperator import BaseOperator
from airflow.task.priority_strategy import PriorityWeightStrategy
from airflow.triggers.base import StartTriggerArgs

Expand Down
6 changes: 3 additions & 3 deletions airflow-core/src/airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@
NotMapped,
)
from airflow.models.taskinstance import TaskInstance, clear_task_instances
from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator as TaskSDKAbstractOperator
from airflow.sdk.definitions.baseoperator import (
from airflow.sdk.bases.baseoperator import (
BaseOperator as TaskSDKBaseOperator,
# Re-export for compat
chain as chain,
chain_linear as chain_linear,
cross_downstream as cross_downstream,
get_merged_defaults as get_merged_defaults,
)
from airflow.sdk.definitions.dag import BaseOperator as TaskSDKBaseOperator
from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator as TaskSDKAbstractOperator
from airflow.sdk.definitions.mappedoperator import MappedOperator
from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
from airflow.serialization.enums import DagAttributeTypes
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/baseoperatorlink.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@

from __future__ import annotations

from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink as BaseOperatorLink
from airflow.sdk.bases.operatorlink import BaseOperatorLink as BaseOperatorLink
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ def _execute_task(task_instance: TaskInstance, context: Context, task_orig: Oper

:meta private:
"""
from airflow.sdk.definitions.baseoperator import ExecutorSafeguard
from airflow.sdk.bases.baseoperator import ExecutorSafeguard
from airflow.sdk.definitions.mappedoperator import MappedOperator

task_to_execute = task_instance.task
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/taskmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def expand_mapped_task(cls, task, run_id: str, *, session: Session) -> tuple[Seq
from airflow.models.baseoperator import BaseOperator as DBBaseOperator
from airflow.models.expandinput import NotFullyPopulated
from airflow.models.taskinstance import TaskInstance
from airflow.sdk.definitions.baseoperator import BaseOperator
from airflow.sdk.bases.baseoperator import BaseOperator
from airflow.sdk.definitions.mappedoperator import MappedOperator
from airflow.settings import task_instance_mutation_hook

Expand Down
14 changes: 9 additions & 5 deletions airflow-core/src/airflow/models/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,12 +398,16 @@ def _process_row(row: Row) -> Any:


def __getattr__(name: str):
if name == "BaseXCom" or name == "XCom":
from airflow.sdk.execution_time import xcom
if name == "BaseXCom":
from airflow.sdk.bases.xcom import BaseXCom

val = getattr(xcom, name)
globals()[name] = BaseXCom
return BaseXCom

globals()[name] = val
return val
if name == "XCom":
from airflow.sdk.execution_time.xcom import XCom

globals()[name] = XCom
return XCom

raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/sensors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
from __future__ import annotations

from airflow.sdk.definitions.sensors.base import (
from airflow.sdk.bases.sensor import (
BaseSensorOperator as BaseSensorOperator,
PokeReturnValue as PokeReturnValue,
poke_mode_only as poke_mode_only,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from airflow.models.xcom import XComModel
from airflow.models.xcom_arg import SchedulerXComArg, deserialize_xcom_arg
from airflow.providers_manager import ProvidersManager
from airflow.sdk.bases.baseoperator import BaseOperator as TaskSDKBaseOperator
from airflow.sdk.definitions.asset import (
Asset,
AssetAlias,
Expand All @@ -63,7 +64,6 @@
AssetWatcher,
BaseAsset,
)
from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator
from airflow.sdk.definitions.mappedoperator import MappedOperator
from airflow.sdk.definitions.param import Param, ParamsDict
from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/utils/task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@

def task_group_to_dict(task_item_or_group, parent_group_is_mapped=False):
"""Create a nested dict representation of this TaskGroup and its children used to construct the Graph."""
from airflow.sdk.bases.baseoperator import BaseOperator
from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator
from airflow.sdk.definitions.baseoperator import BaseOperator
from airflow.sdk.definitions.mappedoperator import MappedOperator

if isinstance(task := task_item_or_group, AbstractOperator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
from airflow.models.taskinstance import TaskInstance
from airflow.models.xcom import XComModel
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.execution_time.xcom import BaseXCom, resolve_xcom_backend
from airflow.sdk.bases.xcom import BaseXCom
from airflow.sdk.execution_time.xcom import resolve_xcom_backend
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.types import DagRunType
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/tests/unit/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.sensors.python import PythonSensor
from airflow.sdk.api.datamodels._generated import AssetEventResponse, AssetResponse
from airflow.sdk.bases.notifier import BaseNotifier
from airflow.sdk.definitions.asset import Asset, AssetAlias
from airflow.sdk.definitions.notifier import BaseNotifier
from airflow.sdk.definitions.param import process_params
from airflow.sdk.execution_time.comms import (
AssetEventsResult,
Expand Down
3 changes: 2 additions & 1 deletion airflow-core/tests/unit/models/test_xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
from airflow.models.taskinstance import TaskInstance
from airflow.models.xcom import XComModel
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.execution_time.xcom import BaseXCom, resolve_xcom_backend
from airflow.sdk.bases.xcom import BaseXCom
from airflow.sdk.execution_time.xcom import resolve_xcom_backend
from airflow.settings import json
from airflow.utils import timezone
from airflow.utils.session import create_session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2015,7 +2015,7 @@ def test_edge_info_serialization(self):
@pytest.mark.db_test
@pytest.mark.parametrize("mode", ["poke", "reschedule"])
def test_serialize_sensor(self, mode):
from airflow.sdk.definitions.sensors.base import BaseSensorOperator
from airflow.sdk.bases.sensor import BaseSensorOperator

class DummySensor(BaseSensorOperator):
def poke(self, context: Context):
Expand All @@ -2032,7 +2032,7 @@ def poke(self, context: Context):

@pytest.mark.parametrize("mode", ["poke", "reschedule"])
def test_serialize_mapped_sensor_has_reschedule_dep(self, mode):
from airflow.sdk.definitions.sensors.base import BaseSensorOperator
from airflow.sdk.bases.sensor import BaseSensorOperator
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep

class DummySensor(BaseSensorOperator):
Expand Down
2 changes: 1 addition & 1 deletion dev/mypy/plugin/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
OUTPUT_PROPERTIES = {
"airflow.models.baseoperator.BaseOperator.output",
"airflow.models.mappedoperator.MappedOperator.output",
"airflow.sdk.definitions.baseoperator.BaseOperator.output",
"airflow.sdk.bases.baseoperator.BaseOperator.output",
}

TASK_CALL_FUNCTIONS = {
Expand Down
2 changes: 1 addition & 1 deletion devel-common/src/tests_common/pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from airflow.models.taskinstance import TaskInstance
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.api.datamodels._generated import IntermediateTIState, TerminalTIState
from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator
from airflow.sdk.bases.baseoperator import BaseOperator as TaskSDKBaseOperator
from airflow.sdk.execution_time.comms import StartupDetails, ToSupervisor
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
from airflow.timetables.base import DataInterval
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS

if TYPE_CHECKING:
from airflow.sdk.definitions.notifier import BaseNotifier
from airflow.sdk.bases.notifier import BaseNotifier
elif AIRFLOW_V_3_0_PLUS:
from airflow.sdk.definitions.notifier import BaseNotifier
from airflow.sdk.bases.notifier import BaseNotifier
else:
from airflow.notifications.basenotifier import BaseNotifier

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from airflow.sdk.execution_time.comms import XComResult

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.execution_time.xcom import BaseXCom
from airflow.sdk.bases.xcom import BaseXCom
else:
from airflow.models.xcom import BaseXCom # type: ignore[no-redef]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from google.cloud.metastore_v1.types.metastore import DatabaseDumpSpec, Restore

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator, BaseOperatorLink
from airflow.providers.google.cloud.hooks.dataproc_metastore import DataprocMetastoreHook
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
from airflow.providers.google.common.links.storage import StorageLink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
T = TypeVar("T", bound="DAG | Operator")

if TYPE_CHECKING:
from airflow.sdk.definitions.baseoperator import BaseOperator as SdkBaseOperator
from airflow.sdk.bases.baseoperator import BaseOperator as SdkBaseOperator


log = logging.getLogger(__name__)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def hook_lineage_collector():

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.api.datamodels._generated import BundleInfo, TaskInstance as SDKTaskInstance
from airflow.sdk.definitions.baseoperator import BaseOperator
from airflow.sdk.bases.baseoperator import BaseOperator
from airflow.sdk.execution_time import task_runner
from airflow.sdk.execution_time.comms import StartupDetails
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance, parse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from common_precommit_utils import AIRFLOW_CORE_SOURCES_PATH, AIRFLOW_TASK_SDK_SOURCES_PATH, console

BASEOPERATOR_PY = AIRFLOW_CORE_SOURCES_PATH / "airflow" / "models" / "baseoperator.py"
SDK_BASEOPERATOR_PY = AIRFLOW_TASK_SDK_SOURCES_PATH / "airflow" / "sdk" / "definitions" / "baseoperator.py"
SDK_BASEOPERATOR_PY = AIRFLOW_TASK_SDK_SOURCES_PATH / "airflow" / "sdk" / "bases" / "baseoperator.py"
SDK_MAPPEDOPERATOR_PY = (
AIRFLOW_TASK_SDK_SOURCES_PATH / "airflow" / "sdk" / "definitions" / "mappedoperator.py"
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand Down
20 changes: 10 additions & 10 deletions task-sdk/src/airflow/sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,18 @@
__version__ = "1.0.0.alpha1"

if TYPE_CHECKING:
from airflow.sdk.bases.baseoperator import BaseOperator, chain, chain_linear, cross_downstream
from airflow.sdk.bases.notifier import BaseNotifier
from airflow.sdk.bases.operatorlink import BaseOperatorLink
from airflow.sdk.bases.sensor import BaseSensorOperator
from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAll, AssetAny, AssetWatcher
from airflow.sdk.definitions.asset.decorators import asset
from airflow.sdk.definitions.asset.metadata import Metadata
from airflow.sdk.definitions.baseoperator import BaseOperator, chain, chain_linear, cross_downstream
from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink
from airflow.sdk.definitions.connection import Connection
from airflow.sdk.definitions.context import Context, get_current_context, get_parsing_context
from airflow.sdk.definitions.dag import DAG, dag
from airflow.sdk.definitions.edges import EdgeModifier, Label
from airflow.sdk.definitions.notifier import BaseNotifier
from airflow.sdk.definitions.param import Param
from airflow.sdk.definitions.sensors.base import BaseSensorOperator
from airflow.sdk.definitions.taskgroup import TaskGroup
from airflow.sdk.definitions.template import literal
from airflow.sdk.definitions.variable import Variable
Expand All @@ -76,9 +76,9 @@
"AssetAny": ".definitions.asset",
"AssetWatcher": ".definitions.asset",
"BaseNotifier": ".definitions.notifier",
"BaseOperator": ".definitions.baseoperator",
"BaseOperatorLink": ".definitions.baseoperatorlink",
"BaseSensorOperator": ".definitions.sensors.base",
"BaseOperator": ".bases.baseoperator",
"BaseOperatorLink": ".bases.operatorlink",
"BaseSensorOperator": ".bases.sensor",
"Connection": ".definitions.connection",
"Context": ".definitions.context",
"DAG": ".definitions.dag",
Expand All @@ -90,9 +90,9 @@
"Variable": ".definitions.variable",
"XComArg": ".definitions.xcom_arg",
"asset": ".definitions.asset.decorators",
"chain": ".definitions.baseoperator",
"chain_linear": ".definitions.baseoperator",
"cross_downstream": ".definitions.baseoperator",
"chain": ".bases.baseoperator",
"chain_linear": ".bases.baseoperator",
"cross_downstream": ".bases.baseoperator",
"dag": ".definitions.dag",
"get_current_context": ".definitions.context",
"get_parsing_context": ".definitions.context",
Expand Down
16 changes: 16 additions & 0 deletions task-sdk/src/airflow/sdk/bases/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
TaskDeferralTimeout,
)
from airflow.executors.executor_loader import ExecutorLoader
from airflow.sdk.definitions.baseoperator import BaseOperator
from airflow.sdk.bases.baseoperator import BaseOperator
from airflow.utils import timezone

if TYPE_CHECKING:
Expand Down
Loading