DebugExecutor use ti.run() instead of ti._run_raw_task #24357
Conversation
@@ -1599,7 +1615,7 @@ def test_mapped_dag(self, dag_id, executor_name, session):
         self.dagbag.process_file(str(TEST_DAGS_FOLDER / f'{dag_id}.py'))
         dag = self.dagbag.get_dag(dag_id)

-        when = datetime.datetime(2022, 1, 1)
+        when = timezone.datetime(2022, 1, 1)
Note: DB errors are thrown if a non-timezoned date is used, which makes me think these long-running tests aren't being run in our CI, since otherwise they would be failing. @potiuk do you have any context around that?
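For reference, a minimal sketch of the difference (assuming airflow.utils.timezone is importable from the test module; the exact tzinfo type returned may vary by Airflow version):

# Hedged sketch: naive vs. timezone-aware datetimes in Airflow tests.
import datetime

from airflow.utils import timezone

naive = datetime.datetime(2022, 1, 1)   # tzinfo is None -> rejected at the DB layer
aware = timezone.datetime(2022, 1, 1)   # timezone-aware (UTC), as the models expect

print(naive.tzinfo)  # None
print(aware.tzinfo)  # a UTC tzinfo (exact type depends on the Airflow version)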
Yep. Long running tests are NOT run on our CI at all :).
There is no more context than that :D
😆 I meant more context for why that's the case. I assume it's because folks just don't want to slow down the regular builds, but maybe there were timeouts on the test runners? Either way, what do you think about a scheduled CI run (once daily, maybe?) on main which includes the long-running tests, so that we at least get some coverage for them?
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
The DebugExecutor previously executed tasks by calling the "private" ti._run_raw_task(...) method instead of ti.run(...). But the latter contains the logic to increase task instance try_numbers when running, thus tasks executed with the DebugExecutor were never getting their try_numbers increased and for rescheduled tasks this led to off-by-one errors (as the logic to reduce the try_number for the reschedule was still working while the increase was not).
Force-pushed from 1acafa6 to 86b94e2 (Compare)
Looks reasonable to me. IIRC sensors were previously executed properly, but in a blocking manner:
airflow/airflow/models/taskinstance.py, lines 905 to 908 in fe2334f:
if isinstance(task_copy, BaseSensorOperator) and \
        conf.get('core', 'executor') == "DebugExecutor":
    self.log.warning("DebugExecutor changes sensor mode to 'reschedule'.")
    task_copy.mode = 'reschedule'
However, this code is no longer present.
Merging. The failing tests are because of "full tests needed" and timeouts on K8S tests (should be fixed by #24408).
The DebugExecutor previously executed tasks by calling the "private" ti._run_raw_task(...) method instead of ti.run(...). But the latter contains the logic to increase task instance try_numbers when running, thus tasks executed with the DebugExecutor were never getting their try_numbers increased and for rescheduled tasks this led to off-by-one errors (as the logic to reduce the try_number for the reschedule was still working while the increase was not). (cherry picked from commit da7b22b)
The DebugExecutor previously executed tasks by calling the "private" ti._run_raw_task(...) method instead of ti.run(...). But the latter contains the logic to increase task instance try_numbers when running, thus tasks executed with the DebugExecutor were never getting their try_numbers increased. For rescheduled tasks this led to off-by-one errors (as the logic to reduce the try_number for the reschedule was still working while the increase was not).
This off-by-one error manifests as a KeyError in _update_counters (seen in #13322), since the try_number in the TI keys of the in-memory ti_status.running map doesn't match the try_number in the TI keys from the DB. #13322 was marked as resolved because two issues were being conflated there; one was fixed (and the ticket closed), but users were still seeing this failure due to the issue fixed in this PR.
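To illustrate the mismatch (a simplified sketch, not Airflow's actual _update_counters code; the key fields below are assumptions modeled on the task instance key):

from collections import namedtuple

# Simplified stand-in for a task instance key; the real key also carries try_number.
TIKey = namedtuple("TIKey", ["dag_id", "task_id", "run_id", "try_number"])

# In-memory map of running tasks, keyed with the try_number the backfill expects.
running = {TIKey("my_dag", "wait_sensor", "run_1", 1): "running"}

# If try_number was never increased, the key derived from the DB after the
# reschedule is off by one, so the lookup fails.
db_key = TIKey("my_dag", "wait_sensor", "run_1", 0)
running.pop(db_key)  # KeyError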
This unblocks system tests (which are executed with the DebugExecutor), as they hit the aforementioned off-by-one error whenever a system test includes a Sensor that does more than one poke (it needs to be rescheduled at least once for the error to surface).
NOTE: The main fix here is to use ti.run() for the debug executor; if anyone has the historical context for that design decision, please weigh in, thanks! (CC @turbaszek)
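For reference, a rough sketch of what the change amounts to inside the executor (the method body, attribute names, and parameters below are assumptions based on this description, not a verbatim copy of airflow/executors/debug_executor.py):

from airflow.utils.state import State

def _run_task(self, ti):
    """Sketch of the executor's per-task hook after the change (names are assumed)."""
    key = ti.key
    try:
        params = self.tasks_params.pop(key, {})
        # Before: ti._run_raw_task(job_id=ti.job_id, **params), which skips the
        # try_number bookkeeping that TaskInstance.run() performs.
        ti.run(job_id=ti.job_id, **params)
        self.change_state(key, State.SUCCESS)
        return True
    except Exception as e:
        self.change_state(key, State.FAILED)
        self.log.exception("Failed to execute task: %s.", str(e))
        return False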
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst, in newsfragments.