diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml index 9f42441f1c3be..f484a482e74e6 100644 --- a/.github/boring-cyborg.yml +++ b/.github/boring-cyborg.yml @@ -442,8 +442,8 @@ labelPRBasedOnFilePath: - airflow-core/docs/howto/usage-cli.rst area:Lineage: - - airflow-core/src/airflow/lineage/**/* - - airflow-core/tests/unit/lineage/**/* + - task-sdk/src/airflow/sdk/lineage.py + - task-sdk/tests/task_sdk/test_lineage.py - airflow-core/docs/administration-and-deployment/lineage.rst area:Logging: diff --git a/airflow-core/src/airflow/lineage/__init__.py b/airflow-core/src/airflow/lineage/__init__.py index 217e5db960782..15afa1a75de2a 100644 --- a/airflow-core/src/airflow/lineage/__init__.py +++ b/airflow-core/src/airflow/lineage/__init__.py @@ -15,3 +15,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from __future__ import annotations + +from airflow.utils.deprecation_tools import add_deprecated_classes + +add_deprecated_classes( + { + __name__: {"hook": "airflow.sdk.lineage"}, + "hook": {"*": "airflow.sdk.lineage"}, + }, + package=__name__, +) diff --git a/airflow-core/src/airflow/plugins_manager.py b/airflow-core/src/airflow/plugins_manager.py index af05f1aacd20f..f26878fa4eeea 100644 --- a/airflow-core/src/airflow/plugins_manager.py +++ b/airflow-core/src/airflow/plugins_manager.py @@ -38,7 +38,6 @@ from airflow.configuration import conf if TYPE_CHECKING: - from airflow.lineage.hook import HookLineageReader from airflow.listeners.listener import ListenerManager from airflow.partition_mapper.base import PartitionMapper from airflow.task.priority_strategy import PriorityWeightStrategy @@ -283,17 +282,6 @@ def get_partition_mapper_plugins() -> dict[str, type[PartitionMapper]]: } -@cache -def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]: - """Collect and get hook lineage reader classes registered by plugins.""" - log.debug("Initialize hook lineage readers plugins") - result: list[type[HookLineageReader]] = [] - - for plugin in _get_plugins()[0]: - result.extend(plugin.hook_lineage_readers) - return result - - @cache def integrate_macros_plugins() -> None: """Integrates macro plugins.""" diff --git a/airflow-core/tests/unit/lineage/__init__.py b/airflow-core/tests/unit/lineage/__init__.py deleted file mode 100644 index 217e5db960782..0000000000000 --- a/airflow-core/tests/unit/lineage/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/dev/breeze/tests/test_pytest_args_for_test_types.py b/dev/breeze/tests/test_pytest_args_for_test_types.py index 982dc4e37b168..7e70fe6162ad5 100644 --- a/dev/breeze/tests/test_pytest_args_for_test_types.py +++ b/dev/breeze/tests/test_pytest_args_for_test_types.py @@ -163,7 +163,6 @@ def _find_all_integration_folders() -> list[str]: "airflow-core/tests/unit/decorators", "airflow-core/tests/unit/hooks", "airflow-core/tests/unit/io", - "airflow-core/tests/unit/lineage", "airflow-core/tests/unit/listeners", "airflow-core/tests/unit/logging", "airflow-core/tests/unit/macros", diff --git a/devel-common/src/tests_common/pytest_plugin.py b/devel-common/src/tests_common/pytest_plugin.py index 258d3345706cc..cdb5ffe183a00 100644 --- a/devel-common/src/tests_common/pytest_plugin.py +++ b/devel-common/src/tests_common/pytest_plugin.py @@ -1980,7 +1980,7 @@ def _mock_plugins(request: pytest.FixtureRequest): @pytest.fixture def hook_lineage_collector(): - from airflow.lineage.hook import HookLineageCollector + from airflow.providers.common.compat.sdk import HookLineageCollector hlc = HookLineageCollector() with mock.patch( diff --git a/devel-common/src/tests_common/test_utils/mock_plugins.py b/devel-common/src/tests_common/test_utils/mock_plugins.py index dfc654d15ee94..d4bc391418f55 100644 --- a/devel-common/src/tests_common/test_utils/mock_plugins.py +++ b/devel-common/src/tests_common/test_utils/mock_plugins.py @@ -85,6 +85,7 @@ def mock_plugin_manager(plugins=None, **kwargs): if AIRFLOW_V_3_2_PLUS: # Always start the block with an non-initialized plugins, so ensure_plugins_loaded runs. from airflow import plugins_manager + from airflow.sdk import plugins_manager as sdk_plugins_manager plugins_manager._get_plugins.cache_clear() plugins_manager._get_ui_plugins.cache_clear() @@ -92,10 +93,12 @@ def mock_plugin_manager(plugins=None, **kwargs): plugins_manager.get_fastapi_plugins.cache_clear() plugins_manager._get_extra_operators_links_plugins.cache_clear() plugins_manager.get_timetables_plugins.cache_clear() - plugins_manager.get_hook_lineage_readers_plugins.cache_clear() plugins_manager.integrate_macros_plugins.cache_clear() plugins_manager.get_priority_weight_strategy_plugins.cache_clear() + sdk_plugins_manager.integrate_macros_plugins.cache_clear() + sdk_plugins_manager.get_hook_lineage_readers_plugins.cache_clear() + if plugins is not None or "import_errors" in kwargs: exit_stack.enter_context( mock.patch( @@ -106,6 +109,15 @@ def mock_plugin_manager(plugins=None, **kwargs): ), ) ) + exit_stack.enter_context( + mock.patch( + "airflow.sdk.plugins_manager._get_plugins", + return_value=( + plugins or [], + kwargs.get("import_errors", {}), + ), + ) + ) elif kwargs: raise NotImplementedError( "mock_plugin_manager does not support patching other attributes in Airflow 3.2+" diff --git a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py index 2eb07f446f34c..37b352b2cc59c 100644 --- a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py +++ b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py @@ -21,7 +21,7 @@ if TYPE_CHECKING: from typing import Any - from airflow.lineage.hook import LineageContext + from airflow.sdk.lineage import LineageContext def _lacks_asset_methods(collector): @@ -62,7 +62,7 @@ def _add_extra_polyfill(collector): import attr - from airflow.lineage.hook import HookLineage as _BaseHookLineage + from airflow.providers.common.compat.sdk import HookLineage as _BaseHookLineage # Add `extra` to HookLineage returned by `collected_assets` property @attr.define @@ -229,7 +229,7 @@ def get_hook_lineage_collector(): Airflow 3.0–3.1: Collector has asset-based methods but lacks `add_extra` - apply single layer. Airflow 3.2+: Collector has asset-based methods and `add_extra` support - no action required. """ - from airflow.lineage.hook import get_hook_lineage_collector as get_global_collector + from airflow.providers.common.compat.sdk import get_hook_lineage_collector as get_global_collector global_collector = get_global_collector() diff --git a/providers/common/compat/src/airflow/providers/common/compat/sdk.py b/providers/common/compat/src/airflow/providers/common/compat/sdk.py index 398b1219e5104..9cbb979236273 100644 --- a/providers/common/compat/src/airflow/providers/common/compat/sdk.py +++ b/providers/common/compat/src/airflow/providers/common/compat/sdk.py @@ -95,6 +95,13 @@ TaskDeferred as TaskDeferred, XComNotFound as XComNotFound, ) + from airflow.sdk.lineage import ( + HookLineage as HookLineage, + HookLineageCollector as HookLineageCollector, + HookLineageReader as HookLineageReader, + NoOpCollector as NoOpCollector, + get_hook_lineage_collector as get_hook_lineage_collector, + ) from airflow.sdk.listener import get_listener_manager as get_listener_manager from airflow.sdk.log import redact as redact from airflow.sdk.plugins_manager import AirflowPlugin as AirflowPlugin @@ -125,6 +132,10 @@ "AssetAny": ("airflow.sdk", "airflow.datasets", "DatasetAny"), } +# Airflow 3-only renames (not available in Airflow 2) +_AIRFLOW_3_ONLY_RENAMES: dict[str, tuple[str, str, str]] = {} + + # Import map for classes/functions/constants # Format: class_name -> module_path(s) # - str: single module path (no fallback) @@ -233,6 +244,15 @@ # ============================================================================ "XCOM_RETURN_KEY": "airflow.models.xcom", # ============================================================================ + # Lineage + # ============================================================================ + "HookLineageCollector": ("airflow.sdk.lineage", "airflow.lineage.hook"), + "HookLineageReader": ("airflow.sdk.lineage", "airflow.lineage.hook"), + "get_hook_lineage_collector": ("airflow.sdk.lineage", "airflow.lineage.hook"), + "HookLineage": ("airflow.sdk.lineage", "airflow.lineage.hook"), + # Note: AssetLineageInfo is handled by _RENAME_MAP (DatasetLineageInfo -> AssetLineageInfo) + "NoOpCollector": ("airflow.sdk.lineage", "airflow.lineage.hook"), + # ============================================================================ # Exceptions (deprecated in airflow.exceptions, prefer SDK) # ============================================================================ # Note: AirflowException and AirflowNotFoundException are not deprecated, but exposing them @@ -277,9 +297,14 @@ "DagRunTriggerException": ("airflow.sdk.exceptions", "airflow.exceptions"), } -# Add Airflow 3-only exceptions to _IMPORT_MAP if running Airflow 3+ +# Add Airflow 3-only exceptions and renames to _IMPORT_MAP if running Airflow 3+ if AIRFLOW_V_3_0_PLUS: _IMPORT_MAP.update(_AIRFLOW_3_ONLY_EXCEPTIONS) + _RENAME_MAP.update(_AIRFLOW_3_ONLY_RENAMES) + # AssetLineageInfo exists in 3.0+ but location changed in 3.2 + # 3.0-3.1: airflow.lineage.hook.AssetLineageInfo + # 3.2+: airflow.sdk.lineage.AssetLineageInfo + _IMPORT_MAP["AssetLineageInfo"] = ("airflow.sdk.lineage", "airflow.lineage.hook") # Module map: module_name -> module_path(s) # For entire modules that have been moved (e.g., timezone) diff --git a/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py b/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py index 9045512531e54..c4d948e569b99 100644 --- a/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py +++ b/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py @@ -27,7 +27,7 @@ @pytest.fixture def collector(): - from airflow.lineage.hook import HookLineageCollector + from airflow.providers.common.compat.sdk import HookLineageCollector # Patch the "inner" function that the compat version will call with mock.patch( @@ -41,7 +41,7 @@ def collector(): @pytest.fixture def noop_collector(): - from airflow.lineage.hook import NoOpCollector + from airflow.providers.common.compat.sdk import NoOpCollector # Patch the "inner" function that the compat version will call with mock.patch( diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py index 374d8b2f06b3d..375b0bbf2f1b2 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py @@ -21,7 +21,7 @@ # Conditional imports - only load expensive dependencies when plugin is enabled if not conf.is_disabled(): - from airflow.lineage.hook import HookLineageReader + from airflow.providers.common.compat.sdk import HookLineageReader from airflow.providers.openlineage.plugins.listener import get_openlineage_listener from airflow.providers.openlineage.plugins.macros import ( lineage_job_name, diff --git a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py index 3989144009e8c..9e2b1782b81e0 100644 --- a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py +++ b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py @@ -50,19 +50,25 @@ @pytest.fixture def hook_lineage_collector(): - from airflow.lineage import hook - from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector + from airflow.providers.common.compat.sdk import HookLineageCollector - hlc = hook.HookLineageCollector() + from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS + + hlc = HookLineageCollector() + patch_target = "airflow.lineage.hook.get_hook_lineage_collector" + if AIRFLOW_V_3_2_PLUS: + patch_target = "airflow.sdk.lineage.get_hook_lineage_collector" if AIRFLOW_V_3_0_PLUS: from unittest import mock - with mock.patch( - "airflow.lineage.hook.get_hook_lineage_collector", - return_value=hlc, - ): + with mock.patch(patch_target, return_value=hlc): + from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector + yield get_hook_lineage_collector() else: + from airflow.lineage import hook + from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector + hook._hook_lineage_collector = hlc yield get_hook_lineage_collector() diff --git a/task-sdk/src/airflow/sdk/io/path.py b/task-sdk/src/airflow/sdk/io/path.py index 611dfe2c8df1b..3f87e6a1d9545 100644 --- a/task-sdk/src/airflow/sdk/io/path.py +++ b/task-sdk/src/airflow/sdk/io/path.py @@ -44,7 +44,7 @@ def __init__(self, path: ObjectStoragePath, obj): self._obj = obj def __getattr__(self, name): - from airflow.lineage.hook import get_hook_lineage_collector + from airflow.sdk.lineage import get_hook_lineage_collector if not callable(attr := getattr(self._obj, name)): return attr @@ -312,7 +312,7 @@ def copy(self, dst: str | ObjectStoragePath, recursive: bool = False, **kwargs) kwargs: Additional keyword arguments to be passed to the underlying implementation. """ - from airflow.lineage.hook import get_hook_lineage_collector + from airflow.sdk.lineage import get_hook_lineage_collector if isinstance(dst, str): dst = ObjectStoragePath(dst) @@ -380,7 +380,7 @@ def move(self, path: str | ObjectStoragePath, recursive: bool = False, **kwargs) kwargs: Additional keyword arguments to be passed to the underlying implementation. """ - from airflow.lineage.hook import get_hook_lineage_collector + from airflow.sdk.lineage import get_hook_lineage_collector if isinstance(path, str): path = ObjectStoragePath(path) diff --git a/airflow-core/src/airflow/lineage/hook.py b/task-sdk/src/airflow/sdk/lineage.py similarity index 97% rename from airflow-core/src/airflow/lineage/hook.py rename to task-sdk/src/airflow/sdk/lineage.py index 7c22a36700645..acdf284a59684 100644 --- a/airflow-core/src/airflow/lineage/hook.py +++ b/task-sdk/src/airflow/sdk/lineage.py @@ -24,10 +24,11 @@ from typing import TYPE_CHECKING, Any, TypeAlias import attr +import structlog -from airflow.providers_manager import ProvidersManager +from airflow.sdk.definitions._internal.logging_mixin import LoggingMixin from airflow.sdk.definitions.asset import Asset -from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime if TYPE_CHECKING: from pydantic.types import JsonValue @@ -38,6 +39,9 @@ LineageContext: TypeAlias = BaseHook | ObjectStoragePath +log = structlog.getLogger(__name__) + + # Maximum number of assets input or output that can be collected in a single hook execution. # Input assets and output assets are collected separately. MAX_COLLECTED_ASSETS = 100 @@ -106,7 +110,7 @@ def __init__(self, **kwargs): self._outputs: dict[str, tuple[Asset, LineageContext]] = {} self._input_counts: dict[str, int] = defaultdict(int) self._output_counts: dict[str, int] = defaultdict(int) - self._asset_factories = ProvidersManager().asset_factories + self._asset_factories = ProvidersManagerTaskRuntime().asset_factories self._extra_counts: dict[str, int] = defaultdict(int) self._extra: dict[str, tuple[str, Any, LineageContext]] = {} @@ -335,7 +339,7 @@ def retrieve_hook_lineage(self) -> HookLineage: @cache def get_hook_lineage_collector() -> HookLineageCollector: """Get singleton lineage collector.""" - from airflow import plugins_manager + from airflow.sdk import plugins_manager if plugins_manager.get_hook_lineage_readers_plugins(): return HookLineageCollector() diff --git a/task-sdk/src/airflow/sdk/plugins_manager.py b/task-sdk/src/airflow/sdk/plugins_manager.py index 2cbdf1a44a492..4f4abbec68bb2 100644 --- a/task-sdk/src/airflow/sdk/plugins_manager.py +++ b/task-sdk/src/airflow/sdk/plugins_manager.py @@ -39,6 +39,7 @@ if TYPE_CHECKING: from airflow.sdk._shared.listeners.listener import ListenerManager + from airflow.sdk.lineage import HookLineageReader log = logging.getLogger(__name__) @@ -131,3 +132,14 @@ def integrate_listener_plugins(listener_manager: ListenerManager) -> None: """Add listeners from plugins.""" plugins, _ = _get_plugins() _integrate_listener_plugins(listener_manager, plugins=plugins) + + +@cache +def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]: + """Collect and get hook lineage reader classes registered by plugins.""" + log.debug("Initialize hook lineage readers plugins") + result: list[type[HookLineageReader]] = [] + + for plugin in _get_plugins()[0]: + result.extend(plugin.hook_lineage_readers) + return result diff --git a/task-sdk/tests/task_sdk/docs/test_public_api.py b/task-sdk/tests/task_sdk/docs/test_public_api.py index f53887ec5c982..e8d7dd68e9e9c 100644 --- a/task-sdk/tests/task_sdk/docs/test_public_api.py +++ b/task-sdk/tests/task_sdk/docs/test_public_api.py @@ -63,6 +63,7 @@ def test_airflow_sdk_no_unexpected_exports(): "listener", "crypto", "providers_manager_runtime", + "lineage", } unexpected = actual - public - ignore assert not unexpected, f"Unexpected exports in airflow.sdk: {sorted(unexpected)}" diff --git a/airflow-core/tests/unit/lineage/test_hook.py b/task-sdk/tests/task_sdk/test_lineage.py similarity index 98% rename from airflow-core/tests/unit/lineage/test_hook.py rename to task-sdk/tests/task_sdk/test_lineage.py index f540315924724..3bf706cabb479 100644 --- a/airflow-core/tests/unit/lineage/test_hook.py +++ b/task-sdk/tests/task_sdk/test_lineage.py @@ -21,9 +21,8 @@ import pytest -from airflow import plugins_manager -from airflow.lineage import hook -from airflow.lineage.hook import ( +from airflow.sdk import Asset, BaseHook, plugins_manager +from airflow.sdk.lineage import ( AssetLineageInfo, HookLineage, HookLineageCollector, @@ -31,8 +30,6 @@ NoOpCollector, get_hook_lineage_collector, ) -from airflow.sdk import BaseHook -from airflow.sdk.definitions.asset import Asset from tests_common.test_utils.mock_plugins import mock_plugin_manager @@ -137,7 +134,7 @@ def test_are_assets_collected(self, collector): ], ) - @patch("airflow.lineage.hook.Asset") + @patch("airflow.sdk.lineage.Asset") def test_add_input_asset(self, mock_asset, collector): asset = MagicMock(spec=Asset, extra={}) mock_asset.return_value = asset @@ -196,7 +193,7 @@ def create_asset(arg1, arg2="default", extra=None): uri="myscheme://value_1/value_2", name="asset-value_1", group="test", extra={"key": "value"} ) - @patch("airflow.lineage.hook.ProvidersManager") + @patch("airflow.sdk.lineage.ProvidersManagerTaskRuntime") def test_create_asset_no_factory(self, mock_providers_manager, collector): test_scheme = "myscheme" mock_providers_manager.return_value.asset_factories = {} @@ -215,7 +212,7 @@ def test_create_asset_no_factory(self, mock_providers_manager, collector): is None ) - @patch("airflow.lineage.hook.ProvidersManager") + @patch("airflow.sdk.lineage.ProvidersManagerTaskRuntime") def test_create_asset_factory_exception(self, mock_providers_manager, collector): def create_asset(extra=None, **kwargs): raise RuntimeError("Factory error") @@ -873,7 +870,7 @@ class FakePlugin(plugins_manager.AirflowPlugin): ) def test_get_hook_lineage_collector(has_readers, expected_class): # reset cached instance - hook.get_hook_lineage_collector.cache_clear() + get_hook_lineage_collector.cache_clear() plugins = [FakePlugin()] if has_readers else [] with mock_plugin_manager(plugins=plugins): assert isinstance(get_hook_lineage_collector(), expected_class)