Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to add custom facet in OpenLineage events #38982

Merged
merged 22 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d4675b2
Ability to add custom facet in OpenLineage events
Apr 12, 2024
8f80f81
Update airflow/providers/openlineage/provider.yaml
anandhimurali Apr 15, 2024
9369d3b
Update airflow/providers/openlineage/provider.yaml
anandhimurali May 8, 2024
ca0224c
Adding None type hint for the custom facet function
May 15, 2024
5597404
Fix a test after rebase
May 16, 2024
457584e
Removed the legacy OPENLINEAGE_ configs format for OPENLINEAGE_CUSTOM…
May 16, 2024
d246f3a
Duplicate facet key check
May 16, 2024
5465664
Update airflow/providers/openlineage/utils/utils.py
anandhimurali May 21, 2024
16fe262
Update airflow/providers/openlineage/utils/utils.py
anandhimurali May 21, 2024
1d9b1f5
Fixes after rebase
Jun 20, 2024
9847bd6
Adding user docs for custom_facet_functions
Jul 2, 2024
f268e9d
Rename custom_facet_functions as custom_run_facets
Jul 3, 2024
bfffb9e
Increment version for custom_run_facets feature
Jul 5, 2024
531f996
Enrich example with access to operator and return value as None.
Jul 5, 2024
9b6f0c8
Add try-except for custom facet function execution
Jul 5, 2024
88f8c07
Fix the typing for the custom facet fucntion return type
Jul 5, 2024
ead05fd
Documentation: funcs are executed only for START events
Jul 5, 2024
6505f4f
Fix the typing for the custom facet function return type
Jul 5, 2024
497bdba
Fixes after pre-commit hook checks
Jul 18, 2024
8c404b9
Adding start_date to test DAGs for 2.7 compatibility tests
Jul 18, 2024
18526d7
Removing a out of scope __init__ file added by pre-commit check
Jul 19, 2024
75c10d1
Merge branch 'main' into ol-custom-facet
anandhimurali Jul 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions airflow/providers/openlineage/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ def custom_extractors() -> set[str]:
return set(extractor.strip() for extractor in option.split(";") if extractor.strip())


@cache
def custom_run_facets() -> set[str]:
    """
    [openlineage] custom_run_facets.

    Parse the semicolon separated option into a set of import paths: surrounding
    whitespace is trimmed, empty entries are dropped and duplicates collapse.
    """
    raw_value = conf.get(_CONFIG_SECTION, "custom_run_facets", fallback="")
    stripped_entries = (entry.strip() for entry in raw_value.split(";"))
    return {entry for entry in stripped_entries if entry}


@cache
def namespace() -> str:
"""[openlineage] namespace."""
Expand Down
7 changes: 7 additions & 0 deletions airflow/providers/openlineage/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ config:
example: full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
default: ~
version_added: ~
custom_run_facets:
description: |
Register custom run facet functions by passing a string of semicolon separated full import paths.
type: string
example: full.path.to.custom_facet_function;full.path.to.another_custom_facet_function
default: ''
version_added: 1.10.0
config_path:
description: |
Specify the path to the YAML configuration file.
Expand Down
38 changes: 37 additions & 1 deletion airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from contextlib import redirect_stdout, suppress
from functools import wraps
from io import StringIO
from typing import TYPE_CHECKING, Any, Iterable
from typing import TYPE_CHECKING, Any, Callable, Iterable

import attrs
from deprecated import deprecated
Expand Down Expand Up @@ -79,11 +79,47 @@ def get_job_name(task: TaskInstance) -> str:


