Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 149 additions & 31 deletions devel-common/src/sphinx_exts/providers_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import ast
import os
from collections.abc import Callable, Iterable
from functools import partial
from pathlib import Path
from typing import Any

Expand Down Expand Up @@ -72,13 +71,16 @@ def find_class_methods_with_specific_calls(

... def method4(self):
... self.some_other_method()

... def method5(self):
... direct_call()
... '''
> find_methods_with_specific_calls(
ast.parse(source_code),
{"airflow.my_method.not_ok", "airflow.my_method.ok"},
{"my_method": "airflow.my_method"}
{"airflow.my_method.not_ok", "airflow.my_method.ok", "airflow.direct_call"},
{"my_method": "airflow.my_method", "direct_call": "airflow.direct_call"}
)
{'method1', 'method2', 'method3'}
{'method1', 'method2', 'method3', 'method5'}
"""
method_call_map: dict[str, set[str]] = {}
methods_with_calls: set[str] = set()
Expand All @@ -92,6 +94,12 @@ def find_class_methods_with_specific_calls(
if not isinstance(sub_node, ast.Call):
continue
called_function = sub_node.func
# Direct function calls: e.g. send_sql_hook_lineage(...)
if isinstance(called_function, ast.Name):
full_call = import_mappings.get(called_function.id)
if full_call in target_calls:
methods_with_calls.add(node.name)
continue
if not isinstance(called_function, ast.Attribute):
continue
if isinstance(called_function.value, ast.Call) and isinstance(
Expand Down Expand Up @@ -149,18 +157,24 @@ def get_import_mappings(tree) -> dict[str, str]:

def _get_module_class_registry(
module_filepath: Path, module_name: str, class_extras: dict[str, Callable]
) -> dict[str, dict[str, Any]]:
) -> tuple[dict[str, dict[str, Any]], dict[str, set[str]]]:
"""
Extracts classes and its information from a Python module file.
Extracts classes and module-level functions from a Python module file.

The function parses the specified module file and registers all classes.
The registry for each class includes the module filename, methods, base classes
and any additional class extras provided.
The registry for each class includes the module filename, methods, base classes,
any additional class extras provided, and temporary ``_class_node`` /
``_import_mappings`` entries for deferred analysis.

It also collects fully-qualified call targets for every module-level function
so that transitive helper discovery can be done without re-reading the file.

:param module_filepath: The file path of the module.
:param module_name: Fully-qualified module name.
:param class_extras: Additional information to include in each class's registry.

:return: A dictionary with class names as keys and their corresponding information.
:return: A tuple of (class_registry, function_calls) where *function_calls*
maps each ``module.function_name`` to the set of fully-qualified calls it makes.
"""
with open(module_filepath) as file:
ast_obj = ast.parse(file.read())
Expand All @@ -174,6 +188,8 @@ def _get_module_class_registry(
for b in node.bases
if isinstance(b, ast.Name)
],
"_class_node": node,
"_import_mappings": import_mappings,
**{
key: callable_(class_node=node, import_mappings=import_mappings)
for key, callable_ in class_extras.items()
Expand All @@ -182,7 +198,46 @@ def _get_module_class_registry(
for node in ast_obj.body
if isinstance(node, ast.ClassDef)
}
return module_class_registry
module_function_calls = {
f"{module_name}.{node.name}": _find_calls_in_function(node, import_mappings)
for node in ast_obj.body
if isinstance(node, ast.FunctionDef)
}
return module_class_registry, module_function_calls


def _get_methods_with_hook_level_lineage(
class_path: str,
class_registry: dict[str, dict[str, Any]],
target_calls: set[str],
) -> set[str]:
"""
Return method names that have hook-level lineage calls on this class or any base class.

Walks the inheritance tree so that child classes are considered to have HLL when a
base class implements it (e.g. DbApiHook._run_command → PostgresHook, MySqlHook, etc.).
HLL is computed lazily on first access using the stored AST data.
"""
if class_path not in class_registry:
return set()
info = class_registry[class_path]
if "methods_with_hook_level_lineage" not in info:
class_node = info.pop("_class_node", None)
import_mappings = info.pop("_import_mappings", None)
info["methods_with_hook_level_lineage"] = (
find_class_methods_with_specific_calls(
class_node=class_node,
target_calls=target_calls,
import_mappings=import_mappings,
)
if class_node is not None
else set()
)
methods: set[str] = set(info["methods_with_hook_level_lineage"])
for base_name in info.get("base_classes") or []:
if base_name in class_registry:
methods |= _get_methods_with_hook_level_lineage(base_name, class_registry, target_calls)
return methods


def _has_method(
Expand Down Expand Up @@ -228,19 +283,81 @@ def _has_method(
return False


def _inherits_from(
class_path: str,
ancestor_path: str,
class_registry: dict[str, dict[str, Any]],
) -> bool:
"""Check whether *class_path* inherits from *ancestor_path* (walking the registry)."""
if class_path == ancestor_path:
return True
if class_path not in class_registry:
return False
return any(
_inherits_from(base, ancestor_path, class_registry)
for base in class_registry[class_path]["base_classes"]
)


def _find_calls_in_function(func_node: ast.FunctionDef, import_mappings: dict[str, str]) -> set[str]:
"""Return fully-qualified call targets found in a single function node."""
calls: set[str] = set()
for sub_node in ast.walk(func_node):
if not isinstance(sub_node, ast.Call):
continue
func = sub_node.func
# Direct call: some_function(...)
if isinstance(func, ast.Name):
fq = import_mappings.get(func.id)
if fq:
calls.add(fq)
# Chained call: some_function().method(...)
elif (
isinstance(func, ast.Attribute)
and isinstance(func.value, ast.Call)
and isinstance(func.value.func, ast.Name)
):
fq = import_mappings.get(func.value.func.id)
if fq:
calls.add(f"{fq}.{func.attr}")
return calls


def _compute_transitive_closure(function_calls: dict[str, set[str]], root_targets: set[str]) -> set[str]:
"""
Expand *root_targets* with module-level functions that transitively call them.

:param function_calls: Mapping of fully-qualified function names to the set of fully-qualified calls
each function makes (as collected during module scanning).
:param root_targets: The seed set of call targets (e.g. ``get_hook_lineage_collector().add_extra``).
:return: Expanded set that includes *root_targets* plus any discovered wrapper functions.
"""
targets = set(root_targets)
changed = True
while changed:
changed = False
for fq_name, calls in function_calls.items():
if fq_name not in targets and calls & targets:
targets.add(fq_name)
changed = True
return targets


def _get_providers_class_registry(
class_extras: dict[str, Callable] | None = None,
) -> dict[str, dict[str, Any]]:
) -> tuple[dict[str, dict[str, Any]], dict[str, set[str]]]:
"""
Builds a registry of classes from YAML configuration files.
Builds a registry of classes and module-level function call graph from YAML configuration files.

This function scans through YAML configuration files to build a registry of classes.
It parses each YAML file to get the provider's name and registers classes from Python
module files within the provider's directory, excluding '__init__.py'.

:return: A dictionary with provider names as keys and a dictionary of classes as values.
:return: A tuple of (class_registry, function_calls) where *function_calls* maps
each fully-qualified module-level function to the set of calls it makes.
"""
class_registry = {}
class_registry: dict[str, dict[str, Any]] = {}
function_calls: dict[str, set[str]] = {}
for provider_yaml_content in load_package_data():
provider_pkg_root = Path(provider_yaml_content["package-dir"])
for root, _, file_names in os.walk(provider_pkg_root):
Expand All @@ -251,7 +368,7 @@ def _get_providers_class_registry(

module_filepath = folder.joinpath(file_name)

module_registry = _get_module_class_registry(
module_registry, module_func_calls = _get_module_class_registry(
module_filepath=module_filepath,
module_name=(
provider_yaml_content["python-module"]
Expand All @@ -268,8 +385,9 @@ def _get_providers_class_registry(
},
)
class_registry.update(module_registry)
function_calls.update(module_func_calls)

return class_registry
return class_registry, function_calls


def _render_openlineage_supported_classes_content():
Expand All @@ -279,25 +397,23 @@ def _render_openlineage_supported_classes_content():
"get_openlineage_database_specific_lineage",
)
hook_lineage_collector_path = "airflow.providers.common.compat.lineage.hook.get_hook_lineage_collector"
hook_level_lineage_collector_calls = {
hook_level_lineage_root_calls = {
f"{hook_lineage_collector_path}.add_input_asset", # Airflow 3
f"{hook_lineage_collector_path}.add_output_asset", # Airflow 3
f"{hook_lineage_collector_path}.add_input_dataset", # Airflow 2
f"{hook_lineage_collector_path}.add_output_dataset", # Airflow 2
f"{hook_lineage_collector_path}.add_extra",
}

class_registry = _get_providers_class_registry(
class_extras={
"methods_with_hook_level_lineage": partial(
find_class_methods_with_specific_calls, target_calls=hook_level_lineage_collector_calls
)
}
class_registry, function_calls = _get_providers_class_registry()

# Auto-discover module-level wrapper functions (e.g. send_sql_hook_lineage) that
# transitively call the root targets, so they don't need to be listed manually.
hook_level_lineage_collector_calls = _compute_transitive_closure(
function_calls, hook_level_lineage_root_calls
)

# Excluding these classes from auto-detection, and any subclasses, to prevent detection of methods
# from abstract base classes (which need explicit OL support). Will be included in docs manually
class_registry.pop("airflow.providers.common.sql.hooks.sql.DbApiHook")
base_sql_hook_class_path = "airflow.providers.common.sql.hooks.sql.DbApiHook"
base_sql_op_class_path = "airflow.providers.common.sql.operators.sql.BaseSQLOperator"

providers: dict[str, dict[str, Any]] = {}
Expand Down Expand Up @@ -341,7 +457,8 @@ def _render_openlineage_supported_classes_content():
class_path=class_path,
method_names=openlineage_db_hook_methods,
class_registry=class_registry,
):
ignored_classes=[base_sql_hook_class_path],
) and _inherits_from(class_path, base_sql_hook_class_path, class_registry):
db_type = ( # Extract db type from hook name
class_name.replace("RedshiftSQL", "Redshift") # for RedshiftSQLHook
.replace("DatabricksSql", "Databricks") # for DatabricksSqlHook
Expand All @@ -350,11 +467,12 @@ def _render_openlineage_supported_classes_content():
)
db_hooks.append((db_type, class_path))

elif info["methods_with_hook_level_lineage"]:
hll_methods = _get_methods_with_hook_level_lineage(
class_path, class_registry, hook_level_lineage_collector_calls
)
if hll_methods:
provider_entry["hooks"][class_path] = [
f"{class_path}.{method}"
for method in info["methods_with_hook_level_lineage"]
if not method.startswith("_")
f"{class_path}.{method}" for method in hll_methods if not method.startswith("_")
]

providers = {
Expand Down
69 changes: 58 additions & 11 deletions devel-common/src/sphinx_exts/templates/openlineage.rst.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,62 @@
specific language governing permissions and limitations
under the License.
#}
Core operators
==============
At the moment, two core operators support OpenLineage. These operators function as a 'black box,'
capable of running any code, which might limit the extent of lineage extraction (e.g. lineage will usually not contain
input/output datasets). To enhance the extraction of lineage information, operators can utilize the hooks listed
below that support OpenLineage.

- :class:`~airflow.providers.standard.operators.python.PythonOperator` (via :class:`airflow.providers.openlineage.extractors.python.PythonExtractor`)
- :class:`~airflow.providers.standard.operators.bash.BashOperator` (via :class:`airflow.providers.openlineage.extractors.bash.BashExtractor`)
Supported classes
*****************

Below is a list of Operators and Hooks that support OpenLineage extraction, along with specific DB types that are compatible with the supported SQL operators.

.. important::

While we strive to keep the list of supported classes current,
please be aware that our updating process is automated and may not always capture everything accurately.
Detecting hook level lineage is challenging so make sure to double check the information provided below.

What does "supported operator" mean?
====================================

**All Airflow operators will automatically emit OpenLineage events** (unless explicitly disabled or skipped during
scheduling, like EmptyOperator), regardless of whether they appear on the "supported" list.
Every OpenLineage event will contain basic information such as:

- Task and DAG run metadata (execution time, state, tags, parameters, owners, description, etc.)
- Job relationships (the DAG job that the task belongs to, upstream/downstream relationships between tasks in a DAG, etc.)
- Error message (in case of task failure)
- Airflow and OpenLineage provider versions

**"Supported" operators provide additional metadata** that enhances the lineage information:

- **Input and output datasets** (sometimes with Column Level Lineage)
- **Operator-specific details** that may include SQL query text and query IDs, source code, job IDs from external systems (e.g., Snowflake or BigQuery job ID), data quality metrics and other information.

For example, a supported SQL operator will include the executed SQL query, query ID, and input/output table information
in its OpenLineage events. An unsupported operator will still appear in the lineage graph, but without these details.

.. tip::

You can easily implement OpenLineage support for any operator. See :ref:`guides/developer:openlineage`.


.. _hook-lineage:

Hook Level Lineage
==================
Some operators (like :class:`~airflow.providers.standard.operators.python.PythonOperator`) function as a "black box"
capable of running arbitrary code, which usually prevents the extraction of input/output datasets. To address this,
Airflow tracks hook-level lineage: when a supported hook method is invoked (even from within a Python callable)
the OpenLineage integration can automatically capture lineage from that execution. For example, reading a file
through a storage hook can report the file as an input dataset, while writing to an object store can report an
output dataset.

For hooks that execute SQL (mostly subclasses of :class:`~airflow.providers.common.sql.hooks.sql.DbApiHook`),
the integration can go further. Besides recording which assets were read or written (by using SQL parsing),
it may also extract the executed SQL text and external query/job IDs. For each query, a separate pair of child
OpenLineage events is emitted.

.. important::
The level of detail captured varies between hooks and methods. Some may only report dataset information, while others
expose SQL text, query IDs and more. Review the hook implementation to confirm what lineage data is available.

Spark operators
===============
Expand Down Expand Up @@ -61,7 +108,7 @@ The operators and hooks listed below from each provider are natively equipped wi

{%for provider_name, provider_dict in providers.items() %}
{{ provider_name }} ({{ provider_dict['version'] }})
{{ '"' * 2 * (provider_name|length) }}
{{ '-' * 2 * (provider_name|length) }}

{% if provider_dict['operators'] %}
Operators
Expand All @@ -80,8 +127,8 @@ Operators
{% endif %}

{% if provider_dict['hooks'] %}
Hooks
^^^^^
:ref:`Hooks* <hook-lineage>`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
{% for hook, methods in provider_dict['hooks'].items() %}
- :class:`~{{ hook }}`
{% for method in methods %}
Expand Down
Loading
Loading