Make sure backfill deadlocks raise errors #1290

Merged · 3 commits · Apr 5, 2016
**airflow/bin/cli.py** (2 additions, 1 deletion)

```diff
@@ -39,7 +39,8 @@ def process_subdir(subdir):
 def get_dag(args):
     dagbag = DagBag(process_subdir(args.subdir))
     if args.dag_id not in dagbag.dags:
-        raise AirflowException('dag_id could not be found')
+        raise AirflowException(
+            'dag_id could not be found: {}'.format(args.dag_id))
     return dagbag.dags[args.dag_id]
```
**airflow/configuration.py** (12 additions, 1 deletion)

```diff
@@ -354,7 +354,7 @@ def run_command(command):
 TEST_CONFIG = """\
 [core]
 airflow_home = {AIRFLOW_HOME}
-dags_folder = {AIRFLOW_HOME}/dags
+dags_folder = {TEST_DAGS_FOLDER}
 base_log_folder = {AIRFLOW_HOME}/logs
 executor = SequentialExecutor
 sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db
@@ -581,6 +581,17 @@ def mkdir_p(path):
 else:
     AIRFLOW_CONFIG = expand_env_var(os.environ['AIRFLOW_CONFIG'])
 
+# Set up dags folder for unit tests
+# this directory won't exist if users install via pip
+_TEST_DAGS_FOLDER = os.path.join(
+    os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
+    'tests',
+    'dags')
+if os.path.exists(_TEST_DAGS_FOLDER):
+    TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
+else:
+    TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, 'dags')
 
 
 def parameterized_config(template):
     """
```
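For context, a minimal sketch of how the new `TEST_DAGS_FOLDER` reaches the unittest config. The assumption (from the surrounding code) is that `parameterized_config()` expands `{NAME}` placeholders from module-level variables, so the template's `dags_folder` line picks up the path computed above; the path and the simplified helper below are stand-ins, not the real implementation:

```python
TEST_DAGS_FOLDER = '/home/user/airflow/tests/dags'   # stand-in value
TEST_CONFIG = '[core]\ndags_folder = {TEST_DAGS_FOLDER}\n'

def parameterized_config(template):
    # assumption: the real helper formats the template against the
    # module namespace, filling {TEST_DAGS_FOLDER}, {AIRFLOW_HOME}, etc.
    return template.format(**globals())

print(parameterized_config(TEST_CONFIG))
# [core]
# dags_folder = /home/user/airflow/tests/dags
```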
**airflow/jobs.py** (99 additions, 68 deletions)

```diff
@@ -499,6 +499,7 @@ def process_dag(self, dag, executor):
         skip_tis = {(ti[0], ti[1]) for ti in qry.all()}
 
         descartes = [obj for obj in product(dag.tasks, active_runs)]
+        could_not_run = set()
         self.logger.info('Checking dependencies on {} tasks instances, minus {} '
                          'skippable ones'.format(len(descartes), len(skip_tis)))
         for task, dttm in descartes:
@@ -513,6 +514,23 @@
             elif ti.is_runnable(flag_upstream_failed=True):
                 self.logger.debug('Firing task: {}'.format(ti))
                 executor.queue_task_instance(ti, pickle_id=pickle_id)
+            else:
+                could_not_run.add(ti)
+
+        # this type of deadlock happens when dagruns can't even start and so
+        # the TI's haven't been persisted to the database.
+        if len(could_not_run) == len(descartes):
+            self.logger.error(
+                'Dag runs are deadlocked for DAG: {}'.format(dag.dag_id))
+            (session
+                .query(models.DagRun)
+                .filter(
+                    models.DagRun.dag_id == dag.dag_id,
+                    models.DagRun.state == State.RUNNING,
+                    models.DagRun.execution_date.in_(active_runs))
+                .update(
+                    {models.DagRun.state: State.FAILED},
+                    synchronize_session='fetch'))
 
         # Releasing the lock
         self.logger.debug("Unlocking DAG (scheduler_lock)")
```
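For illustration, the bulk-update pattern above in isolation, assuming SQLAlchemy 1.4+. The `DagRun` model here is a simplified stand-in for `models.DagRun`; `synchronize_session='fetch'` tells SQLAlchemy to re-evaluate the matched rows so objects already loaded in the session also see the new state:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class DagRun(Base):
    """Simplified stand-in for airflow.models.DagRun."""
    __tablename__ = 'dag_run'
    id = Column(Integer, primary_key=True)
    dag_id = Column(String)
    state = Column(String)

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)

with Session(engine) as session:
    run = DagRun(dag_id='demo', state='running')
    session.add(run)
    session.commit()

    # One UPDATE statement marks every matching run failed, mirroring the
    # hunk above; 'fetch' refreshes in-session objects such as `run`.
    (session.query(DagRun)
            .filter(DagRun.dag_id == 'demo',
                    DagRun.state == 'running')
            .update({DagRun.state: 'failed'}, synchronize_session='fetch'))
    print(run.state)  # failed
```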
```diff
@@ -553,8 +571,6 @@ def process_events(self, executor, dagbag):
             # collect queued tasks for prioritiztion
             if ti.state == State.QUEUED:
                 self.queued_tis.add(ti)
-            elif ti in self.queued_tis:
-                self.queued_tis.remove(ti)
             else:
                 # special instructions for failed executions could go here
                 pass
```
```diff
@@ -583,6 +599,8 @@ def prioritize_queued(self, session, executor, dagbag):
             else:
                 d[ti.pool].append(ti)
 
+        self.queued_tis.clear()
+
         dag_blacklist = set(dagbag.paused_dags())
         for pool, tis in list(d.items()):
             if not pool:
```
```diff
@@ -781,11 +799,12 @@ def _execute(self):
 
         # Build a list of all instances to run
         tasks_to_run = {}
-        failed = []
-        succeeded = []
-        started = []
-        wont_run = []
-        not_ready_to_run = set()
+        failed = set()
+        succeeded = set()
+        started = set()
+        skipped = set()
+        not_ready = set()
+        deadlocked = set()
 
         for task in self.dag.tasks:
             if (not self.include_adhoc) and task.adhoc:
@@ -800,67 +819,56 @@
         session.commit()
 
         # Triggering what is ready to get triggered
-        deadlocked = False
         while tasks_to_run and not deadlocked:
 
+            not_ready.clear()
             for key, ti in list(tasks_to_run.items()):
 
                 ti.refresh_from_db()
                 ignore_depends_on_past = (
                     self.ignore_first_depends_on_past and
                     ti.execution_date == (start_date or ti.start_date))
 
-                # Did the task finish without failing? -- then we're done
-                if (
-                        ti.state in (State.SUCCESS, State.SKIPPED) and
-                        key in tasks_to_run):
-                    succeeded.append(key)
-                    tasks_to_run.pop(key)
+                # The task was already marked successful or skipped by a
+                # different Job. Don't rerun it.
+                if key not in started:
+                    if ti.state == State.SUCCESS:
+                        succeeded.add(key)
+                        tasks_to_run.pop(key)
+                        continue
+                    elif ti.state == State.SKIPPED:
+                        skipped.add(key)
+                        tasks_to_run.pop(key)
+                        continue
 
-                # Is the task runnable? -- the run it
-                elif ti.is_queueable(
+                # Is the task runnable? -- then run it
+                if ti.is_queueable(
                         include_queued=True,
                         ignore_depends_on_past=ignore_depends_on_past,
                         flag_upstream_failed=True):
                     self.logger.debug('Sending {} to executor'.format(ti))
                     executor.queue_task_instance(
                         ti,
                         mark_success=self.mark_success,
                         pickle_id=pickle_id,
                         ignore_dependencies=self.ignore_dependencies,
                         ignore_depends_on_past=ignore_depends_on_past,
                         pool=self.pool)
                     ti.state = State.RUNNING
                     if key not in started:
-                        started.append(key)
-                    if ti in not_ready_to_run:
-                        not_ready_to_run.remove(ti)
+                        started.add(key)
 
-                # Mark the task as not ready to run. If the set of tasks
-                # that aren't ready ever equals the set of tasks to run,
-                # then the backfill is deadlocked
+                # Mark the task as not ready to run
                 elif ti.state in (State.NONE, State.UPSTREAM_FAILED):
-                    not_ready_to_run.add(ti)
-                    if not_ready_to_run == set(tasks_to_run.values()):
-                        msg = 'BackfillJob is deadlocked: no tasks can be run.'
-                        if any(
-                                t.are_dependencies_met() !=
-                                t.are_dependencies_met(
-                                    ignore_depends_on_past=True)
-                                for t in tasks_to_run.values()):
-                            msg += (
-                                ' Some of the tasks that were unable to '
-                                'run have "depends_on_past=True". Try running '
-                                'the backfill with the option '
-                                '"ignore_first_depends_on_past=True" '
-                                ' or passing "-I" at the command line.')
-                        self.logger.error(msg)
-                        deadlocked = True
-                        wont_run.extend(not_ready_to_run)
-                        tasks_to_run.clear()
+                    self.logger.debug('Added {} to not_ready'.format(ti))
+                    not_ready.add(key)
 
             self.heartbeat()
             executor.heartbeat()
 
+            # If the set of tasks that aren't ready ever equals the set of
+            # tasks to run, then the backfill is deadlocked
+            if not_ready and not_ready == set(tasks_to_run):
+                deadlocked.update(tasks_to_run.values())
+                tasks_to_run.clear()
+
             # Reacting to events
             for key, state in list(executor.get_event_buffer().items()):
                 dag_id, task_id, execution_date = key
```
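The new deadlock check, restated standalone (keys and values below are hypothetical): if a full pass over the pending tasks queues nothing and leaves every remaining task not ready, no later pass can make progress, so the whole set is declared deadlocked:

```python
def backfill_deadlocked(tasks_to_run, not_ready):
    # not_ready holds the keys that could not be queued on the last pass
    return bool(not_ready) and not_ready == set(tasks_to_run)

tasks = {('dag', 't1'): 'ti1', ('dag', 't2'): 'ti2'}
print(backfill_deadlocked(tasks, {('dag', 't1'), ('dag', 't2')}))  # True
print(backfill_deadlocked(tasks, {('dag', 't1')}))  # False: t2 made progress
print(backfill_deadlocked(tasks, set()))            # False: nothing blocked yet
```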
```diff
@@ -882,12 +890,12 @@
 
                 # task reports skipped
                 elif ti.state == State.SKIPPED:
-                    wont_run.append(key)
+                    skipped.add(key)
                     self.logger.error("Skipping {} ".format(key))
 
                 # anything else is a failure
                 else:
-                    failed.append(key)
+                    failed.add(key)
                     self.logger.error("Task instance {} failed".format(key))
 
                 tasks_to_run.pop(key)
```
```diff
@@ -899,18 +907,19 @@
                 if ti.state == State.SUCCESS:
                     self.logger.info(
                         'Task instance {} succeeded'.format(key))
-                    succeeded.append(key)
+                    succeeded.add(key)
                     tasks_to_run.pop(key)
 
                 # task reports failure
                 elif ti.state == State.FAILED:
                     self.logger.error("Task instance {} failed".format(key))
-                    failed.append(key)
+                    failed.add(key)
                     tasks_to_run.pop(key)
 
                 # this probably won't ever be triggered
-                elif key in not_ready_to_run:
-                    continue
+                elif ti in not_ready:
+                    self.logger.info(
+                        "{} wasn't expected to run, but it did".format(ti))
 
                 # executor reports success but task does not - this is weird
                 elif ti.state not in (
```
```diff
@@ -939,29 +948,51 @@
                     ti.handle_failure(msg)
                     tasks_to_run.pop(key)
 
-            msg = (
-                "[backfill progress] "
-                "waiting: {0} | "
-                "succeeded: {1} | "
-                "kicked_off: {2} | "
-                "failed: {3} | "
-                "wont_run: {4} ").format(
-                    len(tasks_to_run),
-                    len(succeeded),
-                    len(started),
-                    len(failed),
-                    len(wont_run))
+            msg = ' | '.join([
+                "[backfill progress]",
+                "waiting: {0}",
+                "succeeded: {1}",
+                "kicked_off: {2}",
+                "failed: {3}",
+                "skipped: {4}",
+                "deadlocked: {5}"
+            ]).format(
+                len(tasks_to_run),
+                len(succeeded),
+                len(started),
+                len(failed),
+                len(skipped),
+                len(deadlocked))
             self.logger.info(msg)
 
         executor.end()
         session.close()
 
+        err = ''
         if failed:
-            msg = (
-                "------------------------------------------\n"
-                "Some tasks instances failed, "
-                "here's the list:\n{}".format(failed))
-            raise AirflowException(msg)
-        self.logger.info("All done. Exiting.")
+            err += (
+                "---------------------------------------------------\n"
+                "Some task instances failed:\n{}\n".format(failed))
+        if deadlocked:
+            err += (
+                '---------------------------------------------------\n'
+                'BackfillJob is deadlocked.')
+            deadlocked_depends_on_past = any(
+                t.are_dependencies_met() != t.are_dependencies_met(
+                    ignore_depends_on_past=True)
+                for t in deadlocked)
+            if deadlocked_depends_on_past:
+                err += (
+                    'Some of the deadlocked tasks were unable to run because '
+                    'of "depends_on_past" relationships. Try running the '
+                    'backfill with the option '
+                    '"ignore_first_depends_on_past=True" or passing "-I" at '
+                    'the command line.')
+            err += ' These tasks were unable to run:\n{}\n'.format(deadlocked)
+        if err:
+            raise AirflowException(err)
+
+        self.logger.info("Backfill done. Exiting.")
 
 
 class LocalTaskJob(BaseJob):
```
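A toy illustration of the `depends_on_past` diagnostic above (the class is a stand-in, not Airflow's TaskInstance): a deadlocked task is blocked by its own previous run exactly when ignoring `depends_on_past` flips the result of the dependency check:

```python
class StubTI:
    """Stand-in exposing the same are_dependencies_met signature."""
    def __init__(self, blocked_by_past):
        self.blocked_by_past = blocked_by_past

    def are_dependencies_met(self, ignore_depends_on_past=False):
        # only the depends_on_past constraint can fail in this stub
        if self.blocked_by_past and not ignore_depends_on_past:
            return False
        return True

deadlocked = {StubTI(blocked_by_past=True), StubTI(blocked_by_past=False)}
hint = any(
    t.are_dependencies_met() != t.are_dependencies_met(
        ignore_depends_on_past=True)
    for t in deadlocked)
print(hint)  # True -> the error message suggests passing -I
```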
3 changes: 2 additions & 1 deletion airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2398,7 +2398,8 @@ def get_active_runs(self):
# AND there are unfinished tasks...
any(ti.state in State.unfinished() for ti in task_instances) and
# AND none of them have dependencies met...
all(not ti.are_dependencies_met() for ti in task_instances
all(not ti.are_dependencies_met(session=session)
for ti in task_instances
if ti.state in State.unfinished()))

for run in active_runs:
Expand Down
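Restating the run-level deadlock predicate standalone (plain dicts stand in for task instances, and the unfinished set is an illustrative subset of `State.unfinished()`): a running DagRun is deadlocked when it still has unfinished tasks and none of them can have their dependencies met:

```python
UNFINISHED = {None, 'queued', 'running', 'up_for_retry'}  # illustrative subset

def dagrun_deadlocked(task_instances):
    unfinished = [ti for ti in task_instances if ti['state'] in UNFINISHED]
    # deadlocked: something is still pending, and nothing pending can run
    return bool(unfinished) and all(not ti['deps_met'] for ti in unfinished)

tis = [{'state': 'success', 'deps_met': False},   # finished, ignored
       {'state': 'queued', 'deps_met': False}]    # unfinished and blocked
print(dagrun_deadlocked(tis))  # True
```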
**airflow/utils/state.py** (18 additions, 0 deletions)

```diff
@@ -69,8 +69,26 @@ def runnable(cls):
             cls.QUEUED
         ]
 
+    @classmethod
+    def finished(cls):
+        """
+        A list of states indicating that a task started and completed a
+        run attempt. Note that the attempt could have resulted in failure or
+        have been interrupted; in any case, it is no longer running.
+        """
+        return [
+            cls.SUCCESS,
+            cls.SHUTDOWN,
+            cls.FAILED,
+            cls.SKIPPED,
+        ]
+
     @classmethod
     def unfinished(cls):
+        """
+        A list of states indicating that a task either has not completed
+        a run or has not even started.
+        """
         return [
             cls.NONE,
             cls.QUEUED,
```
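Hypothetical usage of the two helpers once this change is in (the sample list is made up; the values are Airflow's plain state strings, with `State.NONE` being `None`):

```python
from airflow.utils.state import State

sample = [State.SUCCESS, State.QUEUED, State.FAILED, State.NONE]
done = [s for s in sample if s in State.finished()]      # completed attempts
pending = [s for s in sample if s in State.unfinished()] # still outstanding
print(done)     # ['success', 'failed']
print(pending)  # ['queued', None]
```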
**airflow/utils/tests.py** (0 additions, 22 deletions)

This file was deleted.

**scripts/ci/run_tests.sh** (3 additions, 0 deletions)

```diff
@@ -16,6 +16,9 @@ if [ "${TRAVIS}" ]; then
   echo "Using travis airflow.cfg"
   DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
   cp -f ${DIR}/airflow_travis.cfg ~/airflow/unittests.cfg
+
+  ROOTDIR="$(dirname $(dirname $DIR))"
+  export AIRFLOW__CORE__DAGS_FOLDER="$ROOTDIR/tests/dags"
 fi
 
 echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN
```
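The export works because of Airflow's `AIRFLOW__<SECTION>__<KEY>` environment-override convention, where the variable takes precedence over the corresponding `airflow.cfg` entry. A small sketch of the naming rule (paths are stand-ins):

```python
import os

os.environ['AIRFLOW__CORE__DAGS_FOLDER'] = '/repo/tests/dags'

def env_override(section, key):
    # build the variable name checked before falling back to airflow.cfg
    name = 'AIRFLOW__{}__{}'.format(section.upper(), key.upper())
    return os.environ.get(name)

print(env_override('core', 'dags_folder'))  # /repo/tests/dags
```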
**tests/core.py** (1 addition, 1 deletion)

```diff
@@ -43,7 +43,7 @@
 DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
 DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
 TEST_DAG_ID = 'unit_tests'
-configuration.test_mode()
 
```
**bolkedebruin (Contributor) commented:**

`configuration.test_mode()` is used throughout core.py. Should these all be replaced?

**Author (Member) replied:**

@bolkedebruin I deleted this one because it was already called on line 22 of tests/core.py. But to be completely honest, I'm not sure `configuration.test_mode()` actually does anything.

This is the entire function:

```python
def test_mode():
    conf = ConfigParserWithDefaults(defaults)
    conf.read(TEST_CONFIG)
```

It creates a new conf object with the test parameters, but doesn't return it or overwrite the existing one. And it doesn't change values that were already read in at the module level (like DAGS_FOLDER at the top of models.py).
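A toy demonstration of the point above, with stand-in names rather than Airflow's: assigning inside a function binds a local name, so the module-level object is untouched unless the function declares it `global`:

```python
conf = 'module-level conf'

def test_mode_local():
    conf = 'test conf'   # local binding; discarded when the function returns

def test_mode_global():
    global conf          # rebinds the module-level name
    conf = 'test conf'

test_mode_local()
print(conf)  # module-level conf
test_mode_global()
print(conf)  # test conf
```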


```diff
 try:
     import cPickle as pickle
```
**tests/dags/README.md** (10 additions, new file)
# Unit Tests DAGs Folder

This folder contains DAGs for Airflow unit testing.

To access a DAG in this folder, use the following code inside a unit test. Note this only works when `test_mode` is on; otherwise the normal Airflow `DAGS_FOLDER` will take precedence.

```python
dagbag = DagBag()
dag = dagbag.get(dag_id)
```
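A hedged sketch of a unit test built on this folder; the dag_id is hypothetical, and `get_dag` is the DagBag accessor used elsewhere in the codebase:

```python
from airflow.models import DagBag

# With test_mode on, the DAGS_FOLDER points at tests/dags (see README above)
dagbag = DagBag()
dag = dagbag.get_dag('test_example_dag')  # hypothetical dag_id
assert dag is not None, 'DAG should be discoverable from tests/dags'
```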