Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/boring-cyborg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,8 @@ labelPRBasedOnFilePath:
- airflow-core/docs/howto/usage-cli.rst

area:Lineage:
- airflow-core/src/airflow/lineage/**/*
- airflow-core/tests/unit/lineage/**/*
- task-sdk/src/airflow/sdk/lineage.py
- task-sdk/tests/task_sdk/test_lineage.py
- airflow-core/docs/administration-and-deployment/lineage.rst

area:Logging:
Expand Down
11 changes: 11 additions & 0 deletions airflow-core/src/airflow/lineage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from airflow.utils.deprecation_tools import add_deprecated_classes

add_deprecated_classes(
{
__name__: {"hook": "airflow.sdk.lineage"},
"hook": {"*": "airflow.sdk.lineage"},
},
package=__name__,
)
12 changes: 0 additions & 12 deletions airflow-core/src/airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from airflow.configuration import conf

if TYPE_CHECKING:
from airflow.lineage.hook import HookLineageReader
from airflow.listeners.listener import ListenerManager
from airflow.partition_mapper.base import PartitionMapper
from airflow.task.priority_strategy import PriorityWeightStrategy
Expand Down Expand Up @@ -283,17 +282,6 @@ def get_partition_mapper_plugins() -> dict[str, type[PartitionMapper]]:
}


@cache
def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]:
"""Collect and get hook lineage reader classes registered by plugins."""
log.debug("Initialize hook lineage readers plugins")
result: list[type[HookLineageReader]] = []

for plugin in _get_plugins()[0]:
result.extend(plugin.hook_lineage_readers)
return result


@cache
def integrate_macros_plugins() -> None:
"""Integrates macro plugins."""
Expand Down
17 changes: 0 additions & 17 deletions airflow-core/tests/unit/lineage/__init__.py

This file was deleted.

1 change: 0 additions & 1 deletion dev/breeze/tests/test_pytest_args_for_test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ def _find_all_integration_folders() -> list[str]:
"airflow-core/tests/unit/decorators",
"airflow-core/tests/unit/hooks",
"airflow-core/tests/unit/io",
"airflow-core/tests/unit/lineage",
"airflow-core/tests/unit/listeners",
"airflow-core/tests/unit/logging",
"airflow-core/tests/unit/macros",
Expand Down
25 changes: 19 additions & 6 deletions devel-common/src/tests_common/pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1980,17 +1980,30 @@ def _mock_plugins(request: pytest.FixtureRequest):

@pytest.fixture
def hook_lineage_collector():
from airflow.lineage.hook import HookLineageCollector
from airflow.providers.common.compat.sdk import HookLineageCollector

from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS

hlc = HookLineageCollector()
with mock.patch(
"airflow.lineage.hook.get_hook_lineage_collector",
return_value=hlc,
):
# Redirect calls to compat provider to support back-compat tests of 2.x as well

if AIRFLOW_V_3_0_PLUS:
from unittest import mock

patch_target = "airflow.lineage.hook.get_hook_lineage_collector"
if AIRFLOW_V_3_2_PLUS:
patch_target = "airflow.sdk.lineage.get_hook_lineage_collector"

with mock.patch(patch_target, return_value=hlc):
from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector

yield get_hook_lineage_collector()
else:
from airflow.lineage import hook
from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector

hook._hook_lineage_collector = hlc
yield get_hook_lineage_collector()
hook._hook_lineage_collector = None


@pytest.fixture
Expand Down
14 changes: 13 additions & 1 deletion devel-common/src/tests_common/test_utils/mock_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,20 @@ def mock_plugin_manager(plugins=None, **kwargs):
if AIRFLOW_V_3_2_PLUS:
# Always start the block with an non-initialized plugins, so ensure_plugins_loaded runs.
from airflow import plugins_manager
from airflow.sdk import plugins_manager as sdk_plugins_manager

plugins_manager._get_plugins.cache_clear()
plugins_manager._get_ui_plugins.cache_clear()
plugins_manager.get_flask_plugins.cache_clear()
plugins_manager.get_fastapi_plugins.cache_clear()
plugins_manager._get_extra_operators_links_plugins.cache_clear()
plugins_manager.get_timetables_plugins.cache_clear()
plugins_manager.get_hook_lineage_readers_plugins.cache_clear()
plugins_manager.integrate_macros_plugins.cache_clear()
plugins_manager.get_priority_weight_strategy_plugins.cache_clear()

sdk_plugins_manager.integrate_macros_plugins.cache_clear()
sdk_plugins_manager.get_hook_lineage_readers_plugins.cache_clear()

if plugins is not None or "import_errors" in kwargs:
exit_stack.enter_context(
mock.patch(
Expand All @@ -106,6 +109,15 @@ def mock_plugin_manager(plugins=None, **kwargs):
),
)
)
exit_stack.enter_context(
mock.patch(
"airflow.sdk.plugins_manager._get_plugins",
return_value=(
plugins or [],
kwargs.get("import_errors", {}),
),
)
)
elif kwargs:
raise NotImplementedError(
"mock_plugin_manager does not support patching other attributes in Airflow 3.2+"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
if TYPE_CHECKING:
from typing import Any

from airflow.lineage.hook import LineageContext
from airflow.sdk.lineage import LineageContext


def _lacks_asset_methods(collector):
Expand Down Expand Up @@ -62,7 +62,7 @@ def _add_extra_polyfill(collector):

import attr

from airflow.lineage.hook import HookLineage as _BaseHookLineage
from airflow.providers.common.compat.sdk import HookLineage as _BaseHookLineage

# Add `extra` to HookLineage returned by `collected_assets` property
@attr.define
Expand Down Expand Up @@ -229,7 +229,7 @@ def get_hook_lineage_collector():
Airflow 3.0–3.1: Collector has asset-based methods but lacks `add_extra` - apply single layer.
Airflow 3.2+: Collector has asset-based methods and `add_extra` support - no action required.
"""
from airflow.lineage.hook import get_hook_lineage_collector as get_global_collector
from airflow.providers.common.compat.sdk import get_hook_lineage_collector as get_global_collector

global_collector = get_global_collector()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@
TaskDeferred as TaskDeferred,
XComNotFound as XComNotFound,
)
from airflow.sdk.lineage import (
HookLineage as HookLineage,
HookLineageCollector as HookLineageCollector,
HookLineageReader as HookLineageReader,
NoOpCollector as NoOpCollector,
get_hook_lineage_collector as get_hook_lineage_collector,
)
from airflow.sdk.listener import get_listener_manager as get_listener_manager
from airflow.sdk.log import redact as redact
from airflow.sdk.plugins_manager import AirflowPlugin as AirflowPlugin
Expand Down Expand Up @@ -126,6 +133,10 @@
"AssetAny": ("airflow.sdk", "airflow.datasets", "DatasetAny"),
}

# Airflow 3-only renames (not available in Airflow 2)
_AIRFLOW_3_ONLY_RENAMES: dict[str, tuple[str, str, str]] = {}


# Import map for classes/functions/constants
# Format: class_name -> module_path(s)
# - str: single module path (no fallback)
Expand Down Expand Up @@ -235,6 +246,15 @@
# ============================================================================
"XCOM_RETURN_KEY": "airflow.models.xcom",
# ============================================================================
# Lineage
# ============================================================================
"HookLineageCollector": ("airflow.sdk.lineage", "airflow.lineage.hook"),
"HookLineageReader": ("airflow.sdk.lineage", "airflow.lineage.hook"),
"get_hook_lineage_collector": ("airflow.sdk.lineage", "airflow.lineage.hook"),
"HookLineage": ("airflow.sdk.lineage", "airflow.lineage.hook"),
# Note: AssetLineageInfo is handled by _RENAME_MAP (DatasetLineageInfo -> AssetLineageInfo)
"NoOpCollector": ("airflow.sdk.lineage", "airflow.lineage.hook"),
# ============================================================================
# Exceptions (deprecated in airflow.exceptions, prefer SDK)
# ============================================================================
# Note: AirflowException and AirflowNotFoundException are not deprecated, but exposing them
Expand Down Expand Up @@ -279,9 +299,14 @@
"DagRunTriggerException": ("airflow.sdk.exceptions", "airflow.exceptions"),
}

# Add Airflow 3-only exceptions to _IMPORT_MAP if running Airflow 3+
# Add Airflow 3-only exceptions and renames to _IMPORT_MAP if running Airflow 3+
if AIRFLOW_V_3_0_PLUS:
_IMPORT_MAP.update(_AIRFLOW_3_ONLY_EXCEPTIONS)
_RENAME_MAP.update(_AIRFLOW_3_ONLY_RENAMES)
# AssetLineageInfo exists in 3.0+ but location changed in 3.2
# 3.0-3.1: airflow.lineage.hook.AssetLineageInfo
# 3.2+: airflow.sdk.lineage.AssetLineageInfo
_IMPORT_MAP["AssetLineageInfo"] = ("airflow.sdk.lineage", "airflow.lineage.hook")

# Module map: module_name -> module_path(s)
# For entire modules that have been moved (e.g., timezone)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,58 @@

@pytest.fixture
def collector():
from airflow.lineage.hook import HookLineageCollector
from airflow.providers.common.compat.sdk import HookLineageCollector

# Patch the "inner" function that the compat version will call
with mock.patch(
"airflow.lineage.hook.get_hook_lineage_collector",
return_value=HookLineageCollector(),
):
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS

hlc = HookLineageCollector()

if AIRFLOW_V_3_0_PLUS:
from unittest import mock

patch_target = "airflow.lineage.hook.get_hook_lineage_collector"
if AIRFLOW_V_3_2_PLUS:
patch_target = "airflow.sdk.lineage.get_hook_lineage_collector"

with mock.patch(patch_target, return_value=hlc):
from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector

yield get_hook_lineage_collector()
else:
from airflow.lineage import hook
from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector

hook._hook_lineage_collector = hlc
yield get_hook_lineage_collector()
hook._hook_lineage_collector = None


@pytest.fixture
def noop_collector():
from airflow.lineage.hook import NoOpCollector
from airflow.providers.common.compat.sdk import NoOpCollector

from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS

noop = NoOpCollector()

if AIRFLOW_V_3_0_PLUS:
from unittest import mock

patch_target = "airflow.lineage.hook.get_hook_lineage_collector"
if AIRFLOW_V_3_2_PLUS:
patch_target = "airflow.sdk.lineage.get_hook_lineage_collector"

with mock.patch(patch_target, return_value=noop):
from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector

# Patch the "inner" function that the compat version will call
with mock.patch(
"airflow.lineage.hook.get_hook_lineage_collector",
return_value=NoOpCollector(),
):
yield get_hook_lineage_collector()
else:
from airflow.lineage import hook
from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector

hook._hook_lineage_collector = noop
yield get_hook_lineage_collector()
hook._hook_lineage_collector = None


@pytest.fixture(params=["collector", "noop_collector"])
Expand Down
2 changes: 1 addition & 1 deletion providers/openlineage/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ requires-python = ">=3.10"
dependencies = [
"apache-airflow>=2.11.0",
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow-providers-common-compat>=1.13.0",
"apache-airflow-providers-common-compat>=1.13.0", # use next version
"attrs>=22.2",
"openlineage-integration-common>=1.41.0",
"openlineage-python>=1.41.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

# Conditional imports - only load expensive dependencies when plugin is enabled
if not conf.is_disabled():
from airflow.lineage.hook import HookLineageReader
from airflow.providers.common.compat.sdk import HookLineageReader
from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
from airflow.providers.openlineage.plugins.macros import (
lineage_job_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,25 @@

@pytest.fixture
def hook_lineage_collector():
from airflow.lineage import hook
from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector
from airflow.providers.common.compat.sdk import HookLineageCollector

hlc = hook.HookLineageCollector()
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS

hlc = HookLineageCollector()
patch_target = "airflow.lineage.hook.get_hook_lineage_collector"
if AIRFLOW_V_3_2_PLUS:
patch_target = "airflow.sdk.lineage.get_hook_lineage_collector"
if AIRFLOW_V_3_0_PLUS:
from unittest import mock

with mock.patch(
"airflow.lineage.hook.get_hook_lineage_collector",
return_value=hlc,
):
with mock.patch(patch_target, return_value=hlc):
from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector

yield get_hook_lineage_collector()
else:
from airflow.lineage import hook
from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector

hook._hook_lineage_collector = hlc

yield get_hook_lineage_collector()
Expand Down
6 changes: 3 additions & 3 deletions task-sdk/src/airflow/sdk/io/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def __init__(self, path: ObjectStoragePath, obj):
self._obj = obj

def __getattr__(self, name):
from airflow.lineage.hook import get_hook_lineage_collector
from airflow.sdk.lineage import get_hook_lineage_collector

if not callable(attr := getattr(self._obj, name)):
return attr
Expand Down Expand Up @@ -312,7 +312,7 @@ def copy(self, dst: str | ObjectStoragePath, recursive: bool = False, **kwargs)

kwargs: Additional keyword arguments to be passed to the underlying implementation.
"""
from airflow.lineage.hook import get_hook_lineage_collector
from airflow.sdk.lineage import get_hook_lineage_collector

if isinstance(dst, str):
dst = ObjectStoragePath(dst)
Expand Down Expand Up @@ -380,7 +380,7 @@ def move(self, path: str | ObjectStoragePath, recursive: bool = False, **kwargs)

kwargs: Additional keyword arguments to be passed to the underlying implementation.
"""
from airflow.lineage.hook import get_hook_lineage_collector
from airflow.sdk.lineage import get_hook_lineage_collector

if isinstance(path, str):
path = ObjectStoragePath(path)
Expand Down
Loading
Loading