Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions airflow-core/src/airflow/lineage/hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import hashlib
import json
from collections import defaultdict
from functools import cache
from typing import TYPE_CHECKING, Any, TypeAlias

import attr
Expand All @@ -36,8 +37,6 @@
# Store context what sent lineage.
LineageContext: TypeAlias = BaseHook | ObjectStoragePath

_hook_lineage_collector: HookLineageCollector | None = None


# Maximum number of assets input or output that can be collected in a single hook execution.
# Input assets and output assets are collected separately.
Expand Down Expand Up @@ -333,15 +332,12 @@ def retrieve_hook_lineage(self) -> HookLineage:
return hook_lineage


@cache
def get_hook_lineage_collector() -> HookLineageCollector:
"""Get singleton lineage collector."""
global _hook_lineage_collector
if not _hook_lineage_collector:
from airflow import plugins_manager

plugins_manager.initialize_hook_lineage_readers_plugins()
if plugins_manager.hook_lineage_reader_classes:
_hook_lineage_collector = HookLineageCollector()
else:
_hook_lineage_collector = NoOpCollector()
return _hook_lineage_collector
from airflow import plugins_manager

plugins_manager.initialize_hook_lineage_readers_plugins()
if plugins_manager.hook_lineage_reader_classes:
return HookLineageCollector()
return NoOpCollector()
4 changes: 2 additions & 2 deletions airflow-core/tests/unit/lineage/test_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -872,8 +872,8 @@ class FakePlugin(plugins_manager.AirflowPlugin):
],
)
def test_get_hook_lineage_collector(has_readers, expected_class):
# reset global variable
hook._hook_lineage_collector = None
# reset cached instance
hook.get_hook_lineage_collector.cache_clear()
plugins = [FakePlugin()] if has_readers else []
with mock_plugin_manager(plugins=plugins):
assert isinstance(get_hook_lineage_collector(), expected_class)
Expand Down
15 changes: 10 additions & 5 deletions devel-common/src/tests_common/pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1944,12 +1944,17 @@ def _mock_plugins(request: pytest.FixtureRequest):

@pytest.fixture
def hook_lineage_collector():
from airflow.lineage import hook
from airflow.lineage.hook import HookLineageCollector

hook._hook_lineage_collector = None
hook._hook_lineage_collector = hook.HookLineageCollector()
yield hook.get_hook_lineage_collector()
hook._hook_lineage_collector = None
hlc = HookLineageCollector()
with mock.patch(
"airflow.lineage.hook.get_hook_lineage_collector",
return_value=hlc,
):
# Redirect calls to compat provider to support back-compat tests of 2.x as well
from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector

yield get_hook_lineage_collector()


@pytest.fixture
Expand Down
13 changes: 0 additions & 13 deletions providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,6 @@ def s3_bucket(mocked_s3_res):
return bucket


@pytest.fixture
def hook_lineage_collector():
from airflow.lineage import hook
from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector

hook._hook_lineage_collector = None
hook._hook_lineage_collector = hook.HookLineageCollector()

yield get_hook_lineage_collector()

hook._hook_lineage_collector = None


class TestAwsS3Hook:
@mock_aws
def test_get_conn(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,23 @@
@pytest.fixture
def hook_lineage_collector():
from airflow.lineage import hook
from airflow.providers.common.compat.lineage.hook import (
get_hook_lineage_collector,
)
from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector

hlc = hook.HookLineageCollector()
if AIRFLOW_V_3_0_PLUS:
from unittest import mock

hook._hook_lineage_collector = None
hook._hook_lineage_collector = hook.HookLineageCollector()
with mock.patch(
"airflow.lineage.hook.get_hook_lineage_collector",
return_value=hlc,
):
yield get_hook_lineage_collector()
else:
hook._hook_lineage_collector = hlc

yield get_hook_lineage_collector()
yield get_hook_lineage_collector()

hook._hook_lineage_collector = None
hook._hook_lineage_collector = None


@pytest.mark.parametrize(
Expand Down
Loading