fix: Adjust OpenLineage DefaultExtractor for RuntimeTaskInstance in Airflow 3 #47673
In OpenLineage, we use the `DefaultExtractor` to extract lineage metadata from Operators that implement OL methods. To determine whether a task succeeded or failed, we rely on `ti.state`.

I think we should get rid of that dependency on Airflow's internal task instance model. (EDIT: The missing `state` attr in `RuntimeTaskInstance` in Airflow 3 caused our extractor to fail, but even if `state` is reinstated on `RuntimeTaskInstance`, we should still proceed with this PR.)

Now, depending on which listener method is called (`on_task_instance_running`, `on_task_instance_success`, `on_task_instance_failed`), we manually propagate the current `TaskInstanceState` to OL's `ExtractorManager`. This change allows us to invoke the appropriate Extractor method (`extract_on_complete` or the newly added `extract_on_failure`). This is similar to the approach we implemented with custom_run_facets.

Until now, users with custom extractors had to determine manually whether the task failed or completed successfully, using the `task_instance` object. They will still be able to do that as long as the `state` attr is available. The alternative, added in this PR, is to implement the new extractor method (`extract_on_failure`) so that they don't have to check the `state` themselves.

I think adding a new method to the extractor base class is the simplest approach, ensuring we don't break anything for users with existing custom extractors. IMHO this keeps the transition smooth while maintaining backward compatibility.
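To make the dispatch described above concrete, here is a minimal, self-contained sketch. It does not import Airflow or the OpenLineage provider: `TaskInstanceState`, `BaseExtractor`, and `extract_metadata` below are simplified stand-ins written for illustration, and the fallback from `extract_on_failure` to `extract_on_complete` is an assumption about how backward compatibility could be kept, not a copy of the provider's code. The extractor names `MyExtractor` and `LegacyExtractor` are hypothetical.

```python
from enum import Enum
from typing import Any, Optional


class TaskInstanceState(str, Enum):
    """Stand-in for Airflow's TaskInstanceState (only the values used here)."""

    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


class BaseExtractor:
    """Simplified, hypothetical extractor base class (not the provider's real one)."""

    def __init__(self, operator: Any) -> None:
        self.operator = operator

    def extract(self) -> Optional[dict]:
        return None

    def extract_on_complete(self, task_instance: Any) -> Optional[dict]:
        return self.extract()

    def extract_on_failure(self, task_instance: Any) -> Optional[dict]:
        # Assumed default: fall back to extract_on_complete so that existing
        # custom extractors keep their current behavior on failure.
        return self.extract_on_complete(task_instance)


def extract_metadata(
    extractor: BaseExtractor,
    task_instance: Any,
    task_instance_state: TaskInstanceState,
) -> Optional[dict]:
    """Pick the extractor method from the state passed in by the listener,
    instead of reading task_instance.state."""
    if task_instance_state == TaskInstanceState.RUNNING:
        return extractor.extract()
    if task_instance_state == TaskInstanceState.FAILED:
        return extractor.extract_on_failure(task_instance)
    return extractor.extract_on_complete(task_instance)


class MyExtractor(BaseExtractor):
    """Hypothetical custom extractor that handles failure separately."""

    def extract_on_complete(self, task_instance: Any) -> Optional[dict]:
        return {"outcome": "complete"}

    def extract_on_failure(self, task_instance: Any) -> Optional[dict]:
        return {"outcome": "failure"}


class LegacyExtractor(BaseExtractor):
    """Hypothetical pre-existing extractor that never defined extract_on_failure."""

    def extract_on_complete(self, task_instance: Any) -> Optional[dict]:
        return {"outcome": "complete"}


new_result = extract_metadata(MyExtractor(None), None, TaskInstanceState.FAILED)
legacy_result = extract_metadata(LegacyExtractor(None), None, TaskInstanceState.FAILED)
```

In this sketch the caller never touches `task_instance.state`: the state arrives as an explicit argument, which is the point of the change. An extractor that does not override `extract_on_failure` still goes through `extract_on_complete` on failure, so it can keep inspecting the task instance itself if the `state` attr is available.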