def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str, Any]:
    """
    Collect the custom run facets to attach to an OpenLineage event.

    The result contains the ``airflow_mappedTask`` facet when the task instance is a mapped
    task, extended with whatever the functions registered in the ``[openlineage]
    custom_run_facets`` option return.

    Registered functions are processed in sorted import-path order. The configuration is
    exposed as a set, whose iteration order is not stable between processes, so sorting makes
    the winner of a duplicate facet key the same on every run. A function that cannot be
    imported, raises, or returns anything other than a non-empty dict is skipped and never
    prevents the remaining functions from being processed.

    :param task_instance: Task instance passed to every custom facet function.
    :return: Mapping of facet key to facet value.
    """
    from airflow.providers.openlineage.extractors.manager import try_import_from_string

    custom_facets: dict[str, Any] = {}
    # check for -1 comes from SmartSensor compatibility with dynamic task mapping
    # this comes from Airflow code
    if getattr(task_instance, "map_index", -1) != -1:
        custom_facets["airflow_mappedTask"] = AirflowMappedTaskRunFacet.from_task_instance(task_instance)

    # Append custom run facets by executing the custom_run_facet functions.
    # Sorted so that "the last processed function wins" is deterministic.
    for custom_facet_func in sorted(conf.custom_run_facets()):
        try:
            func: Callable[[Any], dict] | None = try_import_from_string(custom_facet_func)
            if not func:
                log.warning(
                    "OpenLineage is unable to import custom facet function `%s`; will ignore it.",
                    custom_facet_func,
                )
                continue
            facet: dict[str, dict[Any, Any]] | None = func(task_instance)
            # None, empty and non-dict return values contribute nothing.
            if facet and isinstance(facet, dict):
                duplicate_facet_keys = [facet_key for facet_key in facet if facet_key in custom_facets]
                if duplicate_facet_keys:
                    log.warning(
                        "Duplicate OpenLineage custom facets key(s) found: `%s` from function `%s`; "
                        "this will overwrite the previous value.",
                        ", ".join(duplicate_facet_keys),
                        custom_facet_func,
                    )
                log.debug(
                    "Adding OpenLineage custom facet with key(s): `%s` from function `%s`.",
                    tuple(facet),
                    custom_facet_func,
                )
                custom_facets.update(facet)
        except Exception as exc:
            # User-supplied code must never break lineage emission; log and move on.
            log.warning(
                "Error processing custom facet function `%s`; will ignore it. Error was: %s: %s",
                custom_facet_func,
                type(exc).__name__,
                exc,
            )
    return custom_facets


Expand Down
89 changes: 87 additions & 2 deletions docs/apache-airflow-providers-openlineage/guides/developer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -446,15 +446,100 @@ Conversion from Airflow Table entity to OpenLineage Dataset is made in the follo

.. _custom_facets:openlineage:

Custom facets
Custom Facets
=============
To learn more about facets in OpenLineage, please refer to `facet documentation <https://openlineage.io/docs/spec/facets/>`_.
Also check out `available Facets <https://github.com/OpenLineage/OpenLineage/blob/main/client/python/openlineage/client/facet.py>`_
Also check out `available facets <https://github.com/OpenLineage/OpenLineage/blob/main/client/python/openlineage/client/facet.py>`_

The OpenLineage spec might not contain all the facets you need to write your extractor,
in which case you will have to make your own `custom facets <https://openlineage.io/docs/spec/facets/custom-facets>`_.
More on creating custom facets can be found `here <https://openlineage.io/blog/extending-with-facets/>`_.

Custom Run Facets
=================

You can inject your own custom facets in the lineage event's run facet using the ``custom_run_facets`` Airflow configuration.

Follow these steps:

1. Write a function that returns the custom facet. You can write as many custom facet functions as needed.
2. Register the functions using the ``custom_run_facets`` Airflow configuration.

Once done, Airflow OpenLineage listener will automatically execute these functions during the lineage event generation
and append their return values to the run facet in the lineage event.

Writing a custom facet function
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

- **Input arguments:** The function should accept the ``TaskInstance`` as an input argument.
- **Function body:** Perform the logic needed to generate the custom facet. The custom facet should inherit from ``BaseFacet`` so that ``_producer`` and ``_schemaURL`` are automatically added to the facet.
- **Return value:** The custom facet to be added to the lineage event. Return type should be ``dict[str, dict]`` or ``None``. You may choose to return ``None``, if you do not want to add custom facets for certain criteria.

**Example custom facet function**

.. code-block:: python

import attrs
from airflow.models import TaskInstance
from openlineage.client.facet import BaseFacet


@attrs.define(slots=False)
class MyCustomRunFacet(BaseFacet):
    """
    Define a custom facet.

    Every attribute is a plain string; the example function below fills them from the
    ``TaskInstance`` and converts the instance to a dict with ``attrs.asdict``.
    """

    # Fixed label; the example uses "test-lineage-namespace".
    name: str
    # Filled from ``task_instance.state`` in the example.
    jobState: str
    # Built in the example as "TEST.<dag_id>.<task_id>".
    uniqueName: str
    # Built in the example as "<dag_id>.<task_id>".
    displayName: str
    # Filled from ``task_instance.dag_id`` in the example.
    dagId: str
    # Filled from ``task_instance.task_id`` in the example.
    taskId: str
    # Fixed label; the example uses "TEST".
    cluster: str


def get_my_custom_facet(task_instance: TaskInstance) -> dict[str, dict] | None:
    """
    Build the ``additional_run_facet`` run facet for a task instance.

    :param task_instance: The task instance the lineage event is generated for.
    :return: A single-key dict holding the facet, or ``None`` when no facet should be added
        (here: for ``BashOperator`` tasks).
    """
    # Guard against a task instance without an attached task instead of raising.
    operator_name = task_instance.task.operator_name if task_instance.task else None
    if operator_name == "BashOperator":
        # Returning None means no custom facet is added for this task.
        return None
    job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
    return {
        "additional_run_facet": attrs.asdict(
            MyCustomRunFacet(
                name="test-lineage-namespace",
                jobState=task_instance.state,
                uniqueName=job_unique_name,
                displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
                dagId=task_instance.dag_id,
                taskId=task_instance.task_id,
                cluster="TEST",
            )
        )
    }

Register the custom facet functions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Use the ``custom_run_facets`` Airflow configuration to register the custom run facet functions by passing
a string of semicolon separated full import paths to the functions.

.. code-block:: ini

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function

``AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS`` environment variable is an equivalent.

.. code-block:: ini

AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'

.. note::

- The custom facet functions are only executed at the start of the TaskInstance and added to the OpenLineage START event.
- If the same function is registered more than once, it is executed only once.
- When different functions return the same custom facet key, the value returned by the last processed function is the one added to the lineage event.

.. _job_hierarchy:openlineage:

Job Hierarchy
Expand Down
20 changes: 19 additions & 1 deletion docs/apache-airflow-providers-openlineage/guides/user.rst
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ serializing only a few known attributes, we exclude certain non-serializable ele
Custom Extractors
^^^^^^^^^^^^^^^^^

If you use :ref:`custom Extractors <custom_extractors:openlineage>` feature, register the extractors by passing
To use the :ref:`custom Extractors <custom_extractors:openlineage>` feature, register the extractors by passing
a string of semicolon separated Airflow Operators full import paths to ``extractors`` option in Airflow configuration.

.. code-block:: ini
Expand All @@ -286,6 +286,24 @@ a string of semicolon separated Airflow Operators full import paths to ``extract

AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'

Custom Run Facets
^^^^^^^^^^^^^^^^^

To inject :ref:`custom run facets <custom_facets:openlineage>`, register the custom run facet functions by passing
a string of semicolon separated full import paths to the ``custom_run_facets`` option in Airflow configuration.

.. code-block:: ini

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function

``AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS`` environment variable is an equivalent.

.. code-block:: ini

AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'

Enabling OpenLineage on DAG/task level
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
26 changes: 26 additions & 0 deletions tests/providers/openlineage/test_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
_is_true,
config_path,
custom_extractors,
custom_run_facets,
dag_state_change_process_pool_size,
disabled_operators,
execution_timeout,
Expand All @@ -41,6 +42,7 @@
_CONFIG_SECTION = "openlineage"
_VAR_CONFIG_PATH = "OPENLINEAGE_CONFIG"
_CONFIG_OPTION_CONFIG_PATH = "config_path"
_CONFIG_OPTION_CUSTOM_RUN_FACETS = "custom_run_facets"
_VAR_DISABLE_SOURCE_CODE = "OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE"
_CONFIG_OPTION_DISABLE_SOURCE_CODE = "disable_source_code"
_CONFIG_OPTION_DISABLED_FOR_OPERATORS = "disabled_for_operators"
Expand Down Expand Up @@ -255,6 +257,30 @@ def test_extractors_do_not_fail_if_conf_option_missing():
assert custom_extractors() == set()


@conf_vars({})
def test_custom_run_facets_not_set():
    """With no override for the option, no custom run facet functions are registered."""
    registered = custom_run_facets()
    assert registered == set()


