
Using OpenLineage integration

OpenLineage is an open framework for data lineage collection and analysis. At its core is an extensible specification that systems can use to interoperate with lineage metadata. Check out OpenLineage docs.

No change to user DAG files is required to use OpenLineage. Basic configuration is needed so that OpenLineage knows where to send events.

Quickstart

Note

OpenLineage Provider offers a diverse range of data transport options (HTTP, Kafka, file, etc.), including the flexibility to create a custom solution. Configuration can be managed through several approaches, and there is an extensive array of settings available for users to fine-tune and enhance their use of OpenLineage. For a comprehensive explanation of these features, please refer to the subsequent sections of this document.

This example is a basic demonstration of OpenLineage setup.

  1. Install the provider package or add it to your requirements.txt file.

    pip install apache-airflow-providers-openlineage
  2. Provide a Transport configuration so that OpenLineage knows where to send the events. Within the airflow.cfg file:

    [openlineage]
    transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}

    or with AIRFLOW__OPENLINEAGE__TRANSPORT environment variable

    AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'
  3. That's it! OpenLineage events should be sent to the configured backend when DAGs are run. For a quick check, any ordinary DAG will do; see the minimal sketch below.
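
As a sanity check, no lineage-specific code is needed in the DAG file itself. A minimal DAG such as the illustrative sketch below (names are made up, and the DAG import path may differ slightly between Airflow versions) should already produce OpenLineage events once the transport above is configured:

from datetime import datetime

from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="openlineage_smoke_test",  # illustrative name
    start_date=datetime(2024, 1, 1),
    schedule=None,
):
    # A plain task; the integration collects and emits lineage automatically.
    BashOperator(task_id="say_hello", bash_command="echo 'hello'")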

Usage

When enabled and configured, the integration requires no further action from the user. It will automatically:

  • Collect task input / output metadata (source, schema, etc.).
  • Collect task run-level metadata (execution time, state, parameters, etc.)
  • Collect task job-level metadata (owners, type, description, etc.)
  • Collect task-specific metadata (bigquery job id, python source code, etc.) - depending on the Operator

All this data will be sent as OpenLineage events to the configured backend as described in :ref:`job_hierarchy:openlineage`.
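
For orientation, each run event is a JSON document whose overall shape follows the OpenLineage specification. The abbreviated, purely illustrative sketch below shows the kind of structure to expect (all values are made up, facets are omitted, and the exact content depends on the operator):

{
  "eventType": "COMPLETE",
  "eventTime": "2024-01-01T00:00:00.000000Z",
  "run": {"runId": "01234567-89ab-cdef-0123-456789abcdef", "facets": {}},
  "job": {"namespace": "default", "name": "my_dag.my_task", "facets": {}},
  "inputs": [{"namespace": "postgres://host:5432", "name": "public.source_table"}],
  "outputs": [{"namespace": "postgres://host:5432", "name": "public.target_table"}]
}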

Transport setup

The primary and recommended method of configuring the OpenLineage Airflow Provider is Airflow configuration (the airflow.cfg file). All possible configuration options, with example values, can be found in :ref:`the configuration section <configuration:openlineage>`.

At minimum, the one thing that needs to be set up in every case is the Transport - that is, where you want your events to end up, for example Marquez.

Transport as JSON string

The transport option in Airflow configuration is used for that purpose.

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}

AIRFLOW__OPENLINEAGE__TRANSPORT environment variable is an equivalent.

AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'

If you want to look at OpenLineage events without sending them anywhere, you can set up ConsoleTransport - the events will end up in task logs.

[openlineage]
transport = {"type": "console"}

Note

For a full list of built-in transport types, a specific transport's options, or instructions on how to implement your own custom transport, refer to the Python client documentation.

Transport as config file

You can also configure the OpenLineage Transport using a YAML file (e.g. openlineage.yml). Provide the path to the YAML file as the config_path option in Airflow configuration.

[openlineage]
config_path = '/path/to/openlineage.yml'

AIRFLOW__OPENLINEAGE__CONFIG_PATH environment variable is an equivalent.

AIRFLOW__OPENLINEAGE__CONFIG_PATH='/path/to/openlineage.yml'

Example content of config YAML file:

transport:
  type: http
  url: https://backend:5000
  endpoint: events/receive
  auth:
    type: api_key
    apiKey: f048521b-dfe8-47cd-9c65-0cb07d57591e

Note

Detailed description of that configuration method, together with example config files, can be found in Python client documentation.

Configuration precedence

As there are multiple possible ways of configuring OpenLineage, it's important to keep in mind the precedence of different configurations. The OpenLineage Airflow Provider looks for the configuration in the following order (see the example after this list):

  1. Check config_path in airflow.cfg under openlineage section (or AIRFLOW__OPENLINEAGE__CONFIG_PATH environment variable)
  2. Check transport in airflow.cfg under openlineage section (or AIRFLOW__OPENLINEAGE__TRANSPORT environment variable)
  3. If all the above options are missing, the OpenLineage Python client used underneath looks for configuration in the order described in this documentation. Please note that using Airflow configuration is encouraged and is the only future-proof solution.
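
For example, if both options below are set, only config_path is used and the transport entry is ignored (the paths and values are illustrative):

[openlineage]
# config_path is checked first, so the referenced file defines the transport
config_path = '/path/to/openlineage.yml'
# this entry is ignored because config_path is set
transport = {"type": "console"}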

Backwards compatibility

Warning

The variables below should not be used and may be removed in the future. Consider using Airflow configuration (described above) for a future-proof solution.

For backwards compatibility with openlineage-airflow package, some environment variables are still available:

  • OPENLINEAGE_DISABLED is an equivalent of AIRFLOW__OPENLINEAGE__DISABLED.
  • OPENLINEAGE_CONFIG is an equivalent of AIRFLOW__OPENLINEAGE__CONFIG_PATH.
  • OPENLINEAGE_NAMESPACE is an equivalent of AIRFLOW__OPENLINEAGE__NAMESPACE.
  • OPENLINEAGE_EXTRACTORS is an equivalent of setting AIRFLOW__OPENLINEAGE__EXTRACTORS.
  • OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE is an equivalent of AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE.
  • OPENLINEAGE_URL can be used to set up a simple HTTP transport. This method has some limitations and may require using other environment variables to achieve the desired output; see the comparison below. See docs.
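
For example, the legacy variable and its preferred Airflow-configuration counterpart look roughly like this (the endpoint shown matches the examples in this document; the exact endpoint applied by OPENLINEAGE_URL may differ):

# Legacy (discouraged)
OPENLINEAGE_URL=http://example.com:5000

# Preferred, future-proof equivalent
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'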

Additional Options

Namespace

It's very useful to set up an OpenLineage namespace for this particular instance. That way, if you use multiple OpenLineage producers, events coming from them will be logically separated. If not set, the default namespace is used. Provide the name of the namespace as the namespace option in Airflow configuration.

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
namespace = 'my-team-airflow-instance'

AIRFLOW__OPENLINEAGE__NAMESPACE environment variable is an equivalent.

AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'

Timeout

To add a layer of isolation between task execution and OpenLineage - so that OpenLineage processing cannot interfere with task execution other than by taking time - OpenLineage methods run in a separate process. The code runs with a default timeout of 10 seconds. You can increase this by setting the execution_timeout value.

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
execution_timeout = 60

AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT environment variable is an equivalent.

AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT=60

Disable

You can disable sending OpenLineage events without uninstalling the OpenLineage provider by setting the disabled option to true in Airflow configuration.

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled = true

AIRFLOW__OPENLINEAGE__DISABLED environment variable is an equivalent.

AIRFLOW__OPENLINEAGE__DISABLED=true

Disable source code

Several Operators (e.g. Python, Bash) will by default include their source code in their OpenLineage events. To prevent that, set the disable_source_code option to true in Airflow configuration.

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disable_source_code = true

AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE environment variable is an equivalent.

AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE=true

Disabled for Operators

You can easily exclude some Operators from emitting OpenLineage events by passing a string of semicolon-separated full import paths of the Airflow Operators to disable as the disabled_for_operators option in Airflow configuration.

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled_for_operators = 'airflow.providers.standard.operators.bash.BashOperator;airflow.providers.standard.operators.python.PythonOperator'

AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS environment variable is an equivalent.

AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS='airflow.providers.standard.operators.bash.BashOperator;airflow.providers.standard.operators.python.PythonOperator'

Full Task Info

By default, the OpenLineage integration's AirflowRunFacet - attached to the START event for every task instance - does not contain the full serialized task information (the parameters passed to the operator), but only includes selected parameters.

However, you can configure the OpenLineage integration to include full task information. In this mode, rather than serializing only a few known attributes, the integration excludes certain non-serializable elements and sends everything else.

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
include_full_task_info = true

AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO environment variable is an equivalent.

AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO=true

Warning

By setting this variable to true, the OpenLineage integration does not control the size of the events sent. They can potentially include elements that are megabytes in size or larger, depending on the size of the data you pass to the task.

Custom Extractors

To use the :ref:`custom Extractors <custom_extractors:openlineage>` feature, register the extractors by passing a string of semicolon-separated full import paths of the extractor classes to the extractors option in Airflow configuration.

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass

AIRFLOW__OPENLINEAGE__EXTRACTORS environment variable is an equivalent.

AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'
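
As a rough illustration, a custom extractor is a class that declares which operators it handles and what lineage to return for them. The sketch below is minimal and hypothetical - the class and import paths should be verified against the :ref:`custom Extractors <custom_extractors:openlineage>` guide for your provider version:

from __future__ import annotations

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage


class MyOperatorExtractor(BaseExtractor):
    """Hypothetical extractor for a custom MyOperator."""

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        # Names of the operator classes this extractor should handle.
        return ["MyOperator"]

    def extract(self) -> OperatorLineage | None:
        # Build lineage from the operator instance being executed.
        return OperatorLineage(
            inputs=[],      # input Dataset objects read by the task
            outputs=[],     # output Dataset objects written by the task
            run_facets={},
            job_facets={},
        )

Such a class would then be registered via the extractors option shown above, using its full import path.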

Custom Run Facets

To inject :ref:`custom run facets <custom_facets:openlineage>`, register the custom run facet functions by passing a string of semicolon-separated full import paths of the functions to the custom_run_facets option in Airflow configuration.

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function

AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS environment variable is an equivalent.

AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
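
As a rough illustration, each registered function receives the task instance and returns a dictionary of facets to attach to the run (or None to attach nothing). The sketch below is minimal and hypothetical - check the :ref:`custom run facets <custom_facets:openlineage>` guide for the exact signature and base classes expected by your provider version:

from __future__ import annotations

import attrs
from airflow.models import TaskInstance
from openlineage.client.facet_v2 import RunFacet


@attrs.define
class MyCustomRunFacet(RunFacet):
    """Hypothetical facet carrying extra run metadata."""

    name: str
    cluster: str


def get_my_custom_facet(task_instance: TaskInstance) -> dict[str, RunFacet] | None:
    # Returning None (or an empty dict) attaches nothing for this task instance.
    return {
        "my_custom_facet": MyCustomRunFacet(
            name=task_instance.task_id,
            cluster="hypothetical-cluster-name",
        )
    }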

Debug Mode

You can enable sending additional information in OpenLineage events that can be useful for debugging and reproducing your environment setup by setting the debug_mode option to true in Airflow configuration.

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
debug_mode = true

AIRFLOW__OPENLINEAGE__DEBUG_MODE environment variable is an equivalent.

AIRFLOW__OPENLINEAGE__DEBUG_MODE=true

Warning

By setting this variable to true, the OpenLineage integration may log and emit extensive details. It should only be enabled temporarily, for debugging purposes.

Enabling OpenLineage on DAG/task level

One can selectively enable OpenLineage for specific DAGs and tasks by using the selective_enable policy. To enable this policy, set the selective_enable option to True in the [openlineage] section of your Airflow configuration file:

[openlineage]
selective_enable = True

AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE environment variable is an equivalent.

AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE=true

While selective_enable enables selective control, the disabled :ref:`option <options:disable>` still has precedence. If you set disabled to True in the configuration, OpenLineage will be disabled for all DAGs and tasks regardless of the selective_enable setting.

Once the selective_enable policy is enabled, you can choose to enable OpenLineage for individual DAGs and tasks using the enable_lineage and disable_lineage functions.

  1. Enabling Lineage on a DAG:
from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage

with enable_lineage(DAG(...)):
    # Tasks within this DAG will have lineage tracking enabled
    MyOperator(...)

    AnotherOperator(...)
  2. Enabling Lineage on a Task:

While enabling lineage on a DAG implicitly enables it for all tasks within that DAG, you can still selectively disable it for specific tasks:

from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage

with DAG(...) as dag:
    t1 = MyOperator(...)
    t2 = AnotherOperator(...)

# Enable lineage for the entire DAG
enable_lineage(dag)

# Disable lineage for task t1
disable_lineage(t1)

Enabling lineage on the DAG level automatically enables it for all tasks within that DAG unless explicitly disabled per task.

Enabling lineage on the task level implicitly enables lineage on its DAG. This is because each emitting task sends a ParentRunFacet, which requires the DAG-level lineage to be enabled in some OpenLineage backend systems. Disabling DAG-level lineage while enabling task-level lineage might cause errors or inconsistencies.
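
For example, enabling lineage only on a single task is enough for that task to emit events, because it implicitly enables lineage on its DAG as well (operator names are illustrative):

from airflow.providers.openlineage.utils.selective_enable import enable_lineage

with DAG(...) as dag:
    t1 = MyOperator(...)
    t2 = AnotherOperator(...)

# Enabling lineage on t1 also enables it implicitly on the DAG,
# so t1 emits events with a valid ParentRunFacet; t2 still emits nothing.
enable_lineage(t1)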

Troubleshooting

See :ref:`troubleshooting:openlineage` for details on how to troubleshoot OpenLineage.

Adding support for custom Operators

If you want to add OpenLineage coverage for a particular Operator, take a look at :ref:`guides/developer:openlineage`.

Where can I learn more?

Feedback

You can reach out to us on Slack and leave us feedback!

How to contribute

We welcome your contributions! OpenLineage is an Open Source project under active development, and we'd love your help!

Sounds fun? Check out our new contributor guide to get started.