openlineage: improve spawned OpenLineage process in scheduler #39735
The OpenLineage provider currently makes no calls to the Airflow database from the process(es) it spawns with ProcessPoolExecutor, but #39520 would add one. For that reason, among others, the ORM should be reconfigured when each spawned process is initialized.
Additionally, ProcessPoolExecutor catches all exceptions raised in its workers and stores information about them in the result object. However, we neither wait for submitted jobs nor check their status, so exceptions are swallowed and not logged anywhere.
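One way to surface such errors without blocking on the result is to attach a done callback to each future. The sketch below assumes hypothetical helper names (`log_future_error`, `submit_logged`, `failing_job`) and a hypothetical logger name; it is not necessarily how the listener implements it.

```python
import logging
from concurrent.futures import Executor, Future, ProcessPoolExecutor

log = logging.getLogger("lineage_demo")  # hypothetical logger name


def failing_job(payload):
    """Example job that always fails (module-level so it can be pickled)."""
    raise ValueError(f"cannot process {payload!r}")


def log_future_error(future: Future) -> None:
    """Done callback: log the exception stored on the future, if any."""
    if future.cancelled():
        return
    exc = future.exception()  # does not block: the future is already done
    if exc is not None:
        log.error("Background job failed", exc_info=exc)


def submit_logged(executor: Executor, fn, *args, **kwargs) -> Future:
    """Submit a job and make sure a failure is logged rather than lost."""
    future = executor.submit(fn, *args, **kwargs)
    future.add_done_callback(log_future_error)
    return future


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    with ProcessPoolExecutor(max_workers=1) as executor:
        # Nobody calls .result() here, yet the error is still logged.
        submit_logged(executor, failing_job, "event")
```

Note that for a `ProcessPoolExecutor` the callback runs in the parent process, in a thread owned by the executor, so the record goes through the parent's logging configuration.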
I tried adding tests to check that logs from jobs run in OpenLineageListener's ProcessPoolExecutor now land properly. Pytest does not seem to work well with multiprocessing, and I ended up giving up on the tests.
Both changes, however, were tested in breeze and Astro Cloud by running 1000 concurrent DagRuns and gave the expected results.
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.