def test_custom_run_facets_with_no_values():
    """An unset (``None``) or empty option value both yield an empty set."""
    option_key = (_CONFIG_SECTION, _CONFIG_OPTION_CUSTOM_RUN_FACETS)
    for empty_value in (None, ""):
        with conf_vars({option_key: empty_value}):
            assert custom_run_facets() == set()


@conf_vars(
    {
        (_CONFIG_SECTION, _CONFIG_OPTION_CUSTOM_RUN_FACETS): (
            " tests.my_function;; tests.my_function ; my_function_2; "
        ),
    }
)
def test_custom_run_facets():
    """Entries are trimmed, empty entries are dropped and duplicates collapse."""
    expected_paths = {"tests.my_function", "my_function_2"}
    assert custom_run_facets() == expected_paths


@env_vars({_VAR_NAMESPACE: "my_custom_namespace"})
@conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_NAMESPACE): None})
def test_namespace_legacy_env_var_is_used_when_no_conf_option_set():
Expand Down
87 changes: 87 additions & 0 deletions tests/providers/openlineage/utils/custom_facet_fixture.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

import attrs
from openlineage.client.facet import BaseFacet

if TYPE_CHECKING:
from airflow.models import TaskInstance


@attrs.define(slots=False)
class MyCustomRunFacet(BaseFacet):
    """
    Define a custom run facet.

    Used by the fixture functions in this module, which fill every attribute from a
    ``TaskInstance`` and convert the instance to a dict with ``attrs.asdict``.
    """

    # Fixed label; the fixtures use "test-lineage-namespace".
    name: str
    # Filled from ``task_instance.state`` by the fixtures.
    jobState: str
    # Built by the fixtures as "TEST.<dag_id>.<task_id>".
    uniqueName: str
    # Built by the fixtures as "<dag_id>.<task_id>".
    displayName: str
    # Filled from ``task_instance.dag_id`` by the fixtures.
    dagId: str
    # Filled from ``task_instance.task_id`` by the fixtures.
    taskId: str
    # Fixed label; the fixtures use "TEST".
    cluster: str


def get_additional_test_facet(task_instance: TaskInstance) -> dict[str, dict] | None:
    """
    Return the ``additional_run_facet`` facet for the task instance.

    :param task_instance: Task instance the facet is built from.
    :return: A single-key dict holding the facet, or ``None`` for ``BashOperator`` tasks.
    """
    # A task instance without an attached task is treated as "unknown operator".
    operator_name = task_instance.task.operator_name if task_instance.task else None
    if operator_name == "BashOperator":
        return None
    job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
    return {
        "additional_run_facet": attrs.asdict(
            MyCustomRunFacet(
                name="test-lineage-namespace",
                jobState=task_instance.state,
                uniqueName=job_unique_name,
                displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
                dagId=task_instance.dag_id,
                taskId=task_instance.task_id,
                cluster="TEST",
            )
        )
    }


def get_duplicate_test_facet_key(task_instance: TaskInstance):
    """Return a facet under the ``additional_run_facet`` key, the same key ``get_additional_test_facet`` uses."""
    job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
    run_facet = MyCustomRunFacet(
        name="test-lineage-namespace",
        jobState=task_instance.state,
        uniqueName=job_unique_name,
        displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
        dagId=task_instance.dag_id,
        taskId=task_instance.task_id,
        cluster="TEST",
    )
    serialized_facet = attrs.asdict(run_facet)
    return {"additional_run_facet": serialized_facet}


def get_another_test_facet(task_instance: TaskInstance):
    """Return a constant facet under the ``another_run_facet`` key; the task instance is not read."""
    facet_body = {"name": "another-lineage-namespace"}
    return {"another_run_facet": facet_body}


def return_type_is_not_dict(task_instance: TaskInstance):
    """Return a string instead of a dict, to exercise handling of an invalid return type."""
    invalid_result = "return type is not dict"
    return invalid_result


def get_custom_facet_throws_exception(task_instance: TaskInstance):
    """Always raise, to exercise handling of a failing custom facet function."""
    failure_message = "fake exception from custom fcet function"
    raise Exception(failure_message)
Loading