diff --git a/providers/openlineage/docs/guides/developer.rst b/providers/openlineage/docs/guides/developer.rst
index 0485e43048288..65b394236b97b 100644
--- a/providers/openlineage/docs/guides/developer.rst
+++ b/providers/openlineage/docs/guides/developer.rst
@@ -188,12 +188,17 @@ Interface
 Custom Extractors have to derive from :class:`BaseExtractor `
 and implement at least two methods: ``_execute_extraction`` and ``get_operator_classnames``.
 
-BaseExtractor defines two methods: ``extract`` and ``extract_on_complete``, that are called and used to provide actual lineage data.
-The difference is that ``extract`` is called before Operator's ``execute`` method, while ``extract_on_complete`` is called after.
-By default, ``extract`` calls ``_execute_extraction`` method implemented in custom Extractor, and ``extract_on_complete``
-calls the ``extract`` method. If you want to provide some additional information available after the task execution, you can
-override ``extract_on_complete`` method. This can be used to extract any additional information that the Operator
-sets on it's own properties. Good example is ``SnowflakeOperator`` that sets ``query_ids`` after execution.
+BaseExtractor defines three more methods: ``extract``, ``extract_on_complete`` and ``extract_on_failure``,
+which are called and used to provide the actual lineage data.
+The difference is that ``extract`` is called before the Operator's ``execute`` method, while ``extract_on_complete`` and
+``extract_on_failure`` are called after - when the task succeeds or fails, respectively.
+By default, ``extract`` calls the ``_execute_extraction`` method implemented in the custom Extractor.
+When the task succeeds, ``extract_on_complete`` is called and, if not overridden, it delegates to ``extract``.
+When the task fails, ``extract_on_failure`` is called and, if not overridden, it delegates to ``extract_on_complete``.
+If you want to provide some additional information that is only available after the task execution, you can
+override the ``extract_on_complete`` and ``extract_on_failure`` methods.
+This is useful for extracting data that the Operator sets as its own properties during or after execution.
+A good example is an SQL operator that sets ``query_ids`` after execution.
 
 The ``get_operator_classnames`` is a classmethod that is used to provide list of Operators that your Extractor
 can get lineage from.
@@ -203,10 +208,10 @@ For example:
 
     @classmethod
     def get_operator_classnames(cls) -> List[str]:
-        return ['PostgresOperator']
+        return ['CustomPostgresOperator']
 
 If the name of the Operator matches one of the names on the list, the Extractor will be instantiated - with Operator
-provided in the Extractor's ``self.operator`` property - and both ``extract`` and ``extract_on_complete`` methods will be called.
+provided in the Extractor's ``self.operator`` property - and the ``extract`` and ``extract_on_complete``/``extract_on_failure`` methods will be called.
 
 Both methods return ``OperatorLineage`` structure:
 
@@ -270,7 +275,7 @@ If the path is wrong or non-importable from worker, plugin will fail to load the
 
 Second one, and maybe more insidious, are imports from Airflow. Due to the fact that OpenLineage code gets instantiated
 when Airflow worker itself starts, any import from Airflow can be unnoticeably cyclical. This causes OpenLineage extraction to fail.
-To avoid this issue, import from Airflow only locally - in ``_execute_extraction`` or ``extract_on_complete`` methods.
+To avoid this issue, import from Airflow only locally - in the ``_execute_extraction`` or ``extract_on_complete``/``extract_on_failure`` methods.
 If you need imports for type checking, guard them behind typing.TYPE_CHECKING.
 
 
@@ -300,7 +305,8 @@ Example
 This is an example of a simple Extractor for an Operator that executes export Query in BigQuery and saves the result to S3 file.
 Some information is known before Operator's ``execute`` method is called, and we can already extract some lineage in ``_execute_extraction`` method.
 After Operator's ``execute`` method is called, in ``extract_on_complete``, we can simply attach some additional Facets
-f.e. with Bigquery Job ID to what we've prepared earlier. This way, we get all possible information from the Operator.
+e.g. with BigQuery Job ID to what we've prepared earlier. We can also implement the ``extract_on_failure`` method if there is
+a need to include some information only when the task fails. This way, we get all possible information from the Operator.
 
 Please note that this is just an example. There are some OpenLineage built-in features that can facilitate different processes,
 like extracting column level lineage and inputs/outputs from SQL query with SQL parser.
@@ -312,6 +318,7 @@ like extracting column level lineage and inputs/outputs from SQL query with SQL
     from airflow.providers.common.compat.openlineage.facet import (
         Dataset,
         ExternalQueryRunFacet,
+        ErrorMessageRunFacet,
         SQLJobFacet,
     )
 
@@ -324,7 +331,7 @@ like extracting column level lineage and inputs/outputs from SQL query with SQL
             self._job_id = None
 
         def execute(self, context) -> Any:
-            self._job_id = run_query(query=self.query)
+            self._job_id, self._error_message = run_query(query=self.query)
 
 
     class ExampleExtractor(BaseExtractor):
@@ -352,6 +359,16 @@ like extracting column level lineage and inputs/outputs from SQL query with SQL
             }
             return lineage_metadata
 
+        def extract_on_failure(self, task_instance) -> OperatorLineage:
+            """Add failure-specific information on top of the facets gathered on complete."""
+            lineage_metadata = self.extract_on_complete(task_instance)
+            # Add the error facet without discarding facets already set by extract_on_complete.
+            lineage_metadata.run_facets["error"] = ErrorMessageRunFacet(
+                message=task_instance.task._error_message,
+                programmingLanguage="python",
+            )
+            return lineage_metadata
+
 For more examples of OpenLineage Extractors, check out the source code of
 `BashExtractor `_
 or `PythonExtractor `_.
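The delegation chain described in the updated interface text can be illustrated with a short, self-contained sketch.
``MinimalExtractor`` and ``CustomPostgresOperator`` are hypothetical names, and the trailing comments restate the
documented defaults rather than the actual ``BaseExtractor`` source:

.. code-block:: python

    from typing import List

    from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage


    class MinimalExtractor(BaseExtractor):
        """Illustrative Extractor that relies on the default delegation between the hooks."""

        @classmethod
        def get_operator_classnames(cls) -> List[str]:
            # Operators handled by this Extractor, matched by class name.
            return ["CustomPostgresOperator"]

        def _execute_extraction(self) -> OperatorLineage:
            # Called via ``extract`` before the Operator's ``execute`` method runs.
            return OperatorLineage(inputs=[], outputs=[], run_facets={}, job_facets={})

        # Nothing else is overridden, so the documented defaults apply:
        #   extract()             -> _execute_extraction()
        #   extract_on_complete() -> extract()
        #   extract_on_failure()  -> extract_on_complete()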
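The debugging advice - import from Airflow only inside the extraction methods and keep type-only imports behind
``typing.TYPE_CHECKING`` - can also be sketched. ``GuardedExtractor``, ``CustomOperator`` and the example dataset
below are hypothetical, illustrative names:

.. code-block:: python

    from typing import TYPE_CHECKING, List

    from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage

    if TYPE_CHECKING:
        # Resolved only by type checkers, never at worker start-up.
        from airflow.models import TaskInstance


    class GuardedExtractor(BaseExtractor):
        @classmethod
        def get_operator_classnames(cls) -> List[str]:
            return ["CustomOperator"]

        def _execute_extraction(self) -> OperatorLineage:
            # Import from Airflow locally, only when extraction actually runs.
            from airflow.providers.common.compat.openlineage.facet import Dataset

            return OperatorLineage(
                inputs=[Dataset(namespace="example", name="source_table")],
                outputs=[],
                run_facets={},
                job_facets={},
            )

        def extract_on_complete(self, task_instance: "TaskInstance") -> OperatorLineage:
            # The guarded import above is only used for this annotation.
            return self._execute_extraction()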