[AIRFLOW-1641] Handle executor events in the scheduler #2715

Closed
wants to merge 1 commit into from

Conversation

bolkedebruin
Contributor

Dear Airflow maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots of any UI changes:

While in Backfills we do handle the executor state,
we do not in the Scheduler. In case there is an unspecified
error (e.g. a timeout, airflow command failure) tasks
can get stuck.

Tests

  • [] My PR adds the following unit tests OR does not need testing for this extremely good reason:

Should be added.

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

cc @saguziel @aoen @criccomini

@bolkedebruin bolkedebruin force-pushed the AIRFLOW-1641 branch 5 times, most recently from cd81845 to 561cc8e Compare October 23, 2017 10:22
@codecov-io

codecov-io commented Oct 23, 2017

Codecov Report

Merging #2715 into master will increase coverage by 0.09%.
The diff coverage is 92.1%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #2715      +/-   ##
==========================================
+ Coverage   72.37%   72.46%   +0.09%     
==========================================
  Files         154      154              
  Lines       11815    11836      +21     
==========================================
+ Hits         8551     8577      +26     
+ Misses       3264     3259       -5
Impacted Files Coverage Δ
airflow/executors/base_executor.py 96.77% <100%> (+0.34%) ⬆️
airflow/utils/dag_processing.py 88.46% <100%> (+2.26%) ⬆️
airflow/jobs.py 79.22% <88.88%> (+0.16%) ⬆️
airflow/models.py 87.14% <0%> (+0.04%) ⬆️

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update f271d43...4ec5df4.

@bolkedebruin
Contributor Author

cc @andylockran @JohnCulver ready for testing

return d
else:
d = {}
for key, state in list(self.event_buffer.items()):
Contributor

why cast to list here? Is this some python 3 thing?

Contributor Author
@bolkedebruin bolkedebruin Oct 24, 2017

Yes (Python 2, actually, afaik), but more importantly I am changing event_buffer in this loop; copying it into a list makes sure we do not skip any events.
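For illustration, a minimal standalone sketch of that pattern (not the Airflow code itself): iterating over a snapshot taken with list() lets the loop safely remove entries from the dict while looping over it.

# Minimal sketch: mutate a dict while iterating over a snapshot of its items.
event_buffer = {"ti_a": "success", "ti_b": "failed", "ti_c": "success"}

for key, state in list(event_buffer.items()):  # snapshot, safe to mutate below
    if state == "success":
        event_buffer.pop(key)

print(event_buffer)  # {'ti_b': 'failed'}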

ti.task = dag.get_task(task_id)
ti.handle_failure(msg)
except Exception:
self.log.error("Cannot load the dag bag to handle failure for %s"
Contributor

Will alerts be sent? If there are no callbacks or retries, and no alerts are sent either, this is going to cause operational issues for folks, I suspect. We depend heavily on alerting messages.

Contributor Author

Not if it cannot load the DAG. Basically, if DAG loading fails there is no way to know where to send that email. That information is unavailable.

Contributor

That makes sense.

dag = dagbag.get_dag(dag_id)
ti.task = dag.get_task(task_id)
ti.handle_failure(msg)
except Exception:
Contributor

Is there nothing better we can do here?

Contributor

I share this concern (see note above).

Contributor Author

Please make a suggestion. I don't like it either, but to have a task instance manage its own failure it needs the associated task. SimpleDag cannot and does not hold the operators (pickle errors), so it is required to get it from the normal DagBag.

I'm very open to suggestions, but I have already been pursuing different scenarios.
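For reference, a condensed sketch of the recovery path under discussion, pieced together from the diff context in this thread (the helper name and surrounding scheduler code are simplified and not the exact implementation):

from airflow.models import DagBag

def fail_stuck_task_instance(ti, msg, dag_folder, log):
    # Load the real DAG: SimpleDag cannot carry the operators, so a full
    # DagBag parse is needed to hand the task to the task instance.
    try:
        dagbag = DagBag(dag_folder)
        dag = dagbag.get_dag(ti.dag_id)
        ti.task = dag.get_task(ti.task_id)
        ti.handle_failure(msg)  # runs callbacks, email alerts, retry logic
    except Exception:
        # If the DAG cannot be parsed there is no owner/email to notify,
        # so the scheduler log is the only place left to record it.
        log.error("Cannot load the dag bag to handle failure for %s", ti)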

Contributor

It almost feels like the alert should be sent to the admin of airflow rather than the task owner, since the appropriate action involves a config change (or machine change) to increase capacity or timeout limit on DAG imports. Is this possible in some palatable way? I don't think we have this concept right now in Airflow.

Contributor Author

Yeah, hence the message in the logs. I mean that is in the end what logging is for?

Contributor

Yea, but where do these logs get piped to? Is it the airflow scheduler logs, or is it the DAG-specific logs? Monitoring DAG-specific logs for "error" is way too chatty, since people break their DAGs all the time. If it is the scheduler log, that should be fine.

Contributor Author

scheduler.

Contributor

Gotcha.

airflow/jobs.py Outdated
continue

if ti.state == State.RUNNING or ti.state == State.QUEUED:
msg = ("Executor reports task instance {} finished ({}) "
Contributor

We probably shouldn't handle it in the running case, in case this failure does not correspond to this try for some reason. The state should get unset from RUNNING anyway because of heartbeats.

Contributor Author

It is tied to the try. The executor reports on the task_instance, which does have the try associated with it. Please note that this is exactly the same code as we use for backfills. Basically, you also want to fail when RUNNING, because the worker is returning and no longer manages the task if it is still in RUNNING.

Contributor

In cases where it fails, is then run manually again, and then the executor inspects it, we will call handle_failure, which is probably not ideal. If anything, in the running state the heartbeat logic should handle the failure.

Contributor Author

BTW @saguziel: I see what you mean. There is a small chance that the scheduler has scheduled a new task instance with the same key and that instance is running now. While I do think that goes outside the scope of this PR (or maybe not?), we could guard against this by not allowing task_instances known to the executor (i.e. queued_tasks, running, and new: event_buffer) to be executed again, or by checking against the start_date (the 'try' I'm really not enthusiastic about). Checking against the start_date would require more in-depth changes (start_date should be part of the key), while maybe speeding the scheduler up by a little bit (at best a heartbeat).
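A rough, purely illustrative sketch of that guard idea (the attribute names follow the executor's queued_tasks, running, and event_buffer mentioned in this thread; the helper itself is made up):

def known_to_executor(executor, ti_key):
    # Skip queueing a task instance whose key the executor already tracks:
    # queued, running, or sitting in the event buffer unprocessed.
    return (ti_key in executor.queued_tasks
            or ti_key in executor.running
            or ti_key in executor.event_buffer)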

Contributor Author

@saguziel (async comm and no threads):

In cases where it fails, is then run manually again, and then the executor inspects it, we will call handle_failure, which is probably not ideal. If anything, in the running state the heartbeat logic should handle the failure.

I don't follow. You mean a kind of race condition? You would need to reset the state first. The task will not start when it finds itself to be in RUNNING

Contributor

re: 2nd comment:
If the TI is in the running state, the task is either running or not running.
If it is running, we should not interrupt it by calling handle_failure on the scheduler.
If it is not, the scheduler heartbeat logic will take care of it.

The executor event, in the case where the TI is actually running, should be able to be discarded.

Contributor Author

I'm not sure I agree. If the monitor (= worker/executor) reports a failure or success, why should we discard that?

I don't have the heartbeat logic entirely in my mind, but what happens if the executor reports SUCCESS and the task stays in RUNNING?

Contributor

If the monitor reports a failure, it is subject to arbitrary delays. So the failure was definitely in the past. If a TI is in RUNNING, we know it is running now (or will be reaped soon), and this is a tested path of Airflow, so we're relatively certain it was the only task running at the time.

If a task stays in RUNNING but is not actually running, it will get set to failure after 2* heartbeat.

jgao54

Been following this thread. I'm wondering if this logic to check for stuck queued TIs from a timeout should be in the kill_zombies method instead of here; then we can let the scheduler heartbeat take care of all the stale tasks, whether they are said to be running or queued.

I'm not super familiar with the scheduler side of things, but given that all _process_executor_events did in the past was logging, it seems to suggest that the executor events are processed somewhere else. It may be clearer to rename it to _log_executor_events.

Contributor Author

@jgao54 it can’t be in kill_zombies as there is nothing to kill. The executor actually works as a kind of monitor on the OS level.

Backfills are managing the executor state and we should have done it here as well.

@criccomini
Contributor

LGTM but will defer to @saguziel for a final signoff

d = self.event_buffer
self.event_buffer = {}
return d
if not dag_ids:
Contributor

Can we make this if dag_ids is None:?

The risk of someone programmatically passing in an empty list and clearing everything instead of nothing seems like something we want to avoid.
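A small standalone illustration of that pitfall (not the Airflow code): a truthiness check treats an explicit empty list the same as None, so a caller asking to clear nothing would clear everything.

def clear(dag_ids=None):
    if not dag_ids:          # an empty list falls into this branch too
        return "clear everything"
    return "clear only %s" % dag_ids

def clear_safe(dag_ids=None):
    if dag_ids is None:      # only the "no filter given" case
        return "clear everything"
    return "clear only %s" % dag_ids

print(clear([]))       # clear everything  -- probably not what the caller meant
print(clear_safe([]))  # clear only []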

Contributor Author

Sure

self.event_buffer = {}
return d
if not dag_ids:
d = self.event_buffer
Contributor

While we're at it, can we use a more descriptive variable name than d? How about cleared_events = self.event_buffer?

Contributor Author

Sure

@@ -141,13 +141,27 @@ def fail(self, key):
def success(self, key):
self.change_state(key, State.SUCCESS)

def get_event_buffer(self):
def get_event_buffer(self, dag_ids=None):
Contributor

Can I suggest a slight refactor to remove some duplication and use the dict.pop return value:

cleared_events = {}
if dag_ids is None:
    cleared_events = self.event_buffer
    self.event_buffer = {}
else:
    # iterate over a snapshot so we can pop from the buffer while looping
    for key in list(self.event_buffer):
        dag_id, _, _ = key
        if dag_id in dag_ids:
            cleared_events[key] = self.event_buffer.pop(key)
return cleared_events

Contributor Author

I will, though I'll change yours slightly.

dag = dagbag.get_dag(dag_id)
ti.task = dag.get_task(task_id)
ti.handle_failure(msg)
except Exception:
Contributor

Can we be more specific about the errors we catch here than a blanket except Exception:?

Contributor Author

You would want to catch any exception here, to make sure we do not let any "QUEUED" task_instances linger.
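If it helps, a tiny illustrative sketch of that trade-off (the wrapper and callable names are made up): keep the blanket catch so a single bad DAG cannot break event processing, but log the full traceback so the root cause is not lost.

import logging

log = logging.getLogger("airflow.jobs.SchedulerJob")

def process_failure_event(load_dag_and_fail, ti):
    try:
        load_dag_and_fail(ti)   # hypothetical callable doing the DagBag work
    except Exception:
        # log.exception() logs at ERROR level and appends the traceback,
        # while the blanket catch keeps the scheduler loop alive.
        log.exception("Cannot load the dag bag to handle failure for %s", ti)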

airflow/jobs.py Outdated
msg = ("Executor reports task instance {} finished ({}) "
"although the task says its {}. Was the task "
"killed externally?".format(ti, state, ti.state))
self.log.error(msg)
Contributor

Can we make this self.log.error("...", ti, state, ti.state) instead of self.log.error(msg) to be consistent with the other logging and to take advantage of Python's lazy log processing?
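For illustration, the difference in a standalone snippet (the variables are stand-ins): with %-style arguments the formatting is deferred and skipped entirely if the record is filtered out.

import logging

log = logging.getLogger("airflow.jobs.SchedulerJob")
ti, state, ti_state = "example_ti", "failed", "queued"  # stand-in values

# Eager: the message is formatted even if ERROR records are filtered out.
log.error("Executor reports task instance {} finished ({}) "
          "although the task says its {}.".format(ti, state, ti_state))

# Lazy: formatting happens only when the record is actually handled.
log.error("Executor reports task instance %s finished (%s) "
          "although the task says its %s.", ti, state, ti_state)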

Contributor Author
@bolkedebruin bolkedebruin Oct 25, 2017

We are using msg twice (see handle_failure below), so not in this case, I guess.

While in Backfills we do handle the executor state,
we do not in the Scheduler. In case there is an unspecified
error (e.g. a timeout, airflow command failure) tasks
can get stuck.
@bolkedebruin
Contributor Author

@gwax addressed your comments

@saguziel I updated the PR to only fail QUEUED tasks. I'm not convinced we should ignore RUNNING, as it is inconsistent with backfills, and I don't like that we are ignoring the executor state here, which functions as a monitor. But if that is what it takes to have it pass for now, let's do that.

@saguziel
Contributor

lgtm

@saguziel
Contributor

The problem is that the executor state ends up not being as consistent as it should be, and we typically set state in other places.

@bolkedebruin
Contributor Author

@saguziel let's discuss that f2f soon

@asfgit asfgit closed this in 2abead7 Oct 27, 2017
asfgit pushed a commit that referenced this pull request Oct 27, 2017
While in Backfills we do handle the executor state,
we do not in the Scheduler. In case there is an unspecified
error (e.g. a timeout, airflow command failure) tasks
can get stuck.

Closes #2715 from bolkedebruin/AIRFLOW-1641

(cherry picked from commit 2abead7)
Signed-off-by: Bolke de Bruin <bolke@xs4all.nl>
mrares pushed a commit to mrares/incubator-airflow that referenced this pull request Apr 23, 2018
While in Backfills we do handle the executor state,
we do not in the Scheduler. In case there is an unspecified
error (e.g. a timeout, airflow command failure) tasks
can get stuck.

Closes apache#2715 from bolkedebruin/AIRFLOW-1641

(cherry picked from commit 2abead7)
Signed-off-by: Bolke de Bruin <bolke@xs4all.nl>