Update retcodes to handle new cases #1771

Merged 10 commits on Jul 20, 2016
8 changes: 7 additions & 1 deletion doc/configuration.rst
@@ -481,18 +481,24 @@ We recommend that you copy this set of exit codes to your ``luigi.cfg`` file:
missing_data=20
task_failed=30
scheduling_error=35
unknown_reason=38 or 0
unhandled_exception=40

unhandled_exception
For internal Luigi errors. Defaults to 4, since this type of error
probably will not recover over time.
unknown_reason
For when a task does not run successfully because of an unknown reason. Despite
this case can be expected in an error free execution, it does not guarantee
completeness of the root task. Return code 38 is advised only for cases in which
Luigi return code 0 is used by the user to guarantee root tasks' completeness.
Contributor

Missed the comment suggesting "38 or 0". The comment doesn't describe the benefit. I would -1 this convoluted wording as overcomplication.

0 is already the default value. The existing users who perform retcode configuration won't be taken by surprise.

Contributor

You're right, let's not complicate things. The benefit I was looking for was to say that this return code means "no problem, don't worry".

What about using a number but a much lower number? I noticed we already say they are in increasing order of severity (I had forgotten this). Maybe use the number 15?

Contributor

Indeed, I was going to suggest 25. (As said, for a human operator, "not run for unknown reason" IMO evaluates worse than "not run for <insert known reason, like missing data>".)

Contributor

OK. I imagined 10 < x < 30, so 25 sounds good for the reason you gave.
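
To make the ordering argument in this thread concrete, here is a minimal sketch, assuming the value 25 that the reviewers converge on and the other codes shown in the hunk above. It parses a hypothetical `[retcode]` section with Python's standard `configparser` and lists the options in increasing order of severity. The section text and the helper name `load_retcodes` are illustrative only, not the final documented configuration.

```python
import configparser

# Hypothetical [retcode] section: every value except unknown_reason comes from
# the doc hunk above; unknown_reason=25 is the value proposed in this thread.
SUGGESTED_CFG = """
[retcode]
missing_data=20
unknown_reason=25
task_failed=30
scheduling_error=35
unhandled_exception=40
"""


def load_retcodes(text):
    """Parse the [retcode] section into a dict of option name -> int exit code."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return {name: int(value) for name, value in parser["retcode"].items()}


retcodes = load_retcodes(SUGGESTED_CFG)

# Option names sorted by exit code, i.e. in increasing order of severity.
by_severity = sorted(retcodes, key=retcodes.get)
print(by_severity)
```

With these values, `unknown_reason` ranks above `missing_data` (a known reason) and below `task_failed`, which matches the reasoning given for 25.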

missing_data
For when an :py:class:`~luigi.task.ExternalTask` is not complete, and this
caused the worker to give up. As an alternative to fiddling with this, see
the [worker] keep_alive option.
scheduling_error
For when a task's ``complete()`` or ``requires()`` method fails with an
exception.
exception, or when the limit number of tasks is reached.
task_failed
For signaling that there were last known to have failed. Typically because
some exception have been raised.
20 changes: 15 additions & 5 deletions luigi/execution_summary.py
@@ -58,6 +58,13 @@ def _partition_tasks(worker):
return set_tasks


def _root_task(worker):
"""
Return the first task scheduled by the worker, corresponding to the root task
"""
return worker._add_task_history[0][0]


def _populate_unknown_statuses(set_tasks):
"""
Add the "upstream_*" and "unknown_reason" statuses my mutating set_tasks.
@@ -270,15 +277,15 @@ def _get_comments(group_tasks):
("already_done", 'present dependencies were encountered'),
("completed", 'ran successfully'),
("failed", 'failed'),
("scheduling_error", 'failed running complete() or requires()'),
("scheduling_error", 'failed scheduling'),
("still_pending", 'were left pending, among these'),
("still_pending_ext", 'were missing external dependencies'),
("run_by_other_worker", 'were being run by another worker'),
("upstream_failure", 'had failed dependencies'),
("upstream_missing_dependency", 'had missing external dependencies'),
("upstream_run_by_other_worker", 'had dependencies that were being run by other worker'),
("upstream_scheduling_error", 'had dependencies whose complete() or requires() failed'),
("unknown_reason", 'were left pending because of unknown reason'),
("upstream_scheduling_error", 'had dependencies whose scheduling failed'),
("unknown_reason", 'did not run successfully because of unknown reason'),
Contributor

When the task fails in run(), we do get the reason back as the exception, right? Was there a reason to alter the wording "were left pending"?

Contributor

Mhm, it stems from this part of the discussion. Thinking about the cases of disabled tasks and missing resources, it's still true that the instance remained "pending". (That is the common-sense common denominator of those more detailed statuses. If we invested more in status propagation, we would come up with summaries like "pending because of missing resources" etc.)

I'm saying that, in my view, the high-level conceptual flow of a task hasn't changed much and isn't at risk of changing much. It's first pending, then becomes running (the moment run() is invoked). The reason for a failure while running is always "known" via the exception; the reason for not running at all can vary, but "left pending" remains valid wording.

Contributor

Alternatively, a valid wording would be "did not run", because that's equivalent to the common-sense "left pending" (didn't enter run()). "Did not run successfully" is what I object to, because it can also be interpreted to mean that the task did enter run().

Contributor

I think you're spot on, Uldis. Let's say "did not run" or "wasn't permitted to run" (to remind users that it is actually the central scheduler that decides this).

))


@@ -374,10 +381,13 @@ def _summary_format(set_tasks, worker):
smiley = ":("
reason = "there were failed tasks"
if set_tasks["scheduling_error"]:
reason += " and tasks whose complete() or requires() failed"
reason += " and tasks whose scheduling failed"
elif set_tasks["scheduling_error"]:
smiley = ":("
reason = "there were tasks whose complete() or requires() failed"
reason = "there were tasks whose scheduling failed"
elif set_tasks["unknown_reason"]:
smiley = ":|"
reason = "there were tasks that did not run successfully because of unknown reason"
elif set_tasks["still_pending_ext"]:
smiley = ":|"
reason = "there were missing external dependencies"
18 changes: 16 additions & 2 deletions luigi/retcodes.py
@@ -48,8 +48,13 @@ class retcode(luigi.Config):
)
# default value inconsistent with doc/configuration.rst for backwards compatibility reasons
scheduling_error = IntParameter(default=0,
description='''For when a task's complete() or requires() fails.'''
description='''For when a task's complete() or requires() fails,
or task-limit reached'''
)
# default value inconsistent with doc/configuration.rst for backwards compatibility reasons
unknown_reason = IntParameter(default=0,
description="For when a task does not run successfully because of unknown reason."
)


def run_with_retcodes(argv):
@@ -76,6 +81,7 @@ def run_with_retcodes(argv):
sys.exit(retcodes.unhandled_exception)

task_sets = luigi.execution_summary._summary_dict(worker)
root_task = luigi.execution_summary._root_task(worker)
non_empty_categories = {k: v for k, v in task_sets.items() if v}.keys()

def has(status):
@@ -87,5 +93,13 @@ def has(status):
(retcodes.task_failed, has('failed')),
(retcodes.already_running, has('run_by_other_worker')),
(retcodes.scheduling_error, has('scheduling_error')),
(retcodes.unknown_reason, has('unknown_reason')),
)
sys.exit(max(code * (1 if cond else 0) for code, cond in codes_and_conds))
expected_ret_code = max(code * (1 if cond else 0) for code, cond in codes_and_conds)

if expected_ret_code == 0 and \
root_task not in task_sets["completed"] and \
root_task not in task_sets["already_done"]:
sys.exit(retcodes.unknown_reason)
else:
sys.exit(expected_ret_code)
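
The selection logic in this hunk can be sketched in isolation. The function below is a simplified stand-in, not Luigi's actual `run_with_retcodes`: `pick_retcode` and its arguments are hypothetical names, plain dicts and strings replace the `retcode` config object and the task instances, and only the status/option pairs visible in the hunk are included. It shows the two steps: take the highest configured code among the non-empty categories, then fall back to `unknown_reason` when that code is 0 but the root task is neither completed nor already done.

```python
def pick_retcode(retcodes, task_sets, root_task):
    """Return the exit code for a run (simplified sketch of the hunk above).

    retcodes  -- dict of configured codes, e.g. {"task_failed": 30}
    task_sets -- dict mapping a status name to the set of tasks in that status
    root_task -- the task the run was started for
    """
    # Summary status -> retcode option it triggers (only the pairs shown above).
    status_to_option = {
        "failed": "task_failed",
        "run_by_other_worker": "already_running",
        "scheduling_error": "scheduling_error",
        "unknown_reason": "unknown_reason",
    }
    # Highest code among categories that contain at least one task; 0 otherwise.
    expected = max(
        retcodes.get(option, 0) if task_sets.get(status) else 0
        for status, option in status_to_option.items()
    )
    root_finished = (
        root_task in task_sets.get("completed", set())
        or root_task in task_sets.get("already_done", set())
    )
    # A "clean" run that still did not finish the root task is reported
    # with the unknown_reason code instead of 0.
    if expected == 0 and not root_finished:
        return retcodes.get("unknown_reason", 0)
    return expected


codes = {"task_failed": 30, "scheduling_error": 35, "unknown_reason": 25}
# No category matched, but the root task "Foo" never finished.
print(pick_retcode(codes, {"already_done": {"Bar"}}, "Foo"))  # 25
# A failed task outranks unknown_reason.
print(pick_retcode(codes, {"failed": {"Foo"}, "unknown_reason": {"Baz"}}, "Foo"))  # 30
```

With every option left at its default of 0, the function returns 0 in all cases, which mirrors the backwards-compatible defaults discussed in the review.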
100 changes: 51 additions & 49 deletions luigi/worker.py
@@ -570,70 +570,72 @@ def add(self, task, multiprocess=False):

def _add(self, task, is_complete):
if self._config.task_limit is not None and len(self._scheduled_tasks) >= self._config.task_limit:
logger.warning('Will not schedule %s or any dependencies due to exceeded task-limit of %d', task, self._config.task_limit)
return

formatted_traceback = None
try:
self._check_complete_value(is_complete)
except KeyboardInterrupt:
raise
except AsyncCompletionException as ex:
formatted_traceback = ex.trace
except BaseException:
formatted_traceback = traceback.format_exc()

if formatted_traceback is not None:
self.add_succeeded = False
self._log_complete_error(task, formatted_traceback)
task.trigger_event(Event.DEPENDENCY_MISSING, task)
self._email_complete_error(task, formatted_traceback)
logger.warning('Will not run %s or any dependencies due to exceeded task-limit of %d', task, self._config.task_limit)
deps = None
status = UNKNOWN
runnable = False

elif is_complete:
deps = None
status = DONE
runnable = False

task.trigger_event(Event.DEPENDENCY_PRESENT, task)
elif _is_external(task):
deps = None
status = PENDING
runnable = worker().retry_external_tasks

task.trigger_event(Event.DEPENDENCY_MISSING, task)
logger.warning('Data for %s does not exist (yet?). The task is an '
'external data depedency, so it can not be run from'
' this luigi process.', task)

else:
formatted_traceback = None
try:
deps = task.deps()
except Exception as ex:
self._check_complete_value(is_complete)
except KeyboardInterrupt:
raise
except AsyncCompletionException as ex:
formatted_traceback = ex.trace
except BaseException:
formatted_traceback = traceback.format_exc()

if formatted_traceback is not None:
self.add_succeeded = False
self._log_dependency_error(task, formatted_traceback)
task.trigger_event(Event.BROKEN_TASK, task, ex)
self._email_dependency_error(task, formatted_traceback)
self._log_complete_error(task, formatted_traceback)
task.trigger_event(Event.DEPENDENCY_MISSING, task)
self._email_complete_error(task, formatted_traceback)
deps = None
status = UNKNOWN
runnable = False
else:

elif is_complete:
deps = None
status = DONE
runnable = False
task.trigger_event(Event.DEPENDENCY_PRESENT, task)

elif _is_external(task):
deps = None
status = PENDING
runnable = True
runnable = worker().retry_external_tasks
task.trigger_event(Event.DEPENDENCY_MISSING, task)
logger.warning('Data for %s does not exist (yet?). The task is an '
'external data depedency, so it can not be run from'
' this luigi process.', task)

else:
try:
deps = task.deps()
except Exception as ex:
formatted_traceback = traceback.format_exc()
self.add_succeeded = False
self._log_dependency_error(task, formatted_traceback)
task.trigger_event(Event.BROKEN_TASK, task, ex)
self._email_dependency_error(task, formatted_traceback)
deps = None
status = UNKNOWN
runnable = False
else:
status = PENDING
runnable = True

if task.disabled:
status = DISABLED
if task.disabled:
status = DISABLED

if deps:
for d in deps:
self._validate_dependency(d)
task.trigger_event(Event.DEPENDENCY_DISCOVERED, task, d)
yield d # return additional tasks to add
if deps:
for d in deps:
self._validate_dependency(d)
task.trigger_event(Event.DEPENDENCY_DISCOVERED, task, d)
yield d # return additional tasks to add

deps = [d.task_id for d in deps]
deps = [d.task_id for d in deps]

self._scheduled_tasks[task.task_id] = task
self._add_task(worker=self._id, task_id=task.task_id, status=status,
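
Because this view drops the +/- markers and indentation, the restructured control flow of `_add` is hard to follow. The sketch below restates it as a pure function, under the assumption that the new version wraps the existing checks in an `else` branch after the task-limit check and that the `disabled` override applies only in the final branch. `classify` and its boolean arguments are hypothetical names, and plain strings stand in for Luigi's status constants.

```python
from collections import namedtuple

# Plain strings stand in for the status constants used in the hunk.
UNKNOWN, DONE, PENDING, DISABLED = "UNKNOWN", "DONE", "PENDING", "DISABLED"

Decision = namedtuple("Decision", ["status", "runnable"])


def classify(limit_reached=False, complete_check_failed=False,
             is_complete=False, is_external=False, deps_failed=False,
             disabled=False, retry_external_tasks=False):
    """Return the (status, runnable) pair the worker would register."""
    if limit_reached:
        # New in this PR (as read from the hunk): the task is still
        # registered, as UNKNOWN, instead of being silently skipped.
        return Decision(UNKNOWN, False)
    if complete_check_failed:
        # complete() raised or returned an invalid value.
        return Decision(UNKNOWN, False)
    if is_complete:
        return Decision(DONE, False)
    if is_external:
        # External tasks cannot be run here; they are only retried if configured.
        return Decision(PENDING, retry_external_tasks)
    if deps_failed:
        # Computing the dependencies raised an exception.
        status, runnable = UNKNOWN, False
    else:
        status, runnable = PENDING, True
    if disabled:
        status = DISABLED
    return Decision(status, runnable)


print(classify(limit_reached=True))
print(classify())
```

Under this reading, a task dropped because of the task limit ends up with the same UNKNOWN status as a task whose `complete()` or `requires()` failed, which is consistent with the `scheduling_error` description gaining "or task-limit reached".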
60 changes: 54 additions & 6 deletions test/execution_summary_test.py
@@ -126,14 +126,62 @@ def requires(self):
'===== Luigi Execution Summary =====',
'',
'Scheduled 2 tasks of which:',
'* 1 failed running complete() or requires():',
'* 1 failed scheduling:',
' - 1 Bar()',
'* 1 were left pending, among these:',
" * 1 had dependencies whose complete() or requires() failed:",
" * 1 had dependencies whose scheduling failed:",
' - 1 Foo()',
'',
'Did not run any tasks',
'This progress looks :( because there were tasks whose complete() or requires() failed',
'This progress looks :( because there were tasks whose scheduling failed',
'',
'===== Luigi Execution Summary =====',
'']
result = summary.split('\n')
self.assertEqual(len(result), len(expected))
for i, line in enumerate(result):
self.assertEqual(line, expected[i])

def test_unknown_reason_error(self):
class Bar(luigi.Task):
def complete(self):
return True

class Foo(luigi.Task):
def requires(self):
yield Bar()

def new_func(*args, **kwargs):
return None

with mock.patch('luigi.scheduler.CentralPlannerScheduler.add_task', new_func):
self.run_task(Foo())

d = self.summary_dict()
self.assertEqual({Foo()}, d['still_pending_not_ext'])
self.assertEqual({Foo()}, d['unknown_reason'])
self.assertEqual({Bar()}, d['already_done'])
self.assertFalse(d['upstream_scheduling_error'])
self.assertFalse(d['scheduling_error'])
self.assertFalse(d['completed'])
self.assertFalse(d['failed'])
self.assertFalse(d['upstream_failure'])
self.assertFalse(d['upstream_missing_dependency'])
self.assertFalse(d['run_by_other_worker'])
self.assertFalse(d['still_pending_ext'])
summary = self.summary()
expected = ['',
'===== Luigi Execution Summary =====',
'',
'Scheduled 2 tasks of which:',
'* 1 present dependencies were encountered:',
' - 1 Bar()',
'* 1 were left pending, among these:',
" * 1 did not run successfully because of unknown reason:",
' - 1 Foo()',
'',
'Did not run any tasks',
'This progress looks :| because there were tasks that did not run successfully because of unknown reason',
'',
'===== Luigi Execution Summary =====',
'']
@@ -172,11 +220,11 @@ def requires(self):
'===== Luigi Execution Summary =====',
'',
'Scheduled 1 tasks of which:',
'* 1 failed running complete() or requires():',
'* 1 failed scheduling:',
' - 1 Foo()',
'',
'Did not run any tasks',
'This progress looks :( because there were tasks whose complete() or requires() failed',
'This progress looks :( because there were tasks whose scheduling failed',
'',
'===== Luigi Execution Summary =====',
'']
@@ -380,7 +428,7 @@ def new_func(*args, **kwargs):
s = self.summary()
self.assertIn('\nScheduled 1 tasks of which:\n'
'* 1 were left pending, among these:\n'
' * 1 were left pending because of unknown reason:\n'
' * 1 did not run successfully because of unknown reason:\n'
' - 1 AlreadyRunningTask()\n', s)
self.assertNotIn('\n\n\n', s)

33 changes: 33 additions & 0 deletions test/retcodes_test.py
@@ -106,6 +106,22 @@ def requires(self):

self.run_and_expect('RequiringTask', 4)

def test_task_limit(self):
class TaskB(luigi.Task):
def complete(self):
return False

class TaskA(luigi.Task):
def requires(sefl):
yield TaskB()

class TaskLimitTest(luigi.Task):
def requires(self):
yield TaskA()

self.run_and_expect('TaskLimitTest --worker-task-limit 2', 0)
self.run_and_expect('TaskLimitTest --worker-task-limit 2 --retcode-scheduling-error 3', 3)

def test_unhandled_exception(self):
def new_func(*args, **kwargs):
raise Exception()
@@ -137,3 +153,20 @@ def requires(self):

self.run_and_expect('RequiringTask --retcode-task-failed 4 --retcode-missing-data 5', 5)
self.run_and_expect('RequiringTask --retcode-task-failed 7 --retcode-missing-data 6', 7)

def test_unknown_reason(self):

class TaskA(luigi.Task):
def complete(self):
return True

class RequiringTask(luigi.Task):
def requires(self):
yield TaskA()

def new_func(*args, **kwargs):
return None

with mock.patch('luigi.scheduler.CentralPlannerScheduler.add_task', new_func):
self.run_and_expect('RequiringTask', 0)
self.run_and_expect('RequiringTask --retcode-unknown-reason 5', 5)