Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update retcodes to handle new cases #1771

Merged
merged 10 commits into from
Jul 20, 2016
31 changes: 20 additions & 11 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -479,28 +479,37 @@ We recommend that you copy this set of exit codes to your ``luigi.cfg`` file:
# They are in increasing level of severity (for most applications)
already_running=10
missing_data=20
not_run=25
task_failed=30
scheduling_error=35
unhandled_exception=40

unhandled_exception
For internal Luigi errors. Defaults to 4, since this type of error
probably will not recover over time.
already_running
This can happen in two different cases. Either the local lock file was taken
at the time the invocation started up, or the central scheduler has reported
that some tasks could not be run because other workers are already
running them.
missing_data
For when an :py:class:`~luigi.task.ExternalTask` is not complete, and this
caused the worker to give up. As an alternative to fiddling with this, see
the [worker] keep_alive option.
scheduling_error
For when a task's ``complete()`` or ``requires()`` method fails with an
exception.
not_run
For when a task is not granted run permission by the scheduler. Typically
because of a lack of resources, because the task has already been run by
another worker, or because the attempted task is in the DISABLED state.
Connectivity issues with the central scheduler might also cause this.
This does not include the cases for which a run is not allowed due to missing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice.

Come to think about it, I find it illogical that already_running has a lower severity in this scheme than the not_run case where a task has already been run and succeeded. But I don't consider that a blocker now.

dependencies (missing_data) or due to the fact that another worker is currently
running the task (already_running).
task_failed
For signaling that there were tasks last known to have failed. Typically
because some exception has been raised.
already_running
This can happen in two different cases. Either the local lock file was taken
at the time the invocation started up, or the central scheduler has reported
that some tasks could not be run because other workers are already
running them.
scheduling_error
For when a task's ``complete()`` or ``requires()`` method fails with an
exception, or when the task limit is reached.
unhandled_exception
For internal Luigi errors. Defaults to 4, since this type of error
probably will not recover over time.

If you customize return codes, prefer to set them in range 128 to 255 to avoid
conflicts. Return codes in range 0 to 127 are reserved for possible future use
Expand Down
28 changes: 19 additions & 9 deletions luigi/execution_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,20 @@ def _partition_tasks(worker):
set_tasks["upstream_missing_dependency"] = set()
set_tasks["upstream_run_by_other_worker"] = set()
set_tasks["upstream_scheduling_error"] = set()
set_tasks["unknown_reason"] = set()
set_tasks["not_run"] = set()
return set_tasks


def _root_task(worker):
"""
Return the first task scheduled by the worker, corresponding to the root task
"""
return worker._add_task_history[0][0]


def _populate_unknown_statuses(set_tasks):
"""
Add the "upstream_*" and "unknown_reason" statuses my mutating set_tasks.
Add the "upstream_*" and "not_run" statuses my mutating set_tasks.
"""
visited = set()
for task in set_tasks["still_pending_not_ext"]:
Expand Down Expand Up @@ -95,7 +102,7 @@ def _depth_first_search(set_tasks, current_task, visited):
if not upstream_failure and not upstream_missing_dependency and \
not upstream_run_by_other_worker and not upstream_scheduling_error and \
current_task not in set_tasks["run_by_other_worker"]:
set_tasks["unknown_reason"].add(current_task)
set_tasks["not_run"].add(current_task)


def _get_str(task_dict, extra_indent):
Expand Down Expand Up @@ -263,22 +270,22 @@ def _get_comments(group_tasks):
"upstream_missing_dependency",
"upstream_run_by_other_worker",
"upstream_scheduling_error",
"unknown_reason",
"not_run",
)
_PENDING_SUB_STATUSES = set(_ORDERED_STATUSES[_ORDERED_STATUSES.index("still_pending_ext"):])
_COMMENTS = set((
("already_done", 'present dependencies were encountered'),
("completed", 'ran successfully'),
("failed", 'failed'),
("scheduling_error", 'failed running complete() or requires()'),
("scheduling_error", 'failed scheduling'),
("still_pending", 'were left pending, among these'),
("still_pending_ext", 'were missing external dependencies'),
("run_by_other_worker", 'were being run by another worker'),
("upstream_failure", 'had failed dependencies'),
("upstream_missing_dependency", 'had missing external dependencies'),
("upstream_run_by_other_worker", 'had dependencies that were being run by other worker'),
("upstream_scheduling_error", 'had dependencies whose complete() or requires() failed'),
("unknown_reason", 'were left pending because of unknown reason'),
("upstream_scheduling_error", 'had dependencies whose scheduling failed'),
("not_run", 'was not granted run permission by the scheduler'),
))


Expand Down Expand Up @@ -374,10 +381,13 @@ def _summary_format(set_tasks, worker):
smiley = ":("
reason = "there were failed tasks"
if set_tasks["scheduling_error"]:
reason += " and tasks whose complete() or requires() failed"
reason += " and tasks whose scheduling failed"
elif set_tasks["scheduling_error"]:
smiley = ":("
reason = "there were tasks whose complete() or requires() failed"
reason = "there were tasks whose scheduling failed"
elif set_tasks["not_run"]:
smiley = ":|"
reason = "there were tasks that were not granted run permission by the scheduler"
elif set_tasks["still_pending_ext"]:
smiley = ":|"
reason = "there were missing external dependencies"
Expand Down
18 changes: 16 additions & 2 deletions luigi/retcodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,13 @@ class retcode(luigi.Config):
)
# default value inconsistent with doc/configuration.rst for backwards compatibility reasons
scheduling_error = IntParameter(default=0,
description='''For when a task's complete() or requires() fails.'''
description='''For when a task's complete() or requires() fails,
or task-limit reached'''
)
# default value inconsistent with doc/configuration.rst for backwards compatibility reasons
not_run = IntParameter(default=0,
description="For when a task is not granted run permission by the scheduler."
)


