From fc23b46396e81e6e482e4b8cef3cdb21f8fd49ef Mon Sep 17 00:00:00 2001 From: jlowin Date: Fri, 25 Mar 2016 21:56:38 -0400 Subject: [PATCH 1/2] Make sure Executors properly trap errors SequentialExecutor and LocalExecutor execute `airflow run` commands with `subprocess.Popen().wait()` and try to catch errors with `try/except`. However, `subprocess.Popen()` doesn't raise errors; you have to check the `returncode`. As a result, these Executors always report that their commands are successful. This is normally fine because task status gets precedence over executor status, so as long as the task reports on itself correctly the issue is avoided. But if an error is raised BEFORE a task runs -- meaning the task is not yet monitoring its own status -- then the executor will incorrectly report success. Airflow will actually notice something went wrong, but because the task doesn't say it failed, it gets rescheduled, leading to an infinite loop. To resolve this, replace the Executor's `Popen().wait()` with `check_call()`, which is a blocking method that raises an error if the returncode != 0. This way, errors are properly recognized. Also, prevent infinite loops by limiting the number of times a task is allowed to be rescheduled due to executor failure to 3. (Note: this doesn't affect the number of times a task can be rescheduled due to its own failure). Last, check for an odd situation where the executor reports failure but the task reports running. Closes #1199 See #1220 for a test case --- airflow/executors/local_executor.py | 6 ++--- airflow/executors/sequential_executor.py | 11 ++++---- airflow/jobs.py | 34 +++++++++++++++++++++--- 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index c68c28c267191..19ada6a799946 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -29,11 +29,11 @@ def run(self): self.__class__.__name__, command)) command = "exec bash -c '{0}'".format(command) try: - subprocess.Popen(command, shell=True).wait() + subprocess.check_call(command, shell=True) state = State.SUCCESS - except Exception as e: + except subprocess.CalledProcessError as e: state = State.FAILED - self.logger.error("failed to execute task {}:".format(str(e))) + self.logger.error("Failed to execute task {}:".format(str(e))) # raise e self.result_queue.put((key, state)) self.task_queue.task_done() diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index f56d51b7b5419..4684226a1aa5f 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -25,13 +25,14 @@ def execute_async(self, key, command, queue=None): def sync(self): for key, command in self.commands_to_run: self.logger.info("Executing command: {}".format(command)) + try: - sp = subprocess.Popen(command, shell=True) - sp.wait() - except Exception as e: + subprocess.check_call(command, shell=True) + self.change_state(key, State.SUCCESS) + except subprocess.CalledProcessError as e: self.change_state(key, State.FAILED) - raise e - self.change_state(key, State.SUCCESS) + self.logger.error("Failed to execute task {}:".format(str(e))) + self.commands_to_run = [] def end(self): diff --git a/airflow/jobs.py b/airflow/jobs.py index 1206a6b8f78c3..5a939ed072d41 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -19,7 +19,7 @@ from builtins import str from past.builtins import basestring -from collections import defaultdict +from collections import defaultdict, Counter from datetime import datetime from itertools import product import getpass @@ -734,6 +734,7 @@ def _execute(self): executor = self.executor executor.start() + executor_fails = Counter() # Build a list of all instances to run tasks_to_run = {} @@ -785,12 +786,21 @@ def _execute(self): if ( ti.state in (State.FAILED, State.SKIPPED) or state == State.FAILED): - if ti.state == State.FAILED or state == State.FAILED: + # executor reports failure; task reports running + if ti.state == State.RUNNING and state == State.FAILED: + msg = ( + 'Executor reports that task instance {} failed ' + 'although the task says it is running.'.format(key)) + self.logger.error(msg) + ti.handle_failure(msg) + # executor and task report failure + elif ti.state == State.FAILED or state == State.FAILED: failed.append(key) - self.logger.error("Task instance " + str(key) + " failed") + self.logger.error("Task instance {} failed".format(key)) + # task reports skipped elif ti.state == State.SKIPPED: wont_run.append(key) - self.logger.error("Skipping " + str(key) + " failed") + self.logger.error("Skipping {} ".format(key)) tasks_to_run.pop(key) # Removing downstream tasks that also shouldn't run for t in self.dag.get_task(task_id).get_flat_relatives( @@ -799,9 +809,11 @@ def _execute(self): if key in tasks_to_run: wont_run.append(key) tasks_to_run.pop(key) + # executor and task report success elif ti.state == State.SUCCESS and state == State.SUCCESS: succeeded.append(key) tasks_to_run.pop(key) + # executor reports success but task does not -- this is weird elif ( ti.state not in (State.SUCCESS, State.QUEUED) and state == State.SUCCESS): @@ -812,6 +824,20 @@ def _execute(self): "reported state is '{}'. TI is {}" "".format(ti.state, state, ti)) + # if the executor fails 3 or more times, stop trying to + # run the task + executor_fails[key] += 1 + if executor_fails[key] >= 3: + msg = ( + 'The airflow run command failed to report an ' + 'error for task {} three or more times. The task ' + 'is being marked as failed. This is very unusual ' + 'and probably means that an error is taking place ' + 'before the task even starts.'.format(key)) + self.logger.error(msg) + ti.handle_failure(msg) + tasks_to_run.pop(key) + msg = ( "[backfill progress] " "waiting: {0} | " From f0eeb151106f9b179f8d8ea27f3fc0066b968404 Mon Sep 17 00:00:00 2001 From: jlowin Date: Sat, 26 Mar 2016 10:55:00 -0400 Subject: [PATCH 2/2] Add unit tests for trapping Executor errors --- tests/core.py | 53 +++++++++++++++++++++++++ tests/dags/test_raise_executor_error.py | 49 +++++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 tests/dags/test_raise_executor_error.py diff --git a/tests/core.py b/tests/core.py index 6f36c207cd5f2..8fb29f12744d8 100644 --- a/tests/core.py +++ b/tests/core.py @@ -12,6 +12,7 @@ from email.mime.multipart import MIMEMultipart from email.mime.application import MIMEApplication import errno +import signal from time import sleep from dateutil.relativedelta import relativedelta @@ -36,6 +37,8 @@ NUM_EXAMPLE_DAGS = 14 DEV_NULL = '/dev/null' +TEST_DAG_FOLDER = os.path.join( + os.path.dirname(os.path.realpath(__file__)), 'dags') DEFAULT_DATE = datetime(2015, 1, 1) DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat() DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10] @@ -49,6 +52,29 @@ import pickle +class timeout: + """ + A context manager used to limit execution time. + + Note -- won't work on Windows (based on signal, like Airflow timeouts) + + Based on: http://stackoverflow.com/a/22348885 + """ + def __init__(self, seconds=1, error_message='Timeout'): + self.seconds = seconds + self.error_message = error_message + + def handle_timeout(self, signum, frame): + raise ValueError(self.error_message) + + def __enter__(self): + signal.signal(signal.SIGALRM, self.handle_timeout) + signal.alarm(self.seconds) + + def __exit__(self, type, value, traceback): + signal.alarm(0) + + class FakeDatetime(datetime): "A fake replacement for datetime that can be mocked for testing." @@ -245,6 +271,33 @@ def test_backfill_examples(self): end_date=DEFAULT_DATE) job.run() + def test_trap_executor_error(self): + """ + Test for https://github.com/airbnb/airflow/pull/1220 + + Test that errors setting up tasks (before tasks run) are properly + caught + """ + self.dagbag = models.DagBag(dag_folder=TEST_DAG_FOLDER) + dags = [ + dag for dag in self.dagbag.dags.values() + if dag.dag_id in ('test_raise_executor_error',)] + for dag in dags: + dag.clear( + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE) + for dag in dags: + job = jobs.BackfillJob( + dag=dag, + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE) + # run with timeout because this creates an infinite loop if not + # caught + def run_with_timeout(): + with timeout(seconds=15): + job.run() + self.assertRaises(AirflowException, run_with_timeout) + def test_pickling(self): dp = self.dag.pickle() assert self.dag.dag_id == dp.pickle.dag_id diff --git a/tests/dags/test_raise_executor_error.py b/tests/dags/test_raise_executor_error.py new file mode 100644 index 0000000000000..c0138edd064d7 --- /dev/null +++ b/tests/dags/test_raise_executor_error.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +DAG designed to test what happens when running a DAG fails before +a task runs -- prior to a fix, this could actually cause an Executor to report +SUCCESS. Since the task never reports any status, this can lead to an infinite +rescheduling loop. +""" +from datetime import datetime + +from airflow.models import DAG +from airflow.operators import SubDagOperator +from airflow.example_dags.subdags.subdag import subdag + +args = { + 'owner': 'airflow', + 'start_date': datetime(2016, 1, 1), +} + +dag = DAG( + dag_id='test_raise_executor_error', + default_args=args, + schedule_interval="@daily", +) + +section_1 = SubDagOperator( + task_id='subdag_op', + subdag=subdag('test_raise_executor_error', 'subdag_op', args), + default_args=args, + dag=dag, +) + +# change the subdag name -- this creates an error because the subdag +# won't be found, but it'll do it in a way that causes the executor to report +# success +section_1.subdag.dag_id = 'bad_id'