-
Notifications
You must be signed in to change notification settings - Fork 14.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[AIRFLOW-209] Add scheduler tests and improve lineage handling #1568
Conversation
ti.state = State.REMOVED | ||
try: | ||
dag.get_task(ti.task_id) | ||
except AirflowException: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wonder about this logic. Is it possible to get an AirflowException for another reason, and erroneously set the state to REMOVED?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really. Removed is set when the task
Does not exist in the DAG. This exception is thrown when the task does not exist in task dictionary of the DAG.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So is the guard (if self.state is not State.RUNING
) against it running to protect against this situation:
- Task starts running
- While task is running, DAG is modified and task is removed
- We don't want to interrupt the running task, that's what this guard is for
If so do you mind just commenting the logic to clarify?
LGTM |
This patch adds schedule_dag and process_dag unittests. It also fixes some minor bugs that were caught by these tests. Some small changes for readability.
ff3ce4f
to
fb5a3b3
Compare
@@ -491,3 +491,196 @@ def test_scheduler_process_check_heartrate(self): | |||
scheduler.process_dag(dag, queue=queue) | |||
|
|||
queue.put.assert_not_called() | |||
|
|||
def test_scheduler_do_not_schedule_removed_task(self): | |||
dag = DAG( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The DAG creation code is very repetitive and I think it will be useful for future tests. Maybe you could refactor it into a helper function?
+1 |
Dear Airflow Maintainers,
Please accept this PR that addresses the following issues:
Reminders for contributors: