Remove default create_dataset method.
Add section in experimental lineage docs.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
JDarDagran committed Jul 10, 2024
1 parent 28d697d commit 6c9e124
Showing 5 changed files with 70 additions and 29 deletions.
5 changes: 0 additions & 5 deletions airflow/datasets/__init__.py
@@ -42,11 +42,6 @@ def normalize_noop(parts: SplitResult) -> SplitResult:
     return parts
 
 
-def create_dataset(uri: str) -> Dataset:
-    """Create a dataset object from a dataset URI."""
-    return Dataset(uri=uri)
-
-
 def _get_uri_normalizer(scheme: str) -> Callable[[SplitResult], SplitResult] | None:
     if scheme == "file":
         return normalize_noop
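Call sites that relied on the removed default factory can construct the object directly, exactly as the deleted body did. A minimal sketch, with a made-up URI:

    from airflow.datasets import Dataset

    # The deleted helper was a thin wrapper around the constructor, so the
    # direct call below is equivalent; the URI itself is illustrative.
    dataset = Dataset(uri="s3://some-bucket/some-key")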
25 changes: 17 additions & 8 deletions airflow/lineage/hook.py
@@ -21,7 +21,7 @@
 
 import attr
 
-from airflow.datasets import Dataset, create_dataset
+from airflow.datasets import Dataset
 from airflow.hooks.base import BaseHook
 from airflow.io.store import ObjectStore
 from airflow.providers_manager import ProvidersManager
@@ -58,7 +58,7 @@ def create_dataset(dataset_kwargs: dict) -> Dataset:
         """Create a Dataset instance from the given dataset kwargs."""
         if "uri" in dataset_kwargs:
             # Fallback to default factory using the provided URI
-            return create_dataset(dataset_kwargs["uri"])
+            return Dataset(uri=dataset_kwargs["uri"])
 
         scheme: str = dataset_kwargs.pop("scheme", None)
         if not scheme:
@@ -74,14 +74,22 @@ def create_dataset(dataset_kwargs: dict) -> Dataset:
 
         return dataset_factory(**dataset_kwargs)
 
-    def add_input_dataset(self, dataset_kwargs: dict, hook: LineageContext):
+    def add_input_dataset(self, dataset_kwargs: dict, hook: LineageContext) -> None:
         """Add the input dataset and its corresponding hook execution context to the collector."""
-        dataset = self.create_dataset(dataset_kwargs)
+        try:
+            dataset = self.create_dataset(dataset_kwargs)
+        except Exception as e:
+            self.log.debug("Failed to create AIP-60 compliant Dataset. %s", e)
+            return
         self.inputs.append((dataset, hook))
 
-    def add_output_dataset(self, dataset_kwargs: dict, hook: LineageContext):
+    def add_output_dataset(self, dataset_kwargs: dict, hook: LineageContext) -> None:
         """Add the output dataset and its corresponding hook execution context to the collector."""
-        dataset = self.create_dataset(dataset_kwargs)
+        try:
+            dataset = self.create_dataset(dataset_kwargs)
+        except Exception as e:
+            self.log.debug("Failed to create AIP-60 compliant Dataset. %s", e)
+            return
         self.outputs.append((dataset, hook))
 
     @property
@@ -112,7 +120,9 @@ def add_output_dataset(self, *_):
     def collected_datasets(
         self,
     ) -> HookLineage:
-        self.log.warning("You should not call this as there's no reader.")
+        self.log.warning(
+            "Data lineage tracking might be incomplete. Consider registering a hook lineage reader for more detailed information."
+        )
         return HookLineage([], [])
 
 
@@ -132,7 +142,6 @@ def get_hook_lineage_collector() -> HookLineageCollector:
     """Get singleton lineage collector."""
     global _hook_lineage_collector
     if not _hook_lineage_collector:
-        # is there a better why how to use noop?
         if ProvidersManager().hook_lineage_readers:
             _hook_lineage_collector = HookLineageCollector()
         else:
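Read together, these hunks make ``create_dataset`` the single resolution point: a literal ``uri`` short-circuits to the ``Dataset`` constructor, while ``scheme``-style kwargs are dispatched to a provider-registered factory. A standalone sketch of that flow — the function name and error messages are assumptions, since the branch after ``if not scheme:`` is collapsed in the diff:

    from airflow.datasets import Dataset


    def resolve_dataset(dataset_kwargs: dict, dataset_factories: dict) -> Dataset:
        # Mirrors HookLineageCollector.create_dataset as shown above.
        if "uri" in dataset_kwargs:
            # Fallback to the default constructor using the provided URI.
            return Dataset(uri=dataset_kwargs["uri"])
        scheme = dataset_kwargs.pop("scheme", None)
        if not scheme:
            # Assumed behavior: without a uri or a scheme nothing can be resolved.
            raise ValueError("dataset_kwargs needs either a 'uri' or a 'scheme'")
        factory = dataset_factories.get(scheme)
        if factory is None:
            raise ValueError(f"no dataset factory registered for scheme {scheme!r}")
        return factory(**dataset_kwargs)

Either way, a failure now surfaces as an exception, which ``add_input_dataset``/``add_output_dataset`` catch and log at debug level instead of failing the task.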
21 changes: 11 additions & 10 deletions airflow/providers_manager.py
@@ -526,7 +526,7 @@ def initialize_providers_filesystems(self):
         self._discover_filesystems()
 
     @provider_info_cache("dataset_uris")
-    def initialize_providers_dataset_uri_handlers(self):
+    def initialize_providers_dataset_uri_handlers_and_factories(self):
         """Lazy initialization of provider dataset URI handlers."""
         self.initialize_providers_list()
         self._discover_dataset_uri_handlers_and_factories()
@@ -889,25 +889,26 @@ def _discover_filesystems(self) -> None:
         self._fs_set = set(sorted(self._fs_set))
 
     def _discover_dataset_uri_handlers_and_factories(self) -> None:
-        from airflow.datasets import create_dataset, normalize_noop
+        from airflow.datasets import normalize_noop
 
         for provider_package, provider in self._provider_dict.items():
             for handler_info in provider.data.get("dataset-uris", []):
                 try:
                     schemes = handler_info["schemes"]
                     handler_path = handler_info["handler"]
-                    factory_path = handler_info["factory"]
                 except KeyError:
                     continue
                 if handler_path is None:
                     handler = normalize_noop
-                    if factory_path is None:
-                        factory = create_dataset
-                elif not (handler := _correctness_check(provider_package, handler_path, provider)) or not (
-                    factory := _correctness_check(provider_package, factory_path, provider)
-                ):
+                elif not (handler := _correctness_check(provider_package, handler_path, provider)):
                     continue
                 self._dataset_uri_handlers.update((scheme, handler) for scheme in schemes)
+                factory_path = handler_info.get("factory")
+                if not (
+                    factory_path is not None
+                    and (factory := _correctness_check(provider_package, factory_path, provider))
+                ):
+                    continue
                 self._dataset_factories.update((scheme, factory) for scheme in schemes)
 
     def _discover_hook_lineage_readers(self) -> None:
@@ -1314,12 +1315,12 @@ def filesystem_module_names(self) -> list[str]:
 
     @property
     def dataset_factories(self) -> dict[str, Callable[..., Dataset]]:
-        self.initialize_providers_dataset_uri_handlers()
+        self.initialize_providers_dataset_uri_handlers_and_factories()
         return self._dataset_factories
 
     @property
     def dataset_uri_handlers(self) -> dict[str, Callable[[SplitResult], SplitResult]]:
-        self.initialize_providers_dataset_uri_handlers()
+        self.initialize_providers_dataset_uri_handlers_and_factories()
         return self._dataset_uri_handlers
 
     @property
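For orientation, the discovery loop consumes per-provider ``dataset-uris`` metadata: ``schemes`` and ``handler`` are required (a ``KeyError`` skips the entry), while ``factory`` is now fetched with ``.get()`` and may be omitted. A hedged illustration of one entry — the scheme and dotted paths are invented:

    # Shape of one handler_info entry as read by
    # _discover_dataset_uri_handlers_and_factories; all values are hypothetical.
    handler_info = {
        "schemes": ["myscheme"],
        # Dotted path to a URI normalizer; None means normalize_noop is used.
        "handler": "my_provider.datasets.sanitize_uri",
        # Optional since this commit: omitting it simply registers no factory.
        "factory": "my_provider.datasets.create_dataset",
    }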
36 changes: 36 additions & 0 deletions docs/apache-airflow/administration-and-deployment/lineage.rst
@@ -89,6 +89,42 @@ has outlets defined (e.g. by using ``add_outlets(..)`` or has out of the box support
 .. _precedence: https://docs.python.org/3/reference/expressions.html
 
+Hook Lineage
+------------
+
+Airflow provides a powerful feature for tracking data lineage not only between tasks but also from hooks used within those tasks.
+This functionality helps you understand how data flows throughout your Airflow pipelines.
+
+A global instance of ``HookLineageCollector`` serves as the central hub for collecting lineage information.
+Hooks can send details about datasets they interact with to this collector.
+The collector then uses this data to construct AIP-60 compliant Datasets, a standard format for describing datasets.
+
+.. code-block:: python
+
+    from airflow.lineage.hook import get_hook_lineage_collector
+
+
+    class CustomHook(BaseHook):
+        def run(self):
+            # run actual code
+            collector = get_hook_lineage_collector()
+            collector.add_input_dataset(dataset_kwargs={"scheme": "file", "path": "/tmp/in"}, hook=self)
+            collector.add_output_dataset(dataset_kwargs={"scheme": "file", "path": "/tmp/out"}, hook=self)
+
+Lineage data collected by the ``HookLineageCollector`` can be accessed using an instance of ``HookLineageReader``.
+
+.. code-block:: python
+
+    from airflow.lineage.hook import HookLineageReader
+
+
+    class CustomHookLineageReader(HookLineageReader):
+        def get_inputs(self):
+            return self.lineage_collector.collected_datasets.inputs
+
+If no ``HookLineageReader`` is registered within Airflow, a default ``NoOpCollector`` is used instead.
+This collector does not create AIP-60 compliant datasets or collect lineage information.
+
 
 Lineage Backend
 ---------------
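The ``NoOpCollector`` mentioned in the docs is only glimpsed in the hook.py hunks above (``def add_output_dataset(self, *_):`` and the softened warning), so the following is a reconstruction under the assumption that it subclasses ``HookLineageCollector`` and exposes ``collected_datasets`` as a property:

    from airflow.lineage.hook import HookLineage, HookLineageCollector


    class NoOpCollector(HookLineageCollector):
        """Accepts add_input/add_output calls but records nothing.

        Installed by get_hook_lineage_collector() when no hook lineage
        reader is registered, so hooks may call the collector unconditionally.
        """

        def add_input_dataset(self, *_):
            pass

        def add_output_dataset(self, *_):
            pass

        @property
        def collected_datasets(self) -> HookLineage:
            self.log.warning(
                "Data lineage tracking might be incomplete. Consider registering a hook "
                "lineage reader for more detailed information."
            )
            return HookLineage([], [])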
12 changes: 6 additions & 6 deletions tests/lineage/test_hook.py
@@ -43,18 +43,18 @@ def test_are_datasets_collected(self):
             [(Dataset("postgres://example.com:5432/database/default/table"), output_hook)],
         )
 
-    @patch("airflow.lineage.hook.create_dataset")
-    def test_add_input_dataset(self, mock_create_dataset):
+    @patch("airflow.lineage.hook.Dataset")
+    def test_add_input_dataset(self, mock_dataset):
         collector = HookLineageCollector()
-        mock_dataset = MagicMock(spec=Dataset)
-        mock_create_dataset.return_value = mock_dataset
+        dataset = MagicMock(spec=Dataset)
+        mock_dataset.return_value = dataset
 
         dataset_kwargs = {"uri": "test_uri"}
         hook = MagicMock()
         collector.add_input_dataset(dataset_kwargs, hook)
 
-        assert collector.inputs == [(mock_dataset, hook)]
-        mock_create_dataset.assert_called_once_with("test_uri")
+        assert collector.inputs == [(dataset, hook)]
+        mock_dataset.assert_called_once_with(uri="test_uri")
 
     @patch("airflow.lineage.hook.ProvidersManager")
     def test_create_dataset(self, mock_providers_manager):
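One test the diff does not add, sketched here as a hypothetical companion: the new ``try``/``except`` in ``add_input_dataset`` should swallow factory failures. This assumes kwargs with neither ``uri`` nor ``scheme`` make ``create_dataset`` raise, per the hook.py hunk above:

    from unittest.mock import MagicMock

    from airflow.lineage.hook import HookLineageCollector


    def test_add_input_dataset_swallows_creation_errors():
        # Hypothetical: invalid kwargs are logged at debug level and skipped,
        # leaving the collector's inputs untouched.
        collector = HookLineageCollector()
        collector.add_input_dataset({}, hook=MagicMock())
        assert collector.inputs == []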
