Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 41 additions & 8 deletions devel-common/src/sphinx_exts/providers_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,20 @@ def _get_module_class_registry(


def _has_method(
class_path: str, method_names: Iterable[str], class_registry: dict[str, dict[str, Any]]
class_path: str,
method_names: Iterable[str],
class_registry: dict[str, dict[str, Any]],
ignored_classes: list[str] | None = None,
) -> bool:
"""
Determines if a class or its bases in the registry have any of the specified methods.
:param class_path: The path of the class to check.
:param method_names: A list of names of methods to search for.
:param class_registry: A dictionary representing the class registry, where each key is a class name
and the value is its metadata.
and the value is its metadata.
:param ignored_classes: A list of class paths to ignore when searching. If a base class has one of the
searched methods but is ignored, class_path is treated as if it did not inherit those methods from that base.
:return: True if any of the specified methods are found in the class or its base classes; False otherwise.
Example:
Expand All @@ -209,11 +214,16 @@ def _has_method(
>>> _has_method("some.module.MyClass", ["not_a_method"], example_class_registry)
False
"""
ignored_classes = ignored_classes or []
if class_path in ignored_classes:
return False
if class_path in class_registry:
if any(method in class_registry[class_path]["methods"] for method in method_names):
return True
for base_name in class_registry[class_path]["base_classes"]:
if _has_method(base_name, method_names, class_registry):
if base_name in ignored_classes:
continue
if _has_method(base_name, method_names, class_registry, ignored_classes):
return True
return False

Expand Down Expand Up @@ -274,6 +284,7 @@ def _render_openlineage_supported_classes_content():
f"{hook_lineage_collector_path}.add_output_asset", # Airflow 3
f"{hook_lineage_collector_path}.add_input_dataset", # Airflow 2
f"{hook_lineage_collector_path}.add_output_dataset", # Airflow 2
f"{hook_lineage_collector_path}.add_extra",
}

class_registry = _get_providers_class_registry(
Expand All @@ -287,28 +298,44 @@ def _render_openlineage_supported_classes_content():
# Excluding these classes from auto-detection, and any subclasses, to prevent detection of methods
# from abstract base classes (which need explicit OL support). Will be included in docs manually
class_registry.pop("airflow.providers.common.sql.hooks.sql.DbApiHook")
class_registry.pop("airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator")
base_sql_op_class_path = "airflow.providers.common.sql.operators.sql.BaseSQLOperator"

providers: dict[str, dict[str, Any]] = {}
db_hooks: list[tuple[str, str]] = []
db_operators: list[str] = []
for class_path, info in class_registry.items():
class_name = class_path.split(".")[-1]
if class_name.startswith("_"):
continue
provider_entry = providers.setdefault(
info["provider_name"], {"operators": {}, "hooks": {}, "version": info["provider_version"]}
info["provider_name"],
{"operators": {}, "db_operators": {}, "hooks": {}, "version": info["provider_version"]},
)

if class_name.lower().endswith("operator"):
if _has_method(
if _has_method( # Operators that have OL methods NOT from BaseSQlOperator inheritance
class_path=class_path,
method_names=openlineage_operator_methods,
class_registry=class_registry,
ignored_classes=[base_sql_op_class_path], # Exclude child classes of BaseSQlOperator
):
provider_entry["operators"][class_path] = [
f"{class_path}.{method}"
for method in set(openlineage_operator_methods) & set(info["methods"])
]
elif class_path == base_sql_op_class_path:
continue # Explicitly skip BaseSQlOperator - it's documented manually.
elif _has_method( # Operators that have OL methods from BaseSQlOperator inheritance
class_path=class_path,
method_names=openlineage_operator_methods,
class_registry=class_registry,
ignored_classes=[], # Do not exclude child classes of BaseSQlOperator
):
provider_entry["db_operators"][class_path] = [
f"{class_path}.{method}"
for method in set(openlineage_operator_methods) & set(info["methods"])
]
db_operators.append(class_path)
elif class_name.lower().endswith("hook"):
if _has_method(
class_path=class_path,
Expand Down Expand Up @@ -338,15 +365,21 @@ def _render_openlineage_supported_classes_content():
details["operators"].items(), key=lambda x: x[0].split(".")[-1]
)
},
"db_operators": {
operator: sorted(methods)
for operator, methods in sorted(
details["db_operators"].items(), key=lambda x: x[0].split(".")[-1]
)
},
"hooks": {
hook: sorted(methods)
for hook, methods in sorted(details["hooks"].items(), key=lambda x: x[0].split(".")[-1])
},
"version": details["version"],
}
for provider, details in sorted(providers.items())
# Below filters out providers with empty 'operators' and 'hooks'
if details["hooks"] or details["operators"]
# Below filters out providers with empty 'operators', 'db_operators' and 'hooks'
if details["hooks"] or details["operators"] or details["db_operators"]
}
db_hooks = sorted({hook: db_type for db_type, hook in db_hooks}.items(), key=lambda x: x[1])

Expand Down
41 changes: 26 additions & 15 deletions devel-common/src/sphinx_exts/templates/openlineage.rst.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
Core operators
==============
At the moment, two core operators support OpenLineage. These operators function as a 'black box,'
capable of running any code, which might limit the extent of lineage extraction. To enhance the extraction
of lineage information, operators can utilize the hooks listed below that support OpenLineage.
capable of running any code, which might limit the extent of lineage extraction (e.g. lineage will usually not contain
input/output datasets). To enhance the extraction of lineage information, operators can utilize the hooks listed
below that support OpenLineage.

- :class:`~airflow.providers.standard.operators.python.PythonOperator` (via :class:`airflow.providers.openlineage.extractors.python.PythonExtractor`)
- :class:`~airflow.providers.standard.operators.bash.BashOperator` (via :class:`airflow.providers.openlineage.extractors.bash.BashExtractor`)
Expand Down Expand Up @@ -62,23 +63,25 @@ apache-airflow-providers-google
- Transport Information (only HTTP transport is supported for now (with api_key auth, if any))


:class:`~airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator` and derivatives
=============================================================================================
.. _sql-operators:

These operators are using SQL parsing and may query DB for lineage extraction.
SQL operators
=============

Operators inheriting from :class:`~airflow.providers.common.sql.operators.sql.BaseSQLOperator` may be supported
out of the box. These operators can use SQL parsing and may query the database for lineage extraction.
To extract unique data from each database type, a dedicated Hook implementing OpenLineage methods is required.
Not all subclasses of :class:`~airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`
are automatically supported, only those also using a supported Hook.
Not all subclasses of :class:`~airflow.providers.common.sql.operators.sql.BaseSQLOperator` are automatically supported -
only those that also use a supported Hook and follow a similar attribute naming convention (e.g., storing the query under ``self.sql``).

.. note::
Due to the automatic generation of this documentation, some supported operators inheriting from
:class:`~airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator` may not appear in the ``Providers``
section below - to prevent false positives. Examples of such Operators are:
:class:`~airflow.providers.databricks.operators.databricks_sql.DatabricksSqlOperator` and
:class:`~airflow.providers.snowflake.operators.snowflake.SnowflakeSqlApiOperator`.
Please check your operator's code to confirm OpenLineage support.
.. important::
The level of OpenLineage extraction may vary between SQL operators. Most will provide the executed SQL text,
while others may also expose additional metadata such as query IDs or other query-related information. Because
this documentation is generated automatically, some operators listed as supported SQL operators may not provide
full lineage information. Please review the implementation of your operator and its corresponding hook to confirm
the level of OpenLineage support.

Currently, the following databases (hooks) are supported:
Currently, the following databases (hooks) are supported together with SQL operators:

{% for hook, db_type in db_hooks %}
- {{ db_type }} (via :class:`~{{ hook }}`)
Expand All @@ -101,6 +104,14 @@ Operators
{% endfor %}
{% endif %}

{% if provider_dict['db_operators'] %}
:ref:`SQL operators* <sql-operators>`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
{% for db_operator, methods in provider_dict['db_operators'].items() %}
- :class:`~{{ db_operator }}`
{% endfor %}
{% endif %}

{% if provider_dict['hooks'] %}
Hooks
^^^^^
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,23 @@ class TaskInfo(InfoJsonEncodable):
"external_dates_filter", # ExternalTaskSensor
"logical_date", # AF 3 ExternalTaskMarker (if run, as it's EmptyOperator)
"execution_date", # AF 2 ExternalTaskMarker (if run, as it's EmptyOperator)
"database", # BaseSQlOperator
"parameters", # SQLCheckOperator, SQLValueCheckOperator and BranchSQLOperator
"column_mapping", # SQLColumnCheckOperator
"pass_value", # SQLValueCheckOperator
"tol", # SQLValueCheckOperator
"metrics_thresholds", # SQLIntervalCheckOperator
"ratio_formula", # SQLIntervalCheckOperator
"ignore_zero", # SQLIntervalCheckOperator
"min_threshold", # SQLThresholdCheckOperator
"max_threshold", # SQLThresholdCheckOperator
"follow_task_ids_if_true", # BranchSQLOperator
"follow_task_ids_if_false", # BranchSQLOperator
"follow_branch", # BranchSQLOperator
"preoperator", # SQLInsertRowsOperator
"postoperator", # SQLInsertRowsOperator
"table_name_with_schema", # SQLInsertRowsOperator
"column_names", # SQLInsertRowsOperator
]
casts = {
"operator_class": lambda task: task.task_type,
Expand Down
Loading
Loading