AIRFLOW-128 Optimize and refactor process_dag #1514
Conversation
dab8009 to 320285b
@@ -1061,6 +1060,7 @@ def are_dependencies_met(

        task = self.task

        logging.info("Checkpoint A")
Assuming this is for debugging, and will go away before final merge
Absolutely, everything after my initial commit in this PR is actually profiling stuff (i.e. WIP). A lot of time is spent in are_dependencies_met as it now iterates over all tasks every time.
825866c to ff7ebba
Got it. Thanks! :)
dag_id=dag.dag_id).filter(
    or_(DagRun.external_trigger == False,
        # add % as a wildcard for the like query
        DagRun.run_id.like(DagRun.ID_PREFIX+'%')))
My new preferred way to indent method chains is:

qry = (
    session.query(func.max(DagRun.execution_date))
    .filter_by(dag_id=dag.dag_id)
    .filter(or_(
        DagRun.external_trigger == False,
        DagRun.run_id.like(DagRun.ID_PREFIX + '%')
    ))
)
@plypaul, how does this play with your upcoming PR? Can you submit yours or share the branch you've been working with, to get a sense of whether there's duplicated effort/overlap here?
This patch addresses the following issues:

get_active_runs was a getter that was also updating the database. This patch refactors get_active_runs into two different functions that are part of DagRun: update_state, which updates the state of the dag run based on its task instances, and verify_integrity, which checks and updates the dag run depending on whether the DAG contains new or missing tasks.

Deadlock detection has been refactored to ensure that the database does not get hit twice; in some circumstances this can reduce the time spent by 50%.

process_dag has been refactored to use the functions of DagRun, reducing complexity and reducing pressure on the database. In addition, locking now works properly under the assumption that the heartrate is longer than the time process_dag spends.

Two new TaskInstance states have been introduced: "REMOVED" and "SCHEDULED". REMOVED is set when task instances are encountered that no longer exist in the DAG, which happens when a DAG is changed (i.e. a new version); the state exists for lineage purposes. SCHEDULED is used when a task that did not have a state before is sent to the executor, and is used by both the scheduler and backfills. This state almost removes the race condition that exists when using multiple schedulers; because UP_FOR_RETRY is managed by the TaskInstance (which I think is the wrong place), the race still exists for that state.
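For illustration, a minimal sketch of how a scheduler loop could use the two new DagRun methods described above; the imports, the surrounding query, and the exact signatures are assumed here, not copied from the diff:

# Hedged sketch only: imports and signatures are assumed, not taken from this PR.
from airflow import settings
from airflow.models import DagRun
from airflow.utils.state import State

def process_dag_sketch(dag):
    session = settings.Session()
    active_runs = (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag.dag_id)
        .filter(DagRun.state == State.RUNNING)
        .all()
    )
    for run in active_runs:
        # verify_integrity: mark task instances that no longer exist in the DAG
        # as REMOVED and add task instances for tasks that are new in the DAG.
        run.verify_integrity(session=session)
        # update_state: derive the dag run state from its task instances,
        # including the single-pass deadlock detection.
        run.update_state(session=session)
    session.commit()
    session.close()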
Yeah, as @bolkedebruin mentioned, this should be mostly compatible with #1559. To clarify, what's the difference between the SCHEDULED and QUEUED states?
@plypaul SCHEDULED is set at handover from the scheduler to the executor only. It can only be set by the scheduler. It prevents race conditions. Ideally, it should always be the state of the task when it is sent to the executor; it isn't now due to UP_FOR_RETRY being handled by the TI instead of in the scheduler. QUEUED indicates that a TI is waiting for a slot in the executor, from either a pool or max parallelism. It is a bit ambiguous as it is managed in different locations. Some of the questions @jlowin and I have around #1559 are due to this ambiguity and your broader use of QUEUED.
@bolkedebruin so is the order SCHEDULED -> QUEUED?
Can you elaborate on the race condition that the SCHEDULED state addresses?
@plypaul via https://cwiki.apache.org/confluence/display/AIRFLOW/Scheduler+Basics
Basically, without a SCHEDULED state, the task is given to the executor. If the scheduler loops around fast enough again (or you have multiple schedulers running) before the executor updates the task's state, the scheduler will schedule it again, since the task's state hasn't changed (and it's still runnable).
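A toy illustration (plain Python, not Airflow code) of the race being described: without claiming the task via a SCHEDULED-like state before the handover, a second pass of the loop would queue the same task again.

# Toy model of the double-scheduling race; all names are illustrative only.
from enum import Enum

class State(Enum):
    NONE = "none"
    SCHEDULED = "scheduled"
    RUNNING = "running"

def scheduler_loop(task_instances, executor_queue):
    for ti in task_instances:
        # Without the SCHEDULED state, ti["state"] would still be NONE on the
        # next loop and the same task would be queued to the executor twice.
        if ti["state"] is State.NONE:
            ti["state"] = State.SCHEDULED   # claim the task before handover
            executor_queue.append(ti["key"])

# Running the loop twice in a row no longer duplicates work:
tis = [{"key": "task_a", "state": State.NONE}]
queue = []
scheduler_loop(tis, queue)
scheduler_loop(tis, queue)
assert queue == ["task_a"]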
I share the same concern as @jlowin. Schedulers can be restarted at random points for deployment / instance replacements, and having orphaned tasks is something that needs to be addressed.
Prior to adding the SCHEDULED state, restarting the scheduler at an arbitrary point wouldn't leave tasks orphaned, since task instances without a state would simply be picked up again on the next scheduler loop.
@plypaul arbitrary means a SIGKILL in this case, which leaves any process in an undetermined state. So the chances of it occurring are small (it also needs to coincide with a kill of the scheduler in the window between the change to SCHEDULED and the executor receiving the task). There is also a small chance the executor gets killed, which would leave the task in limbo too. To remove no. 1, the with_for_update needs to wrap the "send" to the executor; the transaction to the db will then fail in case of a kill and the state will not be changed. To remove no. 2, a kind of garbage collection probably needs to be added (at the time of checking for zombie tasks?). Chances are slim, and in general one should not go around kill -9'ing processes.
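A sketch of the suggested fix for window no. 1: take a row lock on the task instance and only mark it SCHEDULED, and commit, once the hand-off to the executor has succeeded. The helper name, query details, and executor call are placeholders, not this PR's code.

# Placeholder sketch; send_with_lock and the query details are illustrative only.
from airflow import settings
from airflow.models import TaskInstance
from airflow.utils.state import State

def send_with_lock(dag_id, task_id, execution_date, executor, command):
    session = settings.Session()
    try:
        ti = (
            session.query(TaskInstance)
            .filter_by(dag_id=dag_id, task_id=task_id, execution_date=execution_date)
            .with_for_update()            # row stays locked until commit/rollback
            .one()
        )
        executor.queue_command(ti, command)   # hand-off to the executor (assumed call)
        ti.state = State.SCHEDULED
        session.commit()    # the state change lands only if the hand-off did not raise
    except Exception:
        session.rollback()  # a failed/killed send leaves the state untouched
        raise
    finally:
        session.close()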
On the occurrence of the race condition, please see the comment in the executor by @jlowin. As said, when people get more complex environments and start running two local executors for example, the chance for this to occur would be there on every scheduler loop, so for it to occur at some point in time would be 99% certain. People might just not have seen it.
The condition where task instances can be orphaned doesn't require a kill -9. This could be a problem in our setup, where there can be 1000's of task instances waiting for a slot in the executor to run. That queue can take hours to clear, and if the scheduler is restarted at any point in that window, we'll have a bunch of orphaned tasks. We also use the CeleryExecutor. Overall, it would be great for Airflow to be resilient to restarts and failures. Machines die and services restart all the time, and the oncall who wakes up needs to have simple remediation procedures for handling the failure. Without a way to recover from these cases, the oncall's life would be much more difficult.
@plypaul in case of celery that is a really short time frame: it gets to SCHEDULED at the moment of sending it to the executor (thus to celery). So if you get in between at that moment, yes, you can get an orphaned task, although chances imho are very slim. This can be further eliminated by surrounding the "send to executor" with the for_update block; this way the record won't get updated if sending to the executor fails. In addition, the executor should set a state when it picks up the task. This way you can do a bit of garbage collection of tasks that have been in a certain state without a heartbeat for some time.
So this was my understanding of how the state changes with this PR:
1. The scheduler evaluates the task instances and sets their state to SCHEDULED.
2. The task instances are sent to the executor.
3. The state changes again only when the task is actually run.
When there are many DAGs, step 1 can take a long time. In our case, it's about 6 minutes. If the scheduler exits between steps 1 and 2, there will be orphaned tasks, and the 6 minute window is fairly large. Also, the state of the task does not change when it's sent to the executor; the state only changes when the task is actually run. With the SCHEDULED state set at evaluation time, those tasks won't get picked up again on the next scheduler loop.
Ah, so yes, like I mentioned, we probably should move setting the SCHEDULED state to the moment the task is really sent to the executor, and indeed not at evaluation time. And yes, at the moment the executor does not set a state, but it should. I'll have a look at no. 1 as that is the biggest issue, and think about a garbage collector.
Btw, the issue with the scheduler can/should only occur on a SIGKILL; a SIGTERM allows for clean termination (which might need some work).
Considering the operational issues with the SCHEDULED state, it seems like the garbage collector is needed. Why don't we go back to the previous state logic, and then put in the state change + garbage collector together?
Ha, that sounds like it is not needed (reverting - working together is fine :) ). Applying the previous state logic would just mean also evaluating the SCHEDULED status and re-scheduling those task instances every time, and letting the task instance figure out at run time what to do. This is what the previous scheduler did with "None" states. That's kind of a one-liner.

What I would suggest is to move the "set to scheduled state" to when the task instance is sent to the executor. This resolves the orphaning when the scheduler is killed and is a small change.

For the garbage collector we can add a timestamp last_updated to the task_instance, which gets set every time the record is updated (or a state change happens). The scheduler can then do a simple sweep by SQL statement to set the state to "None" or to a new "reschedule" state by comparing it against an arbitrary timeout value. This would catch orphaning down the line and would be future proof. Also a one-liner.

Or even better, we could ask the executor which tasks it knows about and compare that to the list of scheduled tasks. If they are not there, reschedule them. This is a little bit more work and might need to be combined with the above, as in the past the reporting by the executor was not really trustworthy.

What do you think? Over the weekend I can have a patch for both, I think.
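A sketch of what the proposed sweep could look like; the last_updated column and the timeout value are hypothetical and do not exist in this PR:

# Hypothetical sketch: TaskInstance.last_updated does not exist in this PR.
from datetime import datetime, timedelta
from airflow import settings
from airflow.models import TaskInstance
from airflow.utils.state import State

SCHEDULED_TIMEOUT = timedelta(minutes=30)   # arbitrary example timeout

def reset_stale_scheduled_tasks():
    session = settings.Session()
    cutoff = datetime.utcnow() - SCHEDULED_TIMEOUT
    # Anything stuck in SCHEDULED longer than the timeout is assumed orphaned
    # (scheduler or executor died) and is handed back to the scheduler.
    (
        session.query(TaskInstance)
        .filter(TaskInstance.state == State.SCHEDULED)
        .filter(TaskInstance.last_updated < cutoff)   # hypothetical column
        .update({TaskInstance.state: None}, synchronize_session=False)
    )
    session.commit()
    session.close()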
Changing the state of the task instance when the executor gets it reduces the window, but there are still cases where we can have a bunch of orphaned tasks. Hence, we want to try for a more robust solution. As you point out, having the scheduler check the executor to see if a particular task instance has been submitted already, and only queue it if it hasn't been queued, seems like a simple solution to the original problem. With that solution, there wouldn't need to be an additional timeout, right?
You are right on the possibility of orphaned tasks, but please approach them as two separate windows. Moving the "setting scheduled state" to when it gets sent to the executor closes one, as the DB safeguards against a kill of the scheduler by not committing the transaction. The second one is the time between the handover from the executor to the worker by means of the MQ (and then not setting a different state afterwards). Indeed, asking the executor for the information should work. The timeout would just safeguard defensively against the executor not reporting back correctly; it 'lost' tasks in the past, which is why @jlowin reverted some logic in process_queued_tasks before the release of 1.7.1. So yes, I think we are on the right track :). Let's see if asking the executor solves the issue, but I do think scanning for orphaned tasks might be nice to have as well, to safeguard against any future changes. I.e. the scan has a holistic view, whereas asking the executor gives only a particular view.
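And a sketch of the "ask the executor" variant; the executor-side bookkeeping (queued_tasks / running keyed by task instance key) is assumed here and may not match the real reporting interface:

# Assumed executor bookkeeping; the real interface may differ.
def find_orphaned_scheduled_tasks(scheduled_tis, executor):
    known_to_executor = set(executor.queued_tasks) | set(executor.running)
    # A task instance the scheduler marked SCHEDULED but the executor has never
    # heard of has been orphaned (e.g. by a restart) and should be rescheduled.
    return [ti for ti in scheduled_tis if ti.key not in known_to_executor]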
state=State.unfinished(),
session=session
)
none_depends_on_past = all(t.task.depends_on_past for t in unfinished_tasks)
Should this be not t.task.depends_on_past?
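For reference, with the reviewer's suggested negation the line would read:

# True only when none of the unfinished tasks depend on past, matching the variable name.
none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)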
This addresses AIRFLOW-128.
@aoen @artwr @mistercrunch @r39132 @jlowin: ready for review.
Goals:
What has changed:
Stats:
Old:
New:
**Note: unit tests have been added to cover process_dag.**
Dag used for testing: