diff --git a/doc/configuration.rst b/doc/configuration.rst index bceac2a739..dbead798ce 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -479,28 +479,37 @@ We recommend that you copy this set of exit codes to your ``luigi.cfg`` file: # They are in increasing level of severity (for most applications) already_running=10 missing_data=20 + not_run=25 task_failed=30 scheduling_error=35 unhandled_exception=40 -unhandled_exception - For internal Luigi errors. Defaults to 4, since this type of error - probably will not recover over time. +already_running + This can happen in two different cases. Either the local lock file was taken + at the time the invocation starts up. Or, the central scheduler have reported + that some tasks could not have been run, because other workers are already + running the tasks. missing_data For when an :py:class:`~luigi.task.ExternalTask` is not complete, and this caused the worker to give up. As an alternative to fiddling with this, see the [worker] keep_alive option. -scheduling_error - For when a task's ``complete()`` or ``requires()`` method fails with an - exception. +not_run + For when a task is not granted run permission by the scheduler. Typically + because of lack of resources, because the task has been already run by + another worker or because the attempted task is in DISABLED state. + Connectivity issues with the central scheduler might also cause this. + This does not include the cases for which a run is not allowed due to missing + dependencies (missing_data) or due to the fact that another worker is currently + running the task (already_running). task_failed For signaling that there were last known to have failed. Typically because some exception have been raised. -already_running - This can happen in two different cases. Either the local lock file was taken - at the time the invocation starts up. Or, the central scheduler have reported - that some tasks could not have been run, because other workers are already - running the tasks. +scheduling_error + For when a task's ``complete()`` or ``requires()`` method fails with an + exception, or when the limit number of tasks is reached. +unhandled_exception + For internal Luigi errors. Defaults to 4, since this type of error + probably will not recover over time. If you customize return codes, prefer to set them in range 128 to 255 to avoid conflicts. Return codes in range 0 to 127 are reserved for possible future use diff --git a/luigi/execution_summary.py b/luigi/execution_summary.py index 800a0fd544..27efbcb66f 100644 --- a/luigi/execution_summary.py +++ b/luigi/execution_summary.py @@ -54,13 +54,20 @@ def _partition_tasks(worker): set_tasks["upstream_missing_dependency"] = set() set_tasks["upstream_run_by_other_worker"] = set() set_tasks["upstream_scheduling_error"] = set() - set_tasks["unknown_reason"] = set() + set_tasks["not_run"] = set() return set_tasks +def _root_task(worker): + """ + Return the first task scheduled by the worker, corresponding to the root task + """ + return worker._add_task_history[0][0] + + def _populate_unknown_statuses(set_tasks): """ - Add the "upstream_*" and "unknown_reason" statuses my mutating set_tasks. + Add the "upstream_*" and "not_run" statuses my mutating set_tasks. """ visited = set() for task in set_tasks["still_pending_not_ext"]: @@ -95,7 +102,7 @@ def _depth_first_search(set_tasks, current_task, visited): if not upstream_failure and not upstream_missing_dependency and \ not upstream_run_by_other_worker and not upstream_scheduling_error and \ current_task not in set_tasks["run_by_other_worker"]: - set_tasks["unknown_reason"].add(current_task) + set_tasks["not_run"].add(current_task) def _get_str(task_dict, extra_indent): @@ -263,22 +270,22 @@ def _get_comments(group_tasks): "upstream_missing_dependency", "upstream_run_by_other_worker", "upstream_scheduling_error", - "unknown_reason", + "not_run", ) _PENDING_SUB_STATUSES = set(_ORDERED_STATUSES[_ORDERED_STATUSES.index("still_pending_ext"):]) _COMMENTS = set(( ("already_done", 'present dependencies were encountered'), ("completed", 'ran successfully'), ("failed", 'failed'), - ("scheduling_error", 'failed running complete() or requires()'), + ("scheduling_error", 'failed scheduling'), ("still_pending", 'were left pending, among these'), ("still_pending_ext", 'were missing external dependencies'), ("run_by_other_worker", 'were being run by another worker'), ("upstream_failure", 'had failed dependencies'), ("upstream_missing_dependency", 'had missing external dependencies'), ("upstream_run_by_other_worker", 'had dependencies that were being run by other worker'), - ("upstream_scheduling_error", 'had dependencies whose complete() or requires() failed'), - ("unknown_reason", 'were left pending because of unknown reason'), + ("upstream_scheduling_error", 'had dependencies whose scheduling failed'), + ("not_run", 'was not granted run permission by the scheduler'), )) @@ -374,10 +381,13 @@ def _summary_format(set_tasks, worker): smiley = ":(" reason = "there were failed tasks" if set_tasks["scheduling_error"]: - reason += " and tasks whose complete() or requires() failed" + reason += " and tasks whose scheduling failed" elif set_tasks["scheduling_error"]: smiley = ":(" - reason = "there were tasks whose complete() or requires() failed" + reason = "there were tasks whose scheduling failed" + elif set_tasks["not_run"]: + smiley = ":|" + reason = "there were tasks that were not granted run permission by the scheduler" elif set_tasks["still_pending_ext"]: smiley = ":|" reason = "there were missing external dependencies" diff --git a/luigi/retcodes.py b/luigi/retcodes.py index 4d58e7de9e..1b9c778c41 100644 --- a/luigi/retcodes.py +++ b/luigi/retcodes.py @@ -48,8 +48,13 @@ class retcode(luigi.Config): ) # default value inconsistent with doc/configuration.rst for backwards compatibility reasons scheduling_error = IntParameter(default=0, - description='''For when a task's complete() or requires() fails.''' + description='''For when a task's complete() or requires() fails, + or task-limit reached''' ) + # default value inconsistent with doc/configuration.rst for backwards compatibility reasons + not_run = IntParameter(default=0, + description="For when a task is not granted run permission by the scheduler." + ) def run_with_retcodes(argv): @@ -76,6 +81,7 @@ def run_with_retcodes(argv): sys.exit(retcodes.unhandled_exception) task_sets = luigi.execution_summary._summary_dict(worker) + root_task = luigi.execution_summary._root_task(worker) non_empty_categories = {k: v for k, v in task_sets.items() if v}.keys() def has(status): @@ -87,5 +93,13 @@ def has(status): (retcodes.task_failed, has('failed')), (retcodes.already_running, has('run_by_other_worker')), (retcodes.scheduling_error, has('scheduling_error')), + (retcodes.not_run, has('not_run')), ) - sys.exit(max(code * (1 if cond else 0) for code, cond in codes_and_conds)) + expected_ret_code = max(code * (1 if cond else 0) for code, cond in codes_and_conds) + + if expected_ret_code == 0 and \ + root_task not in task_sets["completed"] and \ + root_task not in task_sets["already_done"]: + sys.exit(retcodes.not_run) + else: + sys.exit(expected_ret_code) diff --git a/luigi/worker.py b/luigi/worker.py index 9d6f844b2a..025839445d 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -570,70 +570,72 @@ def add(self, task, multiprocess=False): def _add(self, task, is_complete): if self._config.task_limit is not None and len(self._scheduled_tasks) >= self._config.task_limit: - logger.warning('Will not schedule %s or any dependencies due to exceeded task-limit of %d', task, self._config.task_limit) - return - - formatted_traceback = None - try: - self._check_complete_value(is_complete) - except KeyboardInterrupt: - raise - except AsyncCompletionException as ex: - formatted_traceback = ex.trace - except BaseException: - formatted_traceback = traceback.format_exc() - - if formatted_traceback is not None: - self.add_succeeded = False - self._log_complete_error(task, formatted_traceback) - task.trigger_event(Event.DEPENDENCY_MISSING, task) - self._email_complete_error(task, formatted_traceback) + logger.warning('Will not run %s or any dependencies due to exceeded task-limit of %d', task, self._config.task_limit) deps = None status = UNKNOWN runnable = False - elif is_complete: - deps = None - status = DONE - runnable = False - - task.trigger_event(Event.DEPENDENCY_PRESENT, task) - elif _is_external(task): - deps = None - status = PENDING - runnable = worker().retry_external_tasks - - task.trigger_event(Event.DEPENDENCY_MISSING, task) - logger.warning('Data for %s does not exist (yet?). The task is an ' - 'external data depedency, so it can not be run from' - ' this luigi process.', task) - else: + formatted_traceback = None try: - deps = task.deps() - except Exception as ex: + self._check_complete_value(is_complete) + except KeyboardInterrupt: + raise + except AsyncCompletionException as ex: + formatted_traceback = ex.trace + except BaseException: formatted_traceback = traceback.format_exc() + + if formatted_traceback is not None: self.add_succeeded = False - self._log_dependency_error(task, formatted_traceback) - task.trigger_event(Event.BROKEN_TASK, task, ex) - self._email_dependency_error(task, formatted_traceback) + self._log_complete_error(task, formatted_traceback) + task.trigger_event(Event.DEPENDENCY_MISSING, task) + self._email_complete_error(task, formatted_traceback) deps = None status = UNKNOWN runnable = False - else: + + elif is_complete: + deps = None + status = DONE + runnable = False + task.trigger_event(Event.DEPENDENCY_PRESENT, task) + + elif _is_external(task): + deps = None status = PENDING - runnable = True + runnable = worker().retry_external_tasks + task.trigger_event(Event.DEPENDENCY_MISSING, task) + logger.warning('Data for %s does not exist (yet?). The task is an ' + 'external data depedency, so it can not be run from' + ' this luigi process.', task) + + else: + try: + deps = task.deps() + except Exception as ex: + formatted_traceback = traceback.format_exc() + self.add_succeeded = False + self._log_dependency_error(task, formatted_traceback) + task.trigger_event(Event.BROKEN_TASK, task, ex) + self._email_dependency_error(task, formatted_traceback) + deps = None + status = UNKNOWN + runnable = False + else: + status = PENDING + runnable = True - if task.disabled: - status = DISABLED + if task.disabled: + status = DISABLED - if deps: - for d in deps: - self._validate_dependency(d) - task.trigger_event(Event.DEPENDENCY_DISCOVERED, task, d) - yield d # return additional tasks to add + if deps: + for d in deps: + self._validate_dependency(d) + task.trigger_event(Event.DEPENDENCY_DISCOVERED, task, d) + yield d # return additional tasks to add - deps = [d.task_id for d in deps] + deps = [d.task_id for d in deps] self._scheduled_tasks[task.task_id] = task self._add_task(worker=self._id, task_id=task.task_id, status=status, diff --git a/test/execution_summary_test.py b/test/execution_summary_test.py index 88cbfdcffa..ea4530804f 100644 --- a/test/execution_summary_test.py +++ b/test/execution_summary_test.py @@ -113,7 +113,7 @@ def requires(self): self.assertEqual({Foo()}, d['still_pending_not_ext']) self.assertEqual({Foo()}, d['upstream_scheduling_error']) self.assertEqual({Bar()}, d['scheduling_error']) - self.assertFalse(d['unknown_reason']) + self.assertFalse(d['not_run']) self.assertFalse(d['already_done']) self.assertFalse(d['completed']) self.assertFalse(d['failed']) @@ -126,14 +126,62 @@ def requires(self): '===== Luigi Execution Summary =====', '', 'Scheduled 2 tasks of which:', - '* 1 failed running complete() or requires():', + '* 1 failed scheduling:', ' - 1 Bar()', '* 1 were left pending, among these:', - " * 1 had dependencies whose complete() or requires() failed:", + " * 1 had dependencies whose scheduling failed:", ' - 1 Foo()', '', 'Did not run any tasks', - 'This progress looks :( because there were tasks whose complete() or requires() failed', + 'This progress looks :( because there were tasks whose scheduling failed', + '', + '===== Luigi Execution Summary =====', + ''] + result = summary.split('\n') + self.assertEqual(len(result), len(expected)) + for i, line in enumerate(result): + self.assertEqual(line, expected[i]) + + def test_not_run_error(self): + class Bar(luigi.Task): + def complete(self): + return True + + class Foo(luigi.Task): + def requires(self): + yield Bar() + + def new_func(*args, **kwargs): + return None + + with mock.patch('luigi.scheduler.CentralPlannerScheduler.add_task', new_func): + self.run_task(Foo()) + + d = self.summary_dict() + self.assertEqual({Foo()}, d['still_pending_not_ext']) + self.assertEqual({Foo()}, d['not_run']) + self.assertEqual({Bar()}, d['already_done']) + self.assertFalse(d['upstream_scheduling_error']) + self.assertFalse(d['scheduling_error']) + self.assertFalse(d['completed']) + self.assertFalse(d['failed']) + self.assertFalse(d['upstream_failure']) + self.assertFalse(d['upstream_missing_dependency']) + self.assertFalse(d['run_by_other_worker']) + self.assertFalse(d['still_pending_ext']) + summary = self.summary() + expected = ['', + '===== Luigi Execution Summary =====', + '', + 'Scheduled 2 tasks of which:', + '* 1 present dependencies were encountered:', + ' - 1 Bar()', + '* 1 were left pending, among these:', + " * 1 was not granted run permission by the scheduler:", + ' - 1 Foo()', + '', + 'Did not run any tasks', + 'This progress looks :| because there were tasks that were not granted run permission by the scheduler', '', '===== Luigi Execution Summary =====', ''] @@ -159,7 +207,7 @@ def requires(self): d = self.summary_dict() self.assertEqual({Foo()}, d['scheduling_error']) self.assertFalse(d['upstream_scheduling_error']) - self.assertFalse(d['unknown_reason']) + self.assertFalse(d['not_run']) self.assertFalse(d['already_done']) self.assertFalse(d['completed']) self.assertFalse(d['failed']) @@ -172,11 +220,11 @@ def requires(self): '===== Luigi Execution Summary =====', '', 'Scheduled 1 tasks of which:', - '* 1 failed running complete() or requires():', + '* 1 failed scheduling:', ' - 1 Foo()', '', 'Did not run any tasks', - 'This progress looks :( because there were tasks whose complete() or requires() failed', + 'This progress looks :( because there were tasks whose scheduling failed', '', '===== Luigi Execution Summary =====', ''] @@ -351,10 +399,10 @@ def new_func(*args, **kwargs): d = self.summary_dict() self.assertFalse(d['already_done']) self.assertFalse(d['completed']) - self.assertFalse(d['unknown_reason']) + self.assertFalse(d['not_run']) self.assertEqual({AlreadyRunningTask()}, d['run_by_other_worker']) - def test_unknown_reason(self): + def test_not_run(self): class AlreadyRunningTask(luigi.Task): def run(self): pass @@ -375,12 +423,12 @@ def new_func(*args, **kwargs): self.assertFalse(d['already_done']) self.assertFalse(d['completed']) self.assertFalse(d['run_by_other_worker']) - self.assertEqual({AlreadyRunningTask()}, d['unknown_reason']) + self.assertEqual({AlreadyRunningTask()}, d['not_run']) s = self.summary() self.assertIn('\nScheduled 1 tasks of which:\n' '* 1 were left pending, among these:\n' - ' * 1 were left pending because of unknown reason:\n' + ' * 1 was not granted run permission by the scheduler:\n' ' - 1 AlreadyRunningTask()\n', s) self.assertNotIn('\n\n\n', s) @@ -399,7 +447,7 @@ class SomeTask(RunOnceTask): self.assertFalse(d['already_done']) self.assertFalse(d['completed']) self.assertFalse(d['run_by_other_worker']) - self.assertEqual({SomeTask()}, d['unknown_reason']) + self.assertEqual({SomeTask()}, d['not_run']) def test_somebody_else_disables_task(self): class SomeTask(luigi.Task): @@ -420,7 +468,7 @@ def run(self): self.assertFalse(d['already_done']) self.assertFalse(d['completed']) self.assertFalse(d['run_by_other_worker']) - self.assertEqual({SomeTask()}, d['unknown_reason']) + self.assertEqual({SomeTask()}, d['not_run']) def test_larger_tree(self): diff --git a/test/retcodes_test.py b/test/retcodes_test.py index 7ce3ddc8a8..c16b67d147 100644 --- a/test/retcodes_test.py +++ b/test/retcodes_test.py @@ -106,6 +106,22 @@ def requires(self): self.run_and_expect('RequiringTask', 4) + def test_task_limit(self): + class TaskB(luigi.Task): + def complete(self): + return False + + class TaskA(luigi.Task): + def requires(sefl): + yield TaskB() + + class TaskLimitTest(luigi.Task): + def requires(self): + yield TaskA() + + self.run_and_expect('TaskLimitTest --worker-task-limit 2', 0) + self.run_and_expect('TaskLimitTest --worker-task-limit 2 --retcode-scheduling-error 3', 3) + def test_unhandled_exception(self): def new_func(*args, **kwargs): raise Exception() @@ -137,3 +153,20 @@ def requires(self): self.run_and_expect('RequiringTask --retcode-task-failed 4 --retcode-missing-data 5', 5) self.run_and_expect('RequiringTask --retcode-task-failed 7 --retcode-missing-data 6', 7) + + def test_unknown_reason(self): + + class TaskA(luigi.Task): + def complete(self): + return True + + class RequiringTask(luigi.Task): + def requires(self): + yield TaskA() + + def new_func(*args, **kwargs): + return None + + with mock.patch('luigi.scheduler.CentralPlannerScheduler.add_task', new_func): + self.run_and_expect('RequiringTask', 0) + self.run_and_expect('RequiringTask --retcode-not-run 5', 5)