diff --git a/airflow-core/docs/administration-and-deployment/lineage.rst b/airflow-core/docs/administration-and-deployment/lineage.rst
index 0219d7b744472..c914e0aa5b291 100644
--- a/airflow-core/docs/administration-and-deployment/lineage.rst
+++ b/airflow-core/docs/administration-and-deployment/lineage.rst
@@ -22,76 +22,6 @@ Lineage
 
 .. note:: Lineage support is very experimental and subject to change.
 
-Airflow can help track origins of data, what happens to it and where it moves over time. This can aid having
-audit trails and data governance, but also debugging of data flows.
-
-Airflow tracks data by means of inlets and outlets of the tasks. Let's work from an example and see how it
-works.
-
-.. code-block:: python
-
-    import datetime
-    import pendulum
-
-    from airflow.lineage import AUTO
-    from airflow.models import DAG
-    from airflow.providers.common.compat.lineage.entities import File
-    from airflow.providers.standard.operators.bash import BashOperator
-    from airflow.providers.standard.operators.empty import EmptyOperator
-
-    FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]
-
-    dag = DAG(
-        dag_id="example_lineage",
-        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
-        schedule="0 0 * * *",
-        catchup=False,
-        dagrun_timeout=datetime.timedelta(minutes=60),
-    )
-
-    f_final = File(url="/tmp/final")
-    run_this_last = EmptyOperator(task_id="run_this_last", dag=dag, inlets=AUTO, outlets=f_final)
-
-    f_in = File(url="/tmp/whole_directory/")
-    outlets = []
-    for file in FILE_CATEGORIES:
-        f_out = File(url="/tmp/{}/{{{{ data_interval_start }}}}".format(file))
-        outlets.append(f_out)
-
-    run_this = BashOperator(task_id="run_me_first", bash_command="echo 1", dag=dag, inlets=f_in, outlets=outlets)
-    run_this.set_downstream(run_this_last)
-
-Inlets can be a (list of) upstream task ids or statically defined as an attr annotated object
-as is, for example, the ``File`` object. Outlets can only be attr annotated object. Both are rendered
-at run time. However, the outlets of a task in case they are inlets to another task will not be re-rendered
-for the downstream task.
-
-.. note:: Operators can add inlets and outlets automatically if the operator supports it.
-
-In the example DAG task ``run_this`` (``task_id=run_me_first``) is a BashOperator that takes 3 inlets: ``CAT1``, ``CAT2``, ``CAT3``, that are
-generated from a list. Note that ``data_interval_start`` is a templated field and will be rendered when the task is running.
-
-.. note:: Behind the scenes Airflow prepares the lineage metadata as part of the ``pre_execute`` method of a task. When the task
-    has finished execution ``post_execute`` is called and lineage metadata is pushed into XCOM. Thus if you are creating
-    your own operators that override this method make sure to decorate your method with ``prepare_lineage`` and ``apply_lineage``
-    respectively.
-
-Shorthand notation
-------------------
-
-Shorthand notation is available as well, this works almost equal to unix command line pipes, inputs and outputs.
-Note that operator precedence_ still applies. Also the ``|`` operator will only work when the left hand side either
-has outlets defined (e.g. by using ``add_outlets(..)`` or has out of the box support of lineage ``operator.supports_lineage == True``.
-
-.. code-block:: python
-
-    f_in > run_this | (run_this_last > outlets)
-
-.. _precedence: https://docs.python.org/3/reference/expressions.html
-
-Hook Lineage
-------------
-
 Airflow provides a powerful feature for tracking data lineage not only between tasks
 but also from hooks used within those tasks.
 This functionality helps you understand how data flows throughout your Airflow pipelines.
@@ -101,7 +31,7 @@ The collector then uses this data to construct AIP-60 compliant Assets, a standa
 
 .. code-block:: python
 
-    from airflow.lineage.hook.lineage import get_hook_lineage_collector
+    from airflow.lineage.hook import get_hook_lineage_collector
 
 
     class CustomHook(BaseHook):
@@ -131,26 +61,3 @@ which is registered in an Airflow plugin.
 
 If no ``HookLineageReader`` is registered within Airflow, a default ``NoOpCollector`` is used instead.
 This collector does not create AIP-60 compliant assets or collect lineage information.
-
-
-Lineage Backend
----------------
-
-It's possible to push the lineage metrics to a custom backend by providing an instance of a LineageBackend in the config:
-
-.. code-block:: ini
-
-    [lineage]
-    backend = my.lineage.CustomBackend
-
-The backend should inherit from ``airflow.lineage.LineageBackend``.
-
-.. code-block:: python
-
-    from airflow.lineage.backend import LineageBackend
-
-
-    class CustomBackend(LineageBackend):
-        def send_lineage(self, operator, inlets=None, outlets=None, context=None):
-            ...
-            # Send the info to some external service
diff --git a/airflow-core/newsfragments/48388.significant.rst b/airflow-core/newsfragments/48388.significant.rst
new file mode 100644
index 0000000000000..cc60bb7686210
--- /dev/null
+++ b/airflow-core/newsfragments/48388.significant.rst
@@ -0,0 +1,18 @@
+Task-level auto lineage collection is removed
+
+The ``prepare_lineage``, ``apply_lineage`` mechanism, along with the custom
+lineage backend type that supports it, has been removed. This has been an
+experimental feature that never caught on.
+
+The ``airflow.lineage.hook`` submodule is not affected.
+
+* Types of change
+
+  * [x] Dag changes
+  * [ ] Config changes
+  * [ ] API changes
+  * [ ] CLI changes
+  * [ ] Behaviour changes
+  * [ ] Plugin changes
+  * [ ] Dependency changes
+  * [x] Code interface changes
diff --git a/airflow-core/src/airflow/lineage/__init__.py b/airflow-core/src/airflow/lineage/__init__.py
index 2fedfbd57a94f..217e5db960782 100644
--- a/airflow-core/src/airflow/lineage/__init__.py
+++ b/airflow-core/src/airflow/lineage/__init__.py
@@ -15,144 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Provides lineage support functions."""
-
-from __future__ import annotations
-
-import logging
-from functools import wraps
-from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast
-
-from airflow.configuration import conf
-from airflow.lineage.backend import LineageBackend
-from airflow.utils.session import create_session
-
-if TYPE_CHECKING:
-    from airflow.sdk.definitions.context import Context
-
-PIPELINE_OUTLETS = "pipeline_outlets"
-PIPELINE_INLETS = "pipeline_inlets"
-AUTO = "auto"
-
-log = logging.getLogger(__name__)
-
-
-def get_backend() -> LineageBackend | None:
-    """Get the lineage backend if defined in the configs."""
-    clazz = conf.getimport("lineage", "backend", fallback=None)
-
-    if clazz:
-        if not issubclass(clazz, LineageBackend):
-            raise TypeError(
-                f"Your custom Lineage class `{clazz.__name__}` "
-                f"is not a subclass of `{LineageBackend.__name__}`."
-            )
-        else:
-            return clazz()
-
-    return None
-
-
-def _render_object(obj: Any, context: Context) -> dict:
-    ti = context["ti"]
-    if TYPE_CHECKING:
-        assert ti.task
-    return ti.task.render_template(obj, context)
-
-
-T = TypeVar("T", bound=Callable)
-
-
-def apply_lineage(func: T) -> T:
-    """
-    Conditionally send lineage to the backend.
-
-    Saves the lineage to XCom and if configured to do so sends it
-    to the backend.
-    """
-    _backend = get_backend()
-
-    @wraps(func)
-    def wrapper(self, context, *args, **kwargs):
-        self.log.debug("Lineage called with inlets: %s, outlets: %s", self.inlets, self.outlets)
-
-        ret_val = func(self, context, *args, **kwargs)
-
-        outlets = list(self.outlets)
-        inlets = list(self.inlets)
-
-        if outlets:
-            self.xcom_push(context, key=PIPELINE_OUTLETS, value=outlets)
-
-        if inlets:
-            self.xcom_push(context, key=PIPELINE_INLETS, value=inlets)
-
-        if _backend:
-            _backend.send_lineage(operator=self, inlets=self.inlets, outlets=self.outlets, context=context)
-
-        return ret_val
-
-    return cast("T", wrapper)
-
-
-def prepare_lineage(func: T) -> T:
-    """
-    Prepare the lineage inlets and outlets.
-
-    Inlets can be:
-
-    * "auto" -> picks up any outlets from direct upstream tasks that have outlets defined, as such that
-      if A -> B -> C and B does not have outlets but A does, these are provided as inlets.
-    * "list of task_ids" -> picks up outlets from the upstream task_ids
-    * "list of datasets" -> manually defined list of dataset
-
-    """
-
-    @wraps(func)
-    def wrapper(self, context, *args, **kwargs):
-        from airflow.models.abstractoperator import AbstractOperator
-
-        self.log.debug("Preparing lineage inlets and outlets")
-
-        if isinstance(self.inlets, (str, AbstractOperator)):
-            self.inlets = [self.inlets]
-
-        if self.inlets and isinstance(self.inlets, list):
-            # get task_ids that are specified as parameter and make sure they are upstream
-            task_ids = {o for o in self.inlets if isinstance(o, str)}.union(
-                op.task_id for op in self.inlets if isinstance(op, AbstractOperator)
-            ).intersection(self.get_flat_relative_ids(upstream=True))
-
-            # pick up unique direct upstream task_ids if AUTO is specified
-            if AUTO.upper() in self.inlets or AUTO.lower() in self.inlets:
-                task_ids = task_ids.union(task_ids.symmetric_difference(self.upstream_task_ids))
-
-            # Remove auto and task_ids
-            self.inlets = [i for i in self.inlets if not isinstance(i, str)]
-
-            # We manually create a session here since xcom_pull returns a
-            # LazySelectSequence proxy. If we do not pass a session, a new one
-            # will be created, but that session will not be properly closed.
-            # After we are done iterating, we can safely close this session.
-            with create_session() as session:
-                _inlets = self.xcom_pull(
-                    context, task_ids=task_ids, dag_id=self.dag_id, key=PIPELINE_OUTLETS, session=session
-                )
-                self.inlets.extend(i for it in _inlets for i in it)
-
-        elif self.inlets:
-            raise AttributeError("inlets is not a list, operator, string or attr annotated object")
-
-        if not isinstance(self.outlets, list):
-            self.outlets = [self.outlets]
-
-        # render inlets and outlets
-        self.inlets = [_render_object(i, context) for i in self.inlets]
-
-        self.outlets = [_render_object(i, context) for i in self.outlets]
-
-        self.log.debug("inlets: %s, outlets: %s", self.inlets, self.outlets)
-
-        return func(self, context, *args, **kwargs)
-
-    return cast("T", wrapper)
diff --git a/airflow-core/src/airflow/lineage/backend.py b/airflow-core/src/airflow/lineage/backend.py
deleted file mode 100644
index 25d1bd4b07eb0..0000000000000
--- a/airflow-core/src/airflow/lineage/backend.py
+++ /dev/null
@@ -1,46 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Sends lineage metadata to a backend."""
-
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-if TYPE_CHECKING:
-    from airflow.models.baseoperator import BaseOperator
-
-
-class LineageBackend:
-    """Sends lineage metadata to a backend."""
-
-    def send_lineage(
-        self,
-        operator: BaseOperator,
-        inlets: list | None = None,
-        outlets: list | None = None,
-        context: dict | None = None,
-    ):
-        """
-        Send lineage metadata to a backend.
-
-        :param operator: the operator executing a transformation on the inlets and outlets
-        :param inlets: the inlets to this operator
-        :param outlets: the outlets from this operator
-        :param context: the current context of the task instance
-        """
-        raise NotImplementedError()
diff --git a/airflow-core/src/airflow/models/baseoperator.py b/airflow-core/src/airflow/models/baseoperator.py
index f052030f8df7b..80234bd7cf9fc 100644
--- a/airflow-core/src/airflow/models/baseoperator.py
+++ b/airflow-core/src/airflow/models/baseoperator.py
@@ -35,10 +35,7 @@
 from sqlalchemy import select
 from sqlalchemy.orm.exc import NoResultFound
 
-from airflow.exceptions import (
-    AirflowException,
-)
-from airflow.lineage import apply_lineage, prepare_lineage
+from airflow.exceptions import AirflowException
 
 # Keeping this file at all is a temp thing as we migrate the repo to the task sdk as the base, but to keep
 # main working and useful for others to develop against we use the TaskSDK here but keep this file around
@@ -372,7 +369,6 @@ def get_outlet_defs(self):
         extended/overridden by subclasses.
         """
 
-    @prepare_lineage
     def pre_execute(self, context: Any):
         """Execute right before self.execute() is called."""
         if self._pre_execute_hook is None:
@@ -386,7 +382,16 @@ def pre_execute(self, context: Any):
             logger=self.log,
         ).run(context)
 
-    @apply_lineage
+    def execute(self, context: Context) -> Any:
+        """
+        Derive when creating an operator.
+
+        Context is the same dictionary used as when rendering jinja templates.
+
+        Refer to get_template_context for more context.
+        """
+        raise NotImplementedError()
+
     def post_execute(self, context: Any, result: Any = None):
         """
         Execute right after self.execute() is called.
diff --git a/airflow-core/tests/unit/lineage/test_lineage.py b/airflow-core/tests/unit/lineage/test_lineage.py
deleted file mode 100644
index a45ec783b3d27..0000000000000
--- a/airflow-core/tests/unit/lineage/test_lineage.py
+++ /dev/null
@@ -1,203 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from unittest import mock
-
-import attr
-import pytest
-
-from airflow.lineage import AUTO, apply_lineage, get_backend, prepare_lineage
-from airflow.lineage.backend import LineageBackend
-from airflow.models import TaskInstance as TI
-from airflow.providers.common.compat.lineage.entities import File
-from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.sdk.definitions.context import Context
-from airflow.utils import timezone
-from airflow.utils.types import DagRunType
-
-from tests_common.test_utils.config import conf_vars
-
-pytestmark = pytest.mark.db_test
-
-
-DEFAULT_DATE = timezone.datetime(2016, 1, 1)
-
-
-# helper
-@attr.define
-class A:
-    pass
-
-
-class CustomLineageBackend(LineageBackend):
-    def send_lineage(self, operator, inlets=None, outlets=None, context=None):
-        pass
-
-
-class TestLineage:
-    def test_lineage(self, dag_maker):
-        f1s = "/tmp/does_not_exist_1-{}"
-        f2s = "/tmp/does_not_exist_2-{}"
-        f3s = "/tmp/does_not_exist_3"
-        file1 = File(f1s.format("{{ ds }}"))
-        file2 = File(f2s.format("{{ ds }}"))
-        file3 = File(f3s)
-
-        with dag_maker(dag_id="test_prepare_lineage", start_date=DEFAULT_DATE) as dag:
-            op1 = EmptyOperator(
-                task_id="leave1",
-                inlets=file1,
-                outlets=[
-                    file2,
-                ],
-            )
-            op2 = EmptyOperator(task_id="leave2")
-            op3 = EmptyOperator(task_id="upstream_level_1", inlets=AUTO, outlets=file3)
-            op4 = EmptyOperator(task_id="upstream_level_2")
-            op5 = EmptyOperator(task_id="upstream_level_3", inlets=["leave1", "upstream_level_1"])
-
-            op1.set_downstream(op3)
-            op2.set_downstream(op3)
-            op3.set_downstream(op4)
-            op4.set_downstream(op5)
-
-        dag.clear()
-        dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
-
-        ctx1 = Context({"ti": TI(task=op1, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
-        ctx2 = Context({"ti": TI(task=op2, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
-        ctx3 = Context({"ti": TI(task=op3, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
-        ctx5 = Context({"ti": TI(task=op5, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
-
-        # prepare with manual inlets and outlets
-        op1.pre_execute(ctx1)
-
-        assert len(op1.inlets) == 1
-        assert op1.inlets[0].url == f1s.format(DEFAULT_DATE)
-
-        assert len(op1.outlets) == 1
-        assert op1.outlets[0].url == f2s.format(DEFAULT_DATE)
-
-        # post process with no backend
-        op1.post_execute(ctx1)
-
-        op2.pre_execute(ctx2)
-        assert len(op2.inlets) == 0
-        op2.post_execute(ctx2)
-
-        op3.pre_execute(ctx3)
-        assert len(op3.inlets) == 1
-        assert isinstance(op3.inlets[0], File)
-        assert op3.inlets[0].url == f2s.format(DEFAULT_DATE)
-        assert op3.outlets[0] == file3
-        op3.post_execute(ctx3)
-
-        # skip 4
-
-        op5.pre_execute(ctx5)
-        # Task IDs should be removed from the inlets, replaced with the outlets of those tasks
-        assert sorted(op5.inlets) == [file2, file3]
-        op5.post_execute(ctx5)
-
-    def test_lineage_render(self, dag_maker):
-        # tests inlets / outlets are rendered if they are added
-        # after initialization
-        with dag_maker(dag_id="test_lineage_render", start_date=DEFAULT_DATE):
-            op1 = EmptyOperator(task_id="task1")
-        dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
-
-        f1s = "/tmp/does_not_exist_1-{}"
-        file1 = File(f1s.format("{{ ds }}"))
-
-        op1.inlets.append(file1)
-        op1.outlets.append(file1)
-
-        # logical_date is set in the context in order to avoid creating task instances
-        ctx1 = Context({"ti": TI(task=op1, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
-
-        op1.pre_execute(ctx1)
-        assert op1.inlets[0].url == f1s.format(DEFAULT_DATE)
-        assert op1.outlets[0].url == f1s.format(DEFAULT_DATE)
-
-    def test_attr_outlet(self, dag_maker):
-        a = A()
-
-        f3s = "/tmp/does_not_exist_3"
-        file3 = File(f3s)
-
-        with dag_maker(dag_id="test_prepare_lineage", start_date=DEFAULT_DATE):
-            op1 = EmptyOperator(
-                task_id="leave1",
-                outlets=[a, file3],
-            )
-            op2 = EmptyOperator(task_id="leave2", inlets="auto")
-
-            op1 >> op2
-
-        dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
-
-        ctx1 = Context({"ti": TI(task=op1, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
-        ctx2 = Context({"ti": TI(task=op2, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
-
-        # prepare with manual inlets and outlets
-        op1.pre_execute(ctx1)
-        op1.post_execute(ctx1)
-
-        op2.pre_execute(ctx2)
-        assert op2.inlets == [a, file3]
-        op2.post_execute(ctx2)
-
-    @mock.patch("airflow.lineage.get_backend")
-    def test_lineage_is_sent_to_backend(self, mock_get_backend, dag_maker):
-        class TestBackend(LineageBackend):
-            def send_lineage(self, operator, inlets=None, outlets=None, context=None):
-                assert len(inlets) == 1
-                assert len(outlets) == 1
-
-        func = mock.Mock()
-        func.__name__ = "foo"
-
-        mock_get_backend.return_value = TestBackend()
-
-        with dag_maker(dag_id="test_lineage_is_sent_to_backend", start_date=DEFAULT_DATE):
-            op1 = EmptyOperator(task_id="task1")
-        dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
-
-        file1 = File("/tmp/some_file")
-
-        op1.inlets.append(file1)
-        op1.outlets.append(file1)
-
-        (ti,) = dag_run.task_instances
-        ctx1 = Context({"ti": ti, "ds": DEFAULT_DATE})
-
-        prep = prepare_lineage(func)
-        prep(op1, ctx1)
-        post = apply_lineage(func)
-        post(op1, ctx1)
-
-    def test_empty_lineage_backend(self):
-        backend = get_backend()
-        assert backend is None
-
-    @conf_vars({("lineage", "backend"): "unit.lineage.test_lineage.CustomLineageBackend"})
-    def test_resolve_lineage_class(self):
-        backend = get_backend()
-        assert issubclass(backend.__class__, LineageBackend)
-        assert isinstance(backend, CustomLineageBackend)
diff --git a/airflow-core/tests/unit/models/test_baseoperator.py b/airflow-core/tests/unit/models/test_baseoperator.py
index c8b01e22c1a75..ea9d5162f5db9 100644
--- a/airflow-core/tests/unit/models/test_baseoperator.py
+++ b/airflow-core/tests/unit/models/test_baseoperator.py
@@ -20,6 +20,7 @@
 import copy
 from collections import defaultdict
 from datetime import datetime
+from unittest import mock
 
 import pytest
 
@@ -32,7 +33,6 @@
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
 from airflow.models.trigger import TriggerFailureReason
-from airflow.providers.common.compat.lineage.entities import File
 from airflow.providers.common.sql.operators import sql
 from airflow.utils.task_group import TaskGroup
 from airflow.utils.trigger_rule import TriggerRule
@@ -113,53 +113,21 @@ def test_baseoperator_with_task_id_less_than_250_chars(self):
         except Exception as e:
             pytest.fail(f"Exception raised: {e}")
 
-    def test_lineage_composition(self):
-        """
-        Test composition with lineage
-        """
-        inlet = File(url="in")
-        outlet = File(url="out")
-        dag = DAG("test-dag", schedule=None, start_date=DEFAULT_DATE)
-        task1 = BaseOperator(task_id="op1", dag=dag)
-        task2 = BaseOperator(task_id="op2", dag=dag)
-
-        # mock
-        task1.supports_lineage = True
-
-        # note: operator precedence still applies
-        inlet > task1 | (task2 > outlet)
-
-        assert task1.get_inlet_defs() == [inlet]
-        assert task2.get_inlet_defs() == [task1.task_id]
-        assert task2.get_outlet_defs() == [outlet]
-
-        fail = ClassWithCustomAttributes()
-        with pytest.raises(TypeError):
-            fail > task1
-        with pytest.raises(TypeError):
-            task1 > fail
-        with pytest.raises(TypeError):
-            fail | task1
-        with pytest.raises(TypeError):
-            task1 | fail
-
-        task3 = BaseOperator(task_id="op3", dag=dag)
-        extra = File(url="extra")
-        [inlet, extra] > task3
-
-        assert task3.get_inlet_defs() == [inlet, extra]
-
-        task1.supports_lineage = False
-        with pytest.raises(ValueError):
-            task1 | task3
-
-        assert task2.supports_lineage is False
-        task2 | task3
-        assert len(task3.get_inlet_defs()) == 3
-
-        task4 = BaseOperator(task_id="op4", dag=dag)
-        task4 > [inlet, outlet, extra]
-        assert task4.get_outlet_defs() == [inlet, outlet, extra]
+    def test_pre_execute_hook(self):
+        hook = mock.MagicMock()
+
+        op = BaseOperator(task_id="test_task", pre_execute=hook)
+        op_copy = op.prepare_for_execution()
+        op_copy.pre_execute({})
+        assert hook.called
+
+    def test_post_execute_hook(self):
+        hook = mock.MagicMock()
+
+        op = BaseOperator(task_id="test_task", post_execute=hook)
+        op_copy = op.prepare_for_execution()
+        op_copy.post_execute({})
+        assert hook.called
 
     def test_task_naive_datetime(self):
         naive_datetime = DEFAULT_DATE.replace(tzinfo=None)
diff --git a/providers/openlineage/docs/guides/developer.rst b/providers/openlineage/docs/guides/developer.rst
index b0722e1a248a5..0485e43048288 100644
--- a/providers/openlineage/docs/guides/developer.rst
+++ b/providers/openlineage/docs/guides/developer.rst
@@ -31,10 +31,6 @@ There might be some Operators that you can not modify (f.e. third party provider
 To handle this situation, OpenLineage allows you to provide custom Extractor for any Operator.
 See :ref:`custom_extractors:openlineage` for more details.
 
-If all of the above can not be implemented, as a fallback, there is a way to manually annotate lineage.
-Airflow allows Operators to track lineage by specifying the input and outputs of the Operators via inlets and outlets.
-See :ref:`inlets_outlets:openlineage` for more details.
-
 .. _extraction_precedence:openlineage:
 
 Extraction precedence
@@ -360,99 +356,6 @@
 For more examples of OpenLineage Extractors, check out the source code of
 `BashExtractor `_ or `PythonExtractor `_.
 
-.. _inlets_outlets:openlineage:
-
-Manually annotated lineage
-==========================
-
-This approach is rarely recommended, only in very specific cases, when it's impossible to extract some lineage information from the Operator itself.
-If you want to extract lineage from your own Operators, you may prefer directly implementing OpenLineage methods as described in :ref:`openlineage_methods:openlineage`.
-When dealing with Operators that you can not modify (f.e. third party providers), but still want the lineage to be extracted from them, see :ref:`custom_extractors:openlineage`.
-
-Airflow allows Operators to track lineage by specifying the input and outputs of the Operators via
-`inlets and outlets `_.
-OpenLineage will, by default, use inlets and outlets as input/output datasets if it cannot find any successful extraction from the OpenLineage methods or the Extractors.
-
-Airflow supports inlets and outlets to be either a Table, Column, File or User entity and so does OpenLineage.
-
-Example
-^^^^^^^
-
-An Operator inside the Airflow DAG can be annotated with inlets and outlets like in the below example:
-
-.. code-block:: python
-
-    """Example DAG demonstrating the usage of the extraction via Inlets and Outlets."""
-
-    import pendulum
-
-    from airflow import DAG
-    from airflow.providers.common.compat.lineage.entities import Table, File, Column, User
-    from airflow.providers.standard.operators.bash import BashOperator
-
-
-    t1 = Table(
-        cluster="c1",
-        database="d1",
-        name="t1",
-        owners=[User(email="jdoe@ok.com", first_name="Joe", last_name="Doe")],
-    )
-    t2 = Table(
-        cluster="c1",
-        database="d1",
-        name="t2",
-        columns=[
-            Column(name="col1", description="desc1", data_type="type1"),
-            Column(name="col2", description="desc2", data_type="type2"),
-        ],
-        owners=[
-            User(email="mike@company.com", first_name="Mike", last_name="Smith"),
-            User(email="theo@company.com", first_name="Theo"),
-            User(email="smith@company.com", last_name="Smith"),
-            User(email="jane@company.com"),
-        ],
-    )
-    t3 = Table(
-        cluster="c1",
-        database="d1",
-        name="t3",
-        columns=[
-            Column(name="col3", description="desc3", data_type="type3"),
-            Column(name="col4", description="desc4", data_type="type4"),
-        ],
-    )
-    t4 = Table(cluster="c1", database="d1", name="t4")
-    f1 = File(url="s3://bucket/dir/file1")
-
-
-    with DAG(
-        dag_id="example_operator",
-        schedule="@once",
-        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
-    ) as dag:
-        task1 = BashOperator(
-            task_id="task_1_with_inlet_outlet",
-            bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
-            inlets=[t1, t2],
-            outlets=[t3],
-        )
-
-        task2 = BashOperator(
-            task_id="task_2_with_inlet_outlet",
-            bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
-            inlets=[t3, f1],
-            outlets=[t4],
-        )
-
-        task1 >> task2
-
-    if __name__ == "__main__":
-        dag.cli()
-
-Conversion from Airflow Table entity to OpenLineage Dataset is made in the following way:
-- ``CLUSTER`` of the table entity becomes the namespace of OpenLineage's Dataset
-- The name of the dataset is formed by ``{{DATABASE}}.{{NAME}}`` where ``DATABASE`` and ``NAME`` are attributes specified by Airflow's Table entity.
-
 .. _custom_facets:openlineage:
 
 Custom Facets
diff --git a/providers/papermill/src/airflow/providers/papermill/operators/papermill.py b/providers/papermill/src/airflow/providers/papermill/operators/papermill.py
index 44f6db12e9c92..4dffe10aa5701 100644
--- a/providers/papermill/src/airflow/providers/papermill/operators/papermill.py
+++ b/providers/papermill/src/airflow/providers/papermill/operators/papermill.py
@@ -17,7 +17,6 @@
 # under the License.
 from __future__ import annotations
 
-import subprocess
 from collections.abc import Collection, Sequence
 from functools import cached_property
 from typing import TYPE_CHECKING, ClassVar
@@ -27,6 +26,7 @@
 
 from airflow.models import BaseOperator
 from airflow.providers.common.compat.lineage.entities import File
+from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
 from airflow.providers.papermill.hooks.kernel import REMOTE_KERNEL_ENGINE, KernelHook
 
 if TYPE_CHECKING:
@@ -60,6 +60,7 @@ class PapermillOperator(BaseOperator):
         (ignores kernel name in the notebook document metadata)
     """
 
+    # TODO: Remove this when provider drops 2.x support.
     supports_lineage = True
 
     template_fields: Sequence[str] = (
@@ -108,8 +109,9 @@ def execute(self, context: Context):
             self.input_nb = NoteBook(url=self.input_nb, parameters=self.parameters)  # type: ignore[call-arg]
         if not isinstance(self.output_nb, NoteBook):
             self.output_nb = NoteBook(url=self.output_nb)  # type: ignore[call-arg]
-        self.inlets.append(self.input_nb)
-        self.outlets.append(self.output_nb)
+        if not AIRFLOW_V_3_0_PLUS:
+            self.inlets.append(self.input_nb)
+            self.outlets.append(self.output_nb)
         remote_kernel_kwargs = {}
         kernel_hook = self.hook
         if kernel_hook:
@@ -139,28 +141,7 @@ def execute(self, context: Context):
             **remote_kernel_kwargs,
         )
 
-        # Convert the executed notebook to HTML using nbconvert
-        if self.nbconvert:
-            nbconvert_args = self.nbconvert_args or []
-            if not isinstance(nbconvert_args, list):
-                raise ValueError("nbconvert_args must be a list")
-
-            # Build the nbconvert command
-            command = [
-                "jupyter",
-                "nbconvert",
-                "--to",
-                "html",
-                "--log-level",
-                "WARN",
-                self.output_nb.url,
-            ] + nbconvert_args
-            try:
-                subprocess.run(command, check=True)
-                self.log.info("Output HTML: %s", self.output_nb.url.replace(".ipynb", ".html"))
-            except subprocess.CalledProcessError as e:
-                self.log.error("nbconvert failed with output:\n%s", e.stdout)
-                raise
+        return self.output_nb
 
     @cached_property
     def hook(self) -> KernelHook | None:
diff --git a/providers/papermill/tests/system/papermill/example_papermill_verify.py b/providers/papermill/tests/system/papermill/example_papermill_verify.py
index 347eab2184e39..3e5648d9e0b59 100644
--- a/providers/papermill/tests/system/papermill/example_papermill_verify.py
+++ b/providers/papermill/tests/system/papermill/example_papermill_verify.py
@@ -30,7 +30,6 @@
 
 from airflow import DAG
 from airflow.decorators import task
-from airflow.lineage import AUTO
 from airflow.providers.papermill.operators.papermill import PapermillOperator
 
 START_DATE = datetime(2021, 1, 1)
@@ -42,11 +41,11 @@
 
 # [START howto_verify_operator_papermill]
 @task
-def check_notebook(inlets, logical_date):
+def check_notebook(output, logical_date):
     """
     Verify the message in the notebook
     """
-    notebook = sb.read_notebook(inlets[0].url)
+    notebook = sb.read_notebook(output.url)
     message = notebook.scraps["message"]
 
     print(f"Message in notebook {message} for {logical_date}")
@@ -70,7 +69,7 @@ def check_notebook(inlets, logical_date):
         parameters={"msgs": "Ran from Airflow at {{ logical_date }}!"},
     )
 
-    run_this >> check_notebook(inlets=AUTO, logical_date="{{ logical_date }}")
+    check_notebook(output=run_this.output, logical_date="{{ logical_date }}")
 # [END howto_verify_operator_papermill]
 
 from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
diff --git a/providers/papermill/tests/unit/papermill/operators/test_papermill.py b/providers/papermill/tests/unit/papermill/operators/test_papermill.py
index 94d0b980ef641..82e75be90f6b0 100644
--- a/providers/papermill/tests/unit/papermill/operators/test_papermill.py
+++ b/providers/papermill/tests/unit/papermill/operators/test_papermill.py
@@ -88,10 +88,6 @@ def test_notebooks_objects(
         assert op.input_nb.url == TEST_INPUT_URL  # type: ignore
         assert op.output_nb.url == TEST_OUTPUT_URL  # type: ignore
 
-        # Test render Lineage inlets/outlets
-        assert op.inlets[0] == op.input_nb
-        assert op.outlets[0] == op.output_nb
-
     @patch("airflow.providers.papermill.operators.papermill.pm")
     def test_execute(self, mock_papermill):
         in_nb = "/tmp/does_not_exist"
diff --git a/task-sdk/src/airflow/sdk/bases/operator.py b/task-sdk/src/airflow/sdk/bases/operator.py
index 87e0e79c61e13..6450560b34625 100644
--- a/task-sdk/src/airflow/sdk/bases/operator.py
+++ b/task-sdk/src/airflow/sdk/bases/operator.py
@@ -891,9 +891,6 @@ def say_hello_world(**context):
         "executor",
     }
 
-    # Defines if the operator supports lineage without manual definitions
-    supports_lineage: bool = False
-
     # If True, the Rendered Template fields will be overwritten in DB after execution
     # This is useful for Taskflow decorators that modify the template fields during execution like
     # @task.bash decorator.
@@ -1200,24 +1197,6 @@ def __hash__(self):
             hash_components.append(repr(val))
         return hash(tuple(hash_components))
 
-    # including lineage information
-    def __or__(self, other):
-        """
-        Return [This Operator] | [Operator].
-
-        The inlets of other will be set to pick up the outlets from this operator.
-        Other will be set as a downstream task of this operator.
-        """
-        if isinstance(other, BaseOperator):
-            if not self.outlets and not self.supports_lineage:
-                raise ValueError("No outlets defined for this operator")
-            other.add_inlets([self.task_id])
-            self.set_downstream(other)
-        else:
-            raise TypeError(f"Right hand side ({other}) is not an Operator")
-
-        return self
-
     # /Composing Operators ---------------------------------------------
 
     def __gt__(self, other):
diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
index 9fed450ca7546..0b35bdffb308d 100644
--- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
@@ -321,8 +321,6 @@ class MappedOperator(AbstractOperator):
     This should be a name to call ``getattr()`` on.
     """
 
-    supports_lineage: bool = False
-
     HIDE_ATTRS_FROM_UI: ClassVar[frozenset[str]] = AbstractOperator.HIDE_ATTRS_FROM_UI | frozenset(
         ("parse_time_mapped_ti_count", "operator_class", "start_trigger_args", "start_from_trigger")
     )
@@ -365,7 +363,6 @@ def get_serialized_fields(cls):
             "expand_input",  # This is needed to be able to accept XComArg.
             "task_group",
             "upstream_task_ids",
-            "supports_lineage",
             "_is_setup",
             "_is_teardown",
             "_on_failure_fail_dagrun",