Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 1 addition & 94 deletions airflow-core/docs/administration-and-deployment/lineage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,76 +22,6 @@ Lineage

.. note:: Lineage support is very experimental and subject to change.

Airflow can help track origins of data, what happens to it and where it moves over time. This can aid having
audit trails and data governance, but also debugging of data flows.

Airflow tracks data by means of inlets and outlets of the tasks. Let's work from an example and see how it
works.

.. code-block:: python

import datetime
import pendulum

from airflow.lineage import AUTO
from airflow.models import DAG
from airflow.providers.common.compat.lineage.entities import File
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator

FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]

dag = DAG(
dag_id="example_lineage",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule="0 0 * * *",
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
)

f_final = File(url="/tmp/final")
run_this_last = EmptyOperator(task_id="run_this_last", dag=dag, inlets=AUTO, outlets=f_final)

f_in = File(url="/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
f_out = File(url="/tmp/{}/{{{{ data_interval_start }}}}".format(file))
outlets.append(f_out)

run_this = BashOperator(task_id="run_me_first", bash_command="echo 1", dag=dag, inlets=f_in, outlets=outlets)
run_this.set_downstream(run_this_last)

Inlets can be a (list of) upstream task ids or statically defined as an attr annotated object
as is, for example, the ``File`` object. Outlets can only be attr annotated object. Both are rendered
at run time. However, the outlets of a task in case they are inlets to another task will not be re-rendered
for the downstream task.

.. note:: Operators can add inlets and outlets automatically if the operator supports it.

In the example DAG task ``run_this`` (``task_id=run_me_first``) is a BashOperator that takes 3 inlets: ``CAT1``, ``CAT2``, ``CAT3``, that are
generated from a list. Note that ``data_interval_start`` is a templated field and will be rendered when the task is running.

.. note:: Behind the scenes Airflow prepares the lineage metadata as part of the ``pre_execute`` method of a task. When the task
has finished execution ``post_execute`` is called and lineage metadata is pushed into XCOM. Thus if you are creating
your own operators that override this method make sure to decorate your method with ``prepare_lineage`` and ``apply_lineage``
respectively.

Shorthand notation
------------------

Shorthand notation is available as well, this works almost equal to unix command line pipes, inputs and outputs.
Note that operator precedence_ still applies. Also the ``|`` operator will only work when the left hand side either
has outlets defined (e.g. by using ``add_outlets(..)`` or has out of the box support of lineage ``operator.supports_lineage == True``.

.. code-block:: python

f_in > run_this | (run_this_last > outlets)

.. _precedence: https://docs.python.org/3/reference/expressions.html

Hook Lineage
------------

Airflow provides a powerful feature for tracking data lineage not only between tasks but also from hooks used within those tasks.
This functionality helps you understand how data flows throughout your Airflow pipelines.

Expand All @@ -101,7 +31,7 @@ The collector then uses this data to construct AIP-60 compliant Assets, a standa

.. code-block:: python

from airflow.lineage.hook.lineage import get_hook_lineage_collector
from airflow.lineage.hook import get_hook_lineage_collector


class CustomHook(BaseHook):
Expand Down Expand Up @@ -131,26 +61,3 @@ which is registered in an Airflow plugin.

If no ``HookLineageReader`` is registered within Airflow, a default ``NoOpCollector`` is used instead.
This collector does not create AIP-60 compliant assets or collect lineage information.


Lineage Backend
---------------

It's possible to push the lineage metrics to a custom backend by providing an instance of a LineageBackend in the config:

.. code-block:: ini

[lineage]
backend = my.lineage.CustomBackend

The backend should inherit from ``airflow.lineage.LineageBackend``.

.. code-block:: python

from airflow.lineage.backend import LineageBackend


class CustomBackend(LineageBackend):
def send_lineage(self, operator, inlets=None, outlets=None, context=None):
...
# Send the info to some external service
18 changes: 18 additions & 0 deletions airflow-core/newsfragments/48388.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Task-level auto lineage collection is removed

The ``prepare_lineage``, ``apply_lineage`` mechanism, along with the custom
lineage backend type that supports it, has been removed. This has been an
experimental feature that never caught on.

The ``airflow.lineage.hook`` submodule is not affected.

* Types of change

* [x] Dag changes
* [ ] Config changes
* [ ] API changes
* [ ] CLI changes
* [ ] Behaviour changes
* [ ] Plugin changes
* [ ] Dependency changes
* [x] Code interface changes
141 changes: 0 additions & 141 deletions airflow-core/src/airflow/lineage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,144 +15,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Provides lineage support functions."""

from __future__ import annotations

import logging
from functools import wraps
from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast

from airflow.configuration import conf
from airflow.lineage.backend import LineageBackend
from airflow.utils.session import create_session

if TYPE_CHECKING:
from airflow.sdk.definitions.context import Context

PIPELINE_OUTLETS = "pipeline_outlets"
PIPELINE_INLETS = "pipeline_inlets"
AUTO = "auto"

log = logging.getLogger(__name__)


def get_backend() -> LineageBackend | None:
"""Get the lineage backend if defined in the configs."""
clazz = conf.getimport("lineage", "backend", fallback=None)

if clazz:
if not issubclass(clazz, LineageBackend):
raise TypeError(
f"Your custom Lineage class `{clazz.__name__}` "
f"is not a subclass of `{LineageBackend.__name__}`."
)
else:
return clazz()

return None


def _render_object(obj: Any, context: Context) -> dict:
ti = context["ti"]
if TYPE_CHECKING:
assert ti.task
return ti.task.render_template(obj, context)


T = TypeVar("T", bound=Callable)


def apply_lineage(func: T) -> T:
"""
Conditionally send lineage to the backend.

Saves the lineage to XCom and if configured to do so sends it
to the backend.
"""
_backend = get_backend()

@wraps(func)
def wrapper(self, context, *args, **kwargs):
self.log.debug("Lineage called with inlets: %s, outlets: %s", self.inlets, self.outlets)

ret_val = func(self, context, *args, **kwargs)

outlets = list(self.outlets)
inlets = list(self.inlets)

if outlets:
self.xcom_push(context, key=PIPELINE_OUTLETS, value=outlets)

if inlets:
self.xcom_push(context, key=PIPELINE_INLETS, value=inlets)

if _backend:
_backend.send_lineage(operator=self, inlets=self.inlets, outlets=self.outlets, context=context)

return ret_val

return cast("T", wrapper)


def prepare_lineage(func: T) -> T:
"""
Prepare the lineage inlets and outlets.

Inlets can be:

* "auto" -> picks up any outlets from direct upstream tasks that have outlets defined, as such that
if A -> B -> C and B does not have outlets but A does, these are provided as inlets.
* "list of task_ids" -> picks up outlets from the upstream task_ids
* "list of datasets" -> manually defined list of dataset

"""

@wraps(func)
def wrapper(self, context, *args, **kwargs):
from airflow.models.abstractoperator import AbstractOperator

self.log.debug("Preparing lineage inlets and outlets")

if isinstance(self.inlets, (str, AbstractOperator)):
self.inlets = [self.inlets]

if self.inlets and isinstance(self.inlets, list):
# get task_ids that are specified as parameter and make sure they are upstream
task_ids = {o for o in self.inlets if isinstance(o, str)}.union(
op.task_id for op in self.inlets if isinstance(op, AbstractOperator)
).intersection(self.get_flat_relative_ids(upstream=True))

# pick up unique direct upstream task_ids if AUTO is specified
if AUTO.upper() in self.inlets or AUTO.lower() in self.inlets:
task_ids = task_ids.union(task_ids.symmetric_difference(self.upstream_task_ids))

# Remove auto and task_ids
self.inlets = [i for i in self.inlets if not isinstance(i, str)]

# We manually create a session here since xcom_pull returns a
# LazySelectSequence proxy. If we do not pass a session, a new one
# will be created, but that session will not be properly closed.
# After we are done iterating, we can safely close this session.
with create_session() as session:
_inlets = self.xcom_pull(
context, task_ids=task_ids, dag_id=self.dag_id, key=PIPELINE_OUTLETS, session=session
)
self.inlets.extend(i for it in _inlets for i in it)

elif self.inlets:
raise AttributeError("inlets is not a list, operator, string or attr annotated object")

if not isinstance(self.outlets, list):
self.outlets = [self.outlets]

# render inlets and outlets
self.inlets = [_render_object(i, context) for i in self.inlets]

self.outlets = [_render_object(i, context) for i in self.outlets]

self.log.debug("inlets: %s, outlets: %s", self.inlets, self.outlets)

return func(self, context, *args, **kwargs)

return cast("T", wrapper)
46 changes: 0 additions & 46 deletions airflow-core/src/airflow/lineage/backend.py

This file was deleted.

17 changes: 11 additions & 6 deletions airflow-core/src/airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@
from sqlalchemy import select
from sqlalchemy.orm.exc import NoResultFound

from airflow.exceptions import (
AirflowException,
)
from airflow.lineage import apply_lineage, prepare_lineage
from airflow.exceptions import AirflowException

# Keeping this file at all is a temp thing as we migrate the repo to the task sdk as the base, but to keep
# main working and useful for others to develop against we use the TaskSDK here but keep this file around
Expand Down Expand Up @@ -372,7 +369,6 @@ def get_outlet_defs(self):
extended/overridden by subclasses.
"""

@prepare_lineage
def pre_execute(self, context: Any):
"""Execute right before self.execute() is called."""
if self._pre_execute_hook is None:
Expand All @@ -386,7 +382,16 @@ def pre_execute(self, context: Any):
logger=self.log,
).run(context)

@apply_lineage
def execute(self, context: Context) -> Any:
"""
Derive when creating an operator.

Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.
"""
raise NotImplementedError()

def post_execute(self, context: Any, result: Any = None):
"""
Execute right after self.execute() is called.
Expand Down
Loading