-
Notifications
You must be signed in to change notification settings - Fork 14.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Function to expand mapped tasks in to multiple "real" TIs #21019
Conversation
I'm not sold that I've put that method on the right class, so opinions welcome. |
@@ -874,3 +875,20 @@ def test_verify_integrity_task_start_date(Stats_incr, session, run_type, expecte | |||
assert len(tis) == expected_tis | |||
|
|||
Stats_incr.assert_called_with('task_instance_created-DummyOperator', expected_tis) | |||
|
|||
|
|||
@pytest.mark.xfail(reason="TODO: Expand mapped literals at verify_integrity time!") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure this is actually a good idea -- although we could put it here, that puts more work in the core scheduler loop so I think we could reasonably delay this to the mini scheduler in upstream task.
Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure we can only have it in the mini scheduler, as that can be turned off. Maybe toggle where it is run based on the mini scheduler being on or off?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It won't only be in the mini scheduler run, there will still be a "expansion of last resort" in the scheduler. I guess the difference is do we want to do the expansion eagerly at DagRun creation time, when it could possibly be done in another process (the LocalTaskJob).
It's probably going to be quite rare in practice that maps will be literals, so I think it's not even the cost to check this here, given that it's so unlikely it will do anything useful.
0e5375b
to
bc5adae
Compare
@@ -874,3 +875,20 @@ def test_verify_integrity_task_start_date(Stats_incr, session, run_type, expecte | |||
assert len(tis) == expected_tis | |||
|
|||
Stats_incr.assert_called_with('task_instance_created-DummyOperator', expected_tis) | |||
|
|||
|
|||
@pytest.mark.xfail(reason="TODO: Expand mapped literals at verify_integrity time!") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure we can only have it in the mini scheduler, as that can be turned off. Maybe toggle where it is run based on the mini scheduler being on or off?
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
Builds upon #20945
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.