def run_with_retcodes(argv):
Expand All @@ -76,6 +81,7 @@ def run_with_retcodes(argv):
sys.exit(retcodes.unhandled_exception)

task_sets = luigi.execution_summary._summary_dict(worker)
root_task = luigi.execution_summary._root_task(worker)
non_empty_categories = {k: v for k, v in task_sets.items() if v}.keys()

def has(status):
Expand All @@ -87,5 +93,13 @@ def has(status):
(retcodes.task_failed, has('failed')),
(retcodes.already_running, has('run_by_other_worker')),
(retcodes.scheduling_error, has('scheduling_error')),
(retcodes.not_run, has('not_run')),
)
sys.exit(max(code * (1 if cond else 0) for code, cond in codes_and_conds))
expected_ret_code = max(code * (1 if cond else 0) for code, cond in codes_and_conds)

if expected_ret_code == 0 and \
root_task not in task_sets["completed"] and \
root_task not in task_sets["already_done"]:
sys.exit(retcodes.not_run)
else:
sys.exit(expected_ret_code)
100 changes: 51 additions & 49 deletions luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,70 +570,72 @@ def add(self, task, multiprocess=False):

def _add(self, task, is_complete):
if self._config.task_limit is not None and len(self._scheduled_tasks) >= self._config.task_limit:
logger.warning('Will not schedule %s or any dependencies due to exceeded task-limit of %d', task, self._config.task_limit)
return

formatted_traceback = None
try:
self._check_complete_value(is_complete)
except KeyboardInterrupt:
raise
except AsyncCompletionException as ex:
formatted_traceback = ex.trace
except BaseException:
formatted_traceback = traceback.format_exc()

if formatted_traceback is not None:
self.add_succeeded = False
self._log_complete_error(task, formatted_traceback)
task.trigger_event(Event.DEPENDENCY_MISSING, task)
self._email_complete_error(task, formatted_traceback)
logger.warning('Will not run %s or any dependencies due to exceeded task-limit of %d', task, self._config.task_limit)
deps = None
status = UNKNOWN
runnable = False

elif is_complete:
deps = None
status = DONE
runnable = False

task.trigger_event(Event.DEPENDENCY_PRESENT, task)
elif _is_external(task):
deps = None
status = PENDING
runnable = worker().retry_external_tasks

task.trigger_event(Event.DEPENDENCY_MISSING, task)
logger.warning('Data for %s does not exist (yet?). The task is an '
'external data depedency, so it can not be run from'
' this luigi process.', task)

else:
formatted_traceback = None
try:
deps = task.deps()
except Exception as ex:
self._check_complete_value(is_complete)
except KeyboardInterrupt:
raise
except AsyncCompletionException as ex:
formatted_traceback = ex.trace
except BaseException:
formatted_traceback = traceback.format_exc()

if formatted_traceback is not None:
self.add_succeeded = False
self._log_dependency_error(task, formatted_traceback)
task.trigger_event(Event.BROKEN_TASK, task, ex)
self._email_dependency_error(task, formatted_traceback)
self._log_complete_error(task, formatted_traceback)
task.trigger_event(Event.DEPENDENCY_MISSING, task)
self._email_complete_error(task, formatted_traceback)
deps = None
status = UNKNOWN
runnable = False
else:

elif is_complete:
deps = None
status = DONE
runnable = False
task.trigger_event(Event.DEPENDENCY_PRESENT, task)

elif _is_external(task):
deps = None
status = PENDING
runnable = True
runnable = worker().retry_external_tasks
task.trigger_event(Event.DEPENDENCY_MISSING, task)
logger.warning('Data for %s does not exist (yet?). The task is an '
'external data depedency, so it can not be run from'
' this luigi process.', task)

else:
try:
deps = task.deps()
except Exception as ex:
formatted_traceback = traceback.format_exc()
self.add_succeeded = False
self._log_dependency_error(task, formatted_traceback)
task.trigger_event(Event.BROKEN_TASK, task, ex)
self._email_dependency_error(task, formatted_traceback)
deps = None
status = UNKNOWN
runnable = False
else:
status = PENDING
runnable = True

if task.disabled:
status = DISABLED
if task.disabled:
status = DISABLED

if deps:
for d in deps:
self._validate_dependency(d)
task.trigger_event(Event.DEPENDENCY_DISCOVERED, task, d)
yield d # return additional tasks to add
if deps:
for d in deps:
self._validate_dependency(d)
task.trigger_event(Event.DEPENDENCY_DISCOVERED, task, d)
yield d # return additional tasks to add

deps = [d.task_id for d in deps]
deps = [d.task_id for d in deps]

self._scheduled_tasks[task.task_id] = task
self._add_task(worker=self._id, task_id=task.task_id, status=status,
Expand Down
Loading