Skip to content

Comments

Move lineage from airflow core to task sdk#61157

Merged
amoghrajesh merged 3 commits intoapache:mainfrom
astronomer:lineage-again
Jan 28, 2026
Merged

Move lineage from airflow core to task sdk#61157
amoghrajesh merged 3 commits intoapache:mainfrom
astronomer:lineage-again

Conversation

@amoghrajesh
Copy link
Contributor

Reattempting: #60968 after it was reverted due to failing tests in https://github.com/apache/airflow/actions/runs/21411369808/job/61650388211

The second commit makes the changes needed to fix the issues that lead to failures.

Why?

Lineage collection is a task execution concern and on checking it only runs on workers (task sdk consumer) processes, not in any server components (scheduler, api server). I intend to move lineage module from airflow-core to task-sdk as part of the ongoing client server separation work.

Some more context:

  • io/path.py intercepts file I/O during task execution
  • OpenLineage listeners run in the worker process after task completion
  • It only orchestrates, never executes user code

What is done?

  • Created a new module in sdk: sdk/lineage.py
  • Moved all classes from airflow.lineage.hook
    • Updated imports to SDK equivalents:
      • ProvidersManagerProvidersManagerTaskRuntime
      • airflow.utils.log.logging_mixinairflow.sdk.definitions._internal.logging_mixin
    • Created get_hook_lineage_readers_plugins() using SDK's plugin discovery

Backward Compatibility

  1. For core -

    • Supports both from airflow.lineage.hook import X and from airflow.lineage import hook
  2. Provider compatibility has been handled with providers/common/compat/src/airflow/providers/common/compat/sdk.py

    • Added lineage classes to compat layer
    • Handles DatasetLineageInfoAssetLineageInfo rename (AF2 → AF3)
  3. Removed from core -airflow-core/src/airflow/plugins_manager.py

    • Deleted unused get_hook_lineage_readers_plugins() function
    • Function only existed for core's old lineage implementation
  4. For provider developers, it is recommended to use imports from airflow.providers.common.compat.sdk

Testing

To gain confidence I tried to test a manual e2e scenario for this. Ran breeze with OL integration:
breeze start-airflow --integration openlineage

DAG:

from __future__ import annotations

from datetime import datetime

from airflow.sdk import DAG
from airflow.hooks.base import BaseHook
from airflow.providers.standard.operators.python import PythonOperator


from airflow.sdk.lineage import get_hook_lineage_collector


class SimpleHook(BaseHook):

    def process(self):
        collector = get_hook_lineage_collector()

        collector.add_input_asset(self, uri="file:///input/data.csv")

        collector.add_output_asset(self, uri="file:///output/result.csv")

        print("Lineage reported! "*10)


def my_task():
    hook = SimpleHook()
    hook.process()


with DAG(
    dag_id="simple_lineage",
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    PythonOperator(
        task_id="run_hook",
        python_callable=my_task,
    )

Its a simple dag that does this:

  • Creates a custom hook (SimpleHook) that reports lineage information
  • Reports input dataset: file:///input/data.csv
  • Reports output dataset: file:///output/result.csv
  • Sends lineage to OpenLineage by using get_hook_lineage_collector() to register assets

DAG run:
image

Marquez:
image

image image

@amoghrajesh amoghrajesh added this to the Airflow 3.2.0 milestone Jan 28, 2026
@amoghrajesh amoghrajesh self-assigned this Jan 28, 2026
@amoghrajesh amoghrajesh added full tests needed We need to run full set of tests for this PR to merge all versions If set, the CI build will be forced to use all versions of Python/K8S/DBs labels Jan 28, 2026
@amoghrajesh amoghrajesh merged commit e72f3a1 into apache:main Jan 28, 2026
228 checks passed
@amoghrajesh amoghrajesh deleted the lineage-again branch January 28, 2026 10:14
shreyas-dev pushed a commit to shreyas-dev/airflow that referenced this pull request Jan 29, 2026
shashbha14 pushed a commit to shashbha14/airflow that referenced this pull request Feb 2, 2026
jason810496 pushed a commit to abhijeets25012-tech/airflow that referenced this pull request Feb 3, 2026
jhgoebbert pushed a commit to jhgoebbert/airflow_Owen-CH-Leung that referenced this pull request Feb 8, 2026
@amoghrajesh
Copy link
Contributor Author

cc: @atul-astronomer we can test out if using lineage works fine from new path

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

all versions If set, the CI build will be forced to use all versions of Python/K8S/DBs area:dev-tools area:lineage area:plugins area:providers area:task-sdk backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch full tests needed We need to run full set of tests for this PR to merge provider:common-compat provider:openlineage AIP-53

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants