Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions providers/openlineage/docs/guides/developer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,17 @@ Interface
Custom Extractors have to derive from :class:`BaseExtractor <airflow.providers.openlineage.extractors.base.BaseExtractor>`
and implement at least two methods: ``_execute_extraction`` and ``get_operator_classnames``.

BaseExtractor defines two methods: ``extract`` and ``extract_on_complete``, that are called and used to provide actual lineage data.
The difference is that ``extract`` is called before Operator's ``execute`` method, while ``extract_on_complete`` is called after.
By default, ``extract`` calls ``_execute_extraction`` method implemented in custom Extractor, and ``extract_on_complete``
calls the ``extract`` method. If you want to provide some additional information available after the task execution, you can
override ``extract_on_complete`` method. This can be used to extract any additional information that the Operator
sets on it's own properties. Good example is ``SnowflakeOperator`` that sets ``query_ids`` after execution.
BaseExtractor defines three more methods: ``extract``, ``extract_on_complete`` and ``extract_on_failure``,
that are called and used to provide actual lineage data.
The difference is that ``extract`` is called before Operator's ``execute`` method, while ``extract_on_complete`` and
``extract_on_failure`` are called after - when the task either succeeds or fails, respectively.
By default, ``extract`` calls ``_execute_extraction`` method implemented in custom Extractor.
When the task succeeds, ``extract_on_complete`` is called and if not overwritten, by default, it delegates to ``extract``.
When the task fails, ``extract_on_failure`` is called and if not overwritten, by default, it delegates to ``extract_on_complete``.
If you want to provide some additional information available after the task execution, you can
override ``extract_on_complete`` and ``extract_on_failure`` methods.
This is useful for extracting data the Operator sets as it's own properties during or after execution.
Good example is an SQL operator that sets ``query_ids`` after execution.

The ``get_operator_classnames`` is a classmethod that is used to provide list of Operators that your Extractor can get lineage from.

Expand All @@ -203,10 +208,10 @@ For example:

@classmethod
def get_operator_classnames(cls) -> List[str]:
return ['PostgresOperator']
return ['CustomPostgresOperator']

If the name of the Operator matches one of the names on the list, the Extractor will be instantiated - with Operator
provided in the Extractor's ``self.operator`` property - and both ``extract`` and ``extract_on_complete`` methods will be called.
provided in the Extractor's ``self.operator`` property - and both ``extract`` and ``extract_on_complete``/``extract_on_failure`` methods will be called.

Both methods return ``OperatorLineage`` structure:

Expand Down Expand Up @@ -270,7 +275,7 @@ If the path is wrong or non-importable from worker, plugin will fail to load the
Second one, and maybe more insidious, are imports from Airflow. Due to the fact that OpenLineage code gets instantiated when Airflow worker itself starts,
any import from Airflow can be unnoticeably cyclical. This causes OpenLineage extraction to fail.

To avoid this issue, import from Airflow only locally - in ``_execute_extraction`` or ``extract_on_complete`` methods.
To avoid this issue, import from Airflow only locally - in ``_execute_extraction`` or ``extract_on_complete``/``extract_on_failure`` methods.
If you need imports for type checking, guard them behind typing.TYPE_CHECKING.


Expand Down Expand Up @@ -300,7 +305,8 @@ Example
This is an example of a simple Extractor for an Operator that executes export Query in BigQuery and saves the result to S3 file.
Some information is known before Operator's ``execute`` method is called, and we can already extract some lineage in ``_execute_extraction`` method.
After Operator's ``execute`` method is called, in ``extract_on_complete``, we can simply attach some additional Facets
f.e. with Bigquery Job ID to what we've prepared earlier. This way, we get all possible information from the Operator.
f.e. with Bigquery Job ID to what we've prepared earlier. We can also implement ``extract_on_failure`` method, if there is
a need to include some information only when task fails. This way, we get all possible information from the Operator.

Please note that this is just an example. There are some OpenLineage built-in features that can facilitate different processes,
like extracting column level lineage and inputs/outputs from SQL query with SQL parser.
Expand All @@ -312,6 +318,7 @@ like extracting column level lineage and inputs/outputs from SQL query with SQL
from airflow.providers.common.compat.openlineage.facet import (
Dataset,
ExternalQueryRunFacet,
ErrorMessageRunFacet,
SQLJobFacet,
)

Expand All @@ -324,7 +331,7 @@ like extracting column level lineage and inputs/outputs from SQL query with SQL
self._job_id = None

def execute(self, context) -> Any:
self._job_id = run_query(query=self.query)
self._job_id, self._error_message = run_query(query=self.query)


class ExampleExtractor(BaseExtractor):
Expand Down Expand Up @@ -352,6 +359,16 @@ like extracting column level lineage and inputs/outputs from SQL query with SQL
}
return lineage_metadata

def extract_on_failure(self, task_instance) -> OperatorLineage:
"""Add any failure-specific information."""
lineage_metadata = self.extract_on_complete(task_instance)
lineage_metadata.run_facets = {
"error": ErrorMessageRunFacet(
message=task_instance.task._error_message, programmingLanguage="python"
)
}
return lineage_metadata

For more examples of OpenLineage Extractors, check out the source code of
`BashExtractor <https://github.com/apache/airflow/blob/main/providers/amazon/aws/src/airflow/providers/openlineage/extractors/bash.py>`_ or
`PythonExtractor <https://github.com/apache/airflow/blob/main/providers/amazon/aws/src/airflow/providers/openlineage/extractors/python.py>`_.
Expand Down