From bca9c507acbda88ff799a90937a30b7d75010ffe Mon Sep 17 00:00:00 2001 From: Fabrizio Demaria Date: Sat, 16 Jul 2016 19:40:02 +0200 Subject: [PATCH 01/10] Add retcode for tasks in state unknown_reason --- doc/configuration.rst | 3 +++ luigi/retcodes.py | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/doc/configuration.rst b/doc/configuration.rst index bceac2a739..3752851eb8 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -481,11 +481,14 @@ We recommend that you copy this set of exit codes to your ``luigi.cfg`` file: missing_data=20 task_failed=30 scheduling_error=35 + unknown_reason=38 unhandled_exception=40 unhandled_exception For internal Luigi errors. Defaults to 4, since this type of error probably will not recover over time. +unknown_reason + For when a task has not completed for unknown reasons. missing_data For when an :py:class:`~luigi.task.ExternalTask` is not complete, and this caused the worker to give up. As an alternative to fiddling with this, see diff --git a/luigi/retcodes.py b/luigi/retcodes.py index 4d58e7de9e..4c239195c8 100644 --- a/luigi/retcodes.py +++ b/luigi/retcodes.py @@ -50,6 +50,10 @@ class retcode(luigi.Config): scheduling_error = IntParameter(default=0, description='''For when a task's complete() or requires() fails.''' ) + # default value inconsistent with doc/configuration.rst for backwards compatibility reasons + unknown_reason = IntParameter(default=0, + description="For when a task fails or is left pending for unknown reasons." + ) def run_with_retcodes(argv): @@ -87,5 +91,6 @@ def has(status): (retcodes.task_failed, has('failed')), (retcodes.already_running, has('run_by_other_worker')), (retcodes.scheduling_error, has('scheduling_error')), + (retcodes.unknown_reason, has('unknown_reason')), ) sys.exit(max(code * (1 if cond else 0) for code, cond in codes_and_conds)) From 8ebb07343b0a4a4d33faaf7f905531459faa5436 Mon Sep 17 00:00:00 2001 From: Fabrizio Demaria Date: Sun, 17 Jul 2016 14:15:44 +0200 Subject: [PATCH 02/10] Reaching task-limit counts as scheduling_error --- luigi/worker.py | 100 +++++++++++++++++++++--------------------- test/retcodes_test.py | 16 +++++++ 2 files changed, 67 insertions(+), 49 deletions(-) diff --git a/luigi/worker.py b/luigi/worker.py index 9d6f844b2a..025839445d 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -570,70 +570,72 @@ def add(self, task, multiprocess=False): def _add(self, task, is_complete): if self._config.task_limit is not None and len(self._scheduled_tasks) >= self._config.task_limit: - logger.warning('Will not schedule %s or any dependencies due to exceeded task-limit of %d', task, self._config.task_limit) - return - - formatted_traceback = None - try: - self._check_complete_value(is_complete) - except KeyboardInterrupt: - raise - except AsyncCompletionException as ex: - formatted_traceback = ex.trace - except BaseException: - formatted_traceback = traceback.format_exc() - - if formatted_traceback is not None: - self.add_succeeded = False - self._log_complete_error(task, formatted_traceback) - task.trigger_event(Event.DEPENDENCY_MISSING, task) - self._email_complete_error(task, formatted_traceback) + logger.warning('Will not run %s or any dependencies due to exceeded task-limit of %d', task, self._config.task_limit) deps = None status = UNKNOWN runnable = False - elif is_complete: - deps = None - status = DONE - runnable = False - - task.trigger_event(Event.DEPENDENCY_PRESENT, task) - elif _is_external(task): - deps = None - status = PENDING - runnable = worker().retry_external_tasks - - task.trigger_event(Event.DEPENDENCY_MISSING, task) - logger.warning('Data for %s does not exist (yet?). The task is an ' - 'external data depedency, so it can not be run from' - ' this luigi process.', task) - else: + formatted_traceback = None try: - deps = task.deps() - except Exception as ex: + self._check_complete_value(is_complete) + except KeyboardInterrupt: + raise + except AsyncCompletionException as ex: + formatted_traceback = ex.trace + except BaseException: formatted_traceback = traceback.format_exc() + + if formatted_traceback is not None: self.add_succeeded = False - self._log_dependency_error(task, formatted_traceback) - task.trigger_event(Event.BROKEN_TASK, task, ex) - self._email_dependency_error(task, formatted_traceback) + self._log_complete_error(task, formatted_traceback) + task.trigger_event(Event.DEPENDENCY_MISSING, task) + self._email_complete_error(task, formatted_traceback) deps = None status = UNKNOWN runnable = False - else: + + elif is_complete: + deps = None + status = DONE + runnable = False + task.trigger_event(Event.DEPENDENCY_PRESENT, task) + + elif _is_external(task): + deps = None status = PENDING - runnable = True + runnable = worker().retry_external_tasks + task.trigger_event(Event.DEPENDENCY_MISSING, task) + logger.warning('Data for %s does not exist (yet?). The task is an ' + 'external data depedency, so it can not be run from' + ' this luigi process.', task) + + else: + try: + deps = task.deps() + except Exception as ex: + formatted_traceback = traceback.format_exc() + self.add_succeeded = False + self._log_dependency_error(task, formatted_traceback) + task.trigger_event(Event.BROKEN_TASK, task, ex) + self._email_dependency_error(task, formatted_traceback) + deps = None + status = UNKNOWN + runnable = False + else: + status = PENDING + runnable = True - if task.disabled: - status = DISABLED + if task.disabled: + status = DISABLED - if deps: - for d in deps: - self._validate_dependency(d) - task.trigger_event(Event.DEPENDENCY_DISCOVERED, task, d) - yield d # return additional tasks to add + if deps: + for d in deps: + self._validate_dependency(d) + task.trigger_event(Event.DEPENDENCY_DISCOVERED, task, d) + yield d # return additional tasks to add - deps = [d.task_id for d in deps] + deps = [d.task_id for d in deps] self._scheduled_tasks[task.task_id] = task self._add_task(worker=self._id, task_id=task.task_id, status=status, diff --git a/test/retcodes_test.py b/test/retcodes_test.py index 7ce3ddc8a8..0f8f56916c 100644 --- a/test/retcodes_test.py +++ b/test/retcodes_test.py @@ -106,6 +106,22 @@ def requires(self): self.run_and_expect('RequiringTask', 4) + def test_task_limit(self): + class TaskB(luigi.Task): + def complete(self): + return False + + class TaskA(luigi.Task): + def requires(sefl): + yield TaskB() + + class TaskLimitTest(luigi.Task): + def requires(self): + yield TaskA() + + self.run_and_expect('TaskLimitTest --worker-task-limit 2', 0) + self.run_and_expect('TaskLimitTest --worker-task-limit 2 --retcode-scheduling-error 3', 3) + def test_unhandled_exception(self): def new_func(*args, **kwargs): raise Exception() From dc1c7c9ff2ad82f4226aa12c25031fd46fc0deac Mon Sep 17 00:00:00 2001 From: Fabrizio Demaria Date: Sun, 17 Jul 2016 14:59:31 +0200 Subject: [PATCH 03/10] Add test for unknown_reason exit code --- test/retcodes_test.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/test/retcodes_test.py b/test/retcodes_test.py index 0f8f56916c..63c469039d 100644 --- a/test/retcodes_test.py +++ b/test/retcodes_test.py @@ -153,3 +153,20 @@ def requires(self): self.run_and_expect('RequiringTask --retcode-task-failed 4 --retcode-missing-data 5', 5) self.run_and_expect('RequiringTask --retcode-task-failed 7 --retcode-missing-data 6', 7) + + def test_unknown_reason(self): + + class TaskA(luigi.Task): + def complete(self): + return True + + class RequiringTask(luigi.Task): + def requires(self): + yield TaskA() + + def new_func(*args, **kwargs): + return None + + with mock.patch('luigi.scheduler.CentralPlannerScheduler.add_task', new_func): + self.run_and_expect('RequiringTask', 0) + self.run_and_expect('RequiringTask --retcode-unknown-reason 5', 5) From fa906a0e522595a820add2a60a2941aef0581f50 Mon Sep 17 00:00:00 2001 From: Fabrizio Demaria Date: Sun, 17 Jul 2016 15:05:43 +0200 Subject: [PATCH 04/10] Update execution summary for unknwon_reason case --- luigi/execution_summary.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/luigi/execution_summary.py b/luigi/execution_summary.py index 800a0fd544..26b45cf98e 100644 --- a/luigi/execution_summary.py +++ b/luigi/execution_summary.py @@ -378,6 +378,9 @@ def _summary_format(set_tasks, worker): elif set_tasks["scheduling_error"]: smiley = ":(" reason = "there were tasks whose complete() or requires() failed" + elif set_tasks["unknown_reason"]: + smiley = ":(" + reason = "there were tasks that failed or were left pending for unknown reason" elif set_tasks["still_pending_ext"]: smiley = ":|" reason = "there were missing external dependencies" From c0d4b9e301c8e0830a6bc943541a8224ec6e831e Mon Sep 17 00:00:00 2001 From: Fabrizio Demaria Date: Sun, 17 Jul 2016 15:17:49 +0200 Subject: [PATCH 05/10] Add test for unknown_reason execution summary --- test/execution_summary_test.py | 48 ++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/test/execution_summary_test.py b/test/execution_summary_test.py index 88cbfdcffa..e5fa606902 100644 --- a/test/execution_summary_test.py +++ b/test/execution_summary_test.py @@ -142,6 +142,54 @@ def requires(self): for i, line in enumerate(result): self.assertEqual(line, expected[i]) + def test_unknown_reason_error(self): + class Bar(luigi.Task): + def complete(self): + return True + + class Foo(luigi.Task): + def requires(self): + yield Bar() + + def new_func(*args, **kwargs): + return None + + with mock.patch('luigi.scheduler.CentralPlannerScheduler.add_task', new_func): + self.run_task(Foo()) + + d = self.summary_dict() + self.assertEqual({Foo()}, d['still_pending_not_ext']) + self.assertEqual({Foo()}, d['unknown_reason']) + self.assertEqual({Bar()}, d['already_done']) + self.assertFalse(d['upstream_scheduling_error']) + self.assertFalse(d['scheduling_error']) + self.assertFalse(d['completed']) + self.assertFalse(d['failed']) + self.assertFalse(d['upstream_failure']) + self.assertFalse(d['upstream_missing_dependency']) + self.assertFalse(d['run_by_other_worker']) + self.assertFalse(d['still_pending_ext']) + summary = self.summary() + expected = ['', + '===== Luigi Execution Summary =====', + '', + 'Scheduled 2 tasks of which:', + '* 1 present dependencies were encountered:', + ' - 1 Bar()', + '* 1 were left pending, among these:', + " * 1 were left pending because of unknown reason:", + ' - 1 Foo()', + '', + 'Did not run any tasks', + 'This progress looks :( because there were tasks that failed or were left pending for unknown reason', + '', + '===== Luigi Execution Summary =====', + ''] + result = summary.split('\n') + self.assertEqual(len(result), len(expected)) + for i, line in enumerate(result): + self.assertEqual(line, expected[i]) + def test_deps_error(self): class Bar(luigi.Task): def run(self): From 19e934a9b872b2ca5377d0efdbaaaa0b736f72aa Mon Sep 17 00:00:00 2001 From: Fabrizio Demaria Date: Sun, 17 Jul 2016 15:20:09 +0200 Subject: [PATCH 06/10] Update unknown_reason error description in docs --- doc/configuration.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/configuration.rst b/doc/configuration.rst index 3752851eb8..f731f62525 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -488,7 +488,7 @@ unhandled_exception For internal Luigi errors. Defaults to 4, since this type of error probably will not recover over time. unknown_reason - For when a task has not completed for unknown reasons. + For when a task fails or is left pending for unknown reasons. missing_data For when an :py:class:`~luigi.task.ExternalTask` is not complete, and this caused the worker to give up. As an alternative to fiddling with this, see From e08185e6bf55a0d2ed26df07cb8e807c06858ae8 Mon Sep 17 00:00:00 2001 From: Fabrizio Demaria Date: Mon, 18 Jul 2016 10:02:34 +0200 Subject: [PATCH 07/10] Update scheduling error messages/documentation --- doc/configuration.rst | 2 +- luigi/execution_summary.py | 8 ++++---- luigi/retcodes.py | 3 ++- test/execution_summary_test.py | 10 +++++----- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/doc/configuration.rst b/doc/configuration.rst index f731f62525..074426525d 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -495,7 +495,7 @@ missing_data the [worker] keep_alive option. scheduling_error For when a task's ``complete()`` or ``requires()`` method fails with an - exception. + exception, or when the limit number of tasks is reached. task_failed For signaling that there were last known to have failed. Typically because some exception have been raised. diff --git a/luigi/execution_summary.py b/luigi/execution_summary.py index 26b45cf98e..8caf5c562e 100644 --- a/luigi/execution_summary.py +++ b/luigi/execution_summary.py @@ -270,14 +270,14 @@ def _get_comments(group_tasks): ("already_done", 'present dependencies were encountered'), ("completed", 'ran successfully'), ("failed", 'failed'), - ("scheduling_error", 'failed running complete() or requires()'), + ("scheduling_error", 'failed scheduling'), ("still_pending", 'were left pending, among these'), ("still_pending_ext", 'were missing external dependencies'), ("run_by_other_worker", 'were being run by another worker'), ("upstream_failure", 'had failed dependencies'), ("upstream_missing_dependency", 'had missing external dependencies'), ("upstream_run_by_other_worker", 'had dependencies that were being run by other worker'), - ("upstream_scheduling_error", 'had dependencies whose complete() or requires() failed'), + ("upstream_scheduling_error", 'had dependencies whose scheduling failed'), ("unknown_reason", 'were left pending because of unknown reason'), )) @@ -374,10 +374,10 @@ def _summary_format(set_tasks, worker): smiley = ":(" reason = "there were failed tasks" if set_tasks["scheduling_error"]: - reason += " and tasks whose complete() or requires() failed" + reason += " and tasks whose scheduling failed" elif set_tasks["scheduling_error"]: smiley = ":(" - reason = "there were tasks whose complete() or requires() failed" + reason = "there were tasks whose scheduling failed" elif set_tasks["unknown_reason"]: smiley = ":(" reason = "there were tasks that failed or were left pending for unknown reason" diff --git a/luigi/retcodes.py b/luigi/retcodes.py index 4c239195c8..3eaae6ebc2 100644 --- a/luigi/retcodes.py +++ b/luigi/retcodes.py @@ -48,7 +48,8 @@ class retcode(luigi.Config): ) # default value inconsistent with doc/configuration.rst for backwards compatibility reasons scheduling_error = IntParameter(default=0, - description='''For when a task's complete() or requires() fails.''' + description='''For when a task's complete() or requires() fails, + or task-limit reached''' ) # default value inconsistent with doc/configuration.rst for backwards compatibility reasons unknown_reason = IntParameter(default=0, diff --git a/test/execution_summary_test.py b/test/execution_summary_test.py index e5fa606902..fc2ce60bcf 100644 --- a/test/execution_summary_test.py +++ b/test/execution_summary_test.py @@ -126,14 +126,14 @@ def requires(self): '===== Luigi Execution Summary =====', '', 'Scheduled 2 tasks of which:', - '* 1 failed running complete() or requires():', + '* 1 failed scheduling:', ' - 1 Bar()', '* 1 were left pending, among these:', - " * 1 had dependencies whose complete() or requires() failed:", + " * 1 had dependencies whose scheduling failed:", ' - 1 Foo()', '', 'Did not run any tasks', - 'This progress looks :( because there were tasks whose complete() or requires() failed', + 'This progress looks :( because there were tasks whose scheduling failed', '', '===== Luigi Execution Summary =====', ''] @@ -220,11 +220,11 @@ def requires(self): '===== Luigi Execution Summary =====', '', 'Scheduled 1 tasks of which:', - '* 1 failed running complete() or requires():', + '* 1 failed scheduling:', ' - 1 Foo()', '', 'Did not run any tasks', - 'This progress looks :( because there were tasks whose complete() or requires() failed', + 'This progress looks :( because there were tasks whose scheduling failed', '', '===== Luigi Execution Summary =====', ''] From 54ca4a358e671691e0bbbb7422438ed90512383c Mon Sep 17 00:00:00 2001 From: Fabrizio Demaria Date: Mon, 18 Jul 2016 11:37:21 +0200 Subject: [PATCH 08/10] Update docs/logs for unknown_reason --- doc/configuration.rst | 7 +++++-- luigi/execution_summary.py | 6 +++--- luigi/retcodes.py | 2 +- test/execution_summary_test.py | 6 +++--- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/doc/configuration.rst b/doc/configuration.rst index 074426525d..862af38cca 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -481,14 +481,17 @@ We recommend that you copy this set of exit codes to your ``luigi.cfg`` file: missing_data=20 task_failed=30 scheduling_error=35 - unknown_reason=38 + unknown_reason=38 or 0 unhandled_exception=40 unhandled_exception For internal Luigi errors. Defaults to 4, since this type of error probably will not recover over time. unknown_reason - For when a task fails or is left pending for unknown reasons. + For when a task does not run successfully because of an unknown reason. Despite + this case can be expected in an error free execution, it does not guarantee + completeness of the root task. Return code 38 is advised only for cases in which + Luigi return code 0 is used by the user to guarantee root tasks' completeness. missing_data For when an :py:class:`~luigi.task.ExternalTask` is not complete, and this caused the worker to give up. As an alternative to fiddling with this, see diff --git a/luigi/execution_summary.py b/luigi/execution_summary.py index 8caf5c562e..67c5f8868b 100644 --- a/luigi/execution_summary.py +++ b/luigi/execution_summary.py @@ -278,7 +278,7 @@ def _get_comments(group_tasks): ("upstream_missing_dependency", 'had missing external dependencies'), ("upstream_run_by_other_worker", 'had dependencies that were being run by other worker'), ("upstream_scheduling_error", 'had dependencies whose scheduling failed'), - ("unknown_reason", 'were left pending because of unknown reason'), + ("unknown_reason", 'did not run successfully because of unknown reason'), )) @@ -379,8 +379,8 @@ def _summary_format(set_tasks, worker): smiley = ":(" reason = "there were tasks whose scheduling failed" elif set_tasks["unknown_reason"]: - smiley = ":(" - reason = "there were tasks that failed or were left pending for unknown reason" + smiley = ":|" + reason = "there were tasks that did not run successfully because of unknown reason" elif set_tasks["still_pending_ext"]: smiley = ":|" reason = "there were missing external dependencies" diff --git a/luigi/retcodes.py b/luigi/retcodes.py index 3eaae6ebc2..4698db7ff8 100644 --- a/luigi/retcodes.py +++ b/luigi/retcodes.py @@ -53,7 +53,7 @@ class retcode(luigi.Config): ) # default value inconsistent with doc/configuration.rst for backwards compatibility reasons unknown_reason = IntParameter(default=0, - description="For when a task fails or is left pending for unknown reasons." + description="For when a task does not run successfully because of unknown reason." ) diff --git a/test/execution_summary_test.py b/test/execution_summary_test.py index fc2ce60bcf..22d1781057 100644 --- a/test/execution_summary_test.py +++ b/test/execution_summary_test.py @@ -177,11 +177,11 @@ def new_func(*args, **kwargs): '* 1 present dependencies were encountered:', ' - 1 Bar()', '* 1 were left pending, among these:', - " * 1 were left pending because of unknown reason:", + " * 1 did not run successfully because of unknown reason:", ' - 1 Foo()', '', 'Did not run any tasks', - 'This progress looks :( because there were tasks that failed or were left pending for unknown reason', + 'This progress looks :| because there were tasks that did not run successfully because of unknown reason', '', '===== Luigi Execution Summary =====', ''] @@ -428,7 +428,7 @@ def new_func(*args, **kwargs): s = self.summary() self.assertIn('\nScheduled 1 tasks of which:\n' '* 1 were left pending, among these:\n' - ' * 1 were left pending because of unknown reason:\n' + ' * 1 did not run successfully because of unknown reason:\n' ' - 1 AlreadyRunningTask()\n', s) self.assertNotIn('\n\n\n', s) From dde2a24097cd1ebbb160d0c08c4142547d9354c5 Mon Sep 17 00:00:00 2001 From: Fabrizio Demaria Date: Mon, 18 Jul 2016 14:16:07 +0200 Subject: [PATCH 09/10] Return code 0 strictly corresponds to success --- luigi/execution_summary.py | 7 +++++++ luigi/retcodes.py | 10 +++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/luigi/execution_summary.py b/luigi/execution_summary.py index 67c5f8868b..5bee57ffcc 100644 --- a/luigi/execution_summary.py +++ b/luigi/execution_summary.py @@ -58,6 +58,13 @@ def _partition_tasks(worker): return set_tasks +def _root_task(worker): + """ + Return the first task scheduled by the worker, corresponding to the root task + """ + return worker._add_task_history[0][0] + + def _populate_unknown_statuses(set_tasks): """ Add the "upstream_*" and "unknown_reason" statuses my mutating set_tasks. diff --git a/luigi/retcodes.py b/luigi/retcodes.py index 4698db7ff8..9117c1ce4a 100644 --- a/luigi/retcodes.py +++ b/luigi/retcodes.py @@ -81,6 +81,7 @@ def run_with_retcodes(argv): sys.exit(retcodes.unhandled_exception) task_sets = luigi.execution_summary._summary_dict(worker) + root_task = luigi.execution_summary._root_task(worker) non_empty_categories = {k: v for k, v in task_sets.items() if v}.keys() def has(status): @@ -94,4 +95,11 @@ def has(status): (retcodes.scheduling_error, has('scheduling_error')), (retcodes.unknown_reason, has('unknown_reason')), ) - sys.exit(max(code * (1 if cond else 0) for code, cond in codes_and_conds)) + expected_ret_code = max(code * (1 if cond else 0) for code, cond in codes_and_conds) + + if expected_ret_code == 0 and \ + root_task not in task_sets["completed"] and \ + root_task not in task_sets["already_done"]: + sys.exit(retcodes.unknown_reason) + else: + sys.exit(expected_ret_code) From df526af08ccacfdf265801bd831bc694af3e0f3b Mon Sep 17 00:00:00 2001 From: Fabrizio Demaria Date: Tue, 19 Jul 2016 14:35:18 +0200 Subject: [PATCH 10/10] Change unknown_reason into not_run --- doc/configuration.rst | 37 ++++++++++++++++++---------------- luigi/execution_summary.py | 14 ++++++------- luigi/retcodes.py | 10 ++++----- test/execution_summary_test.py | 24 +++++++++++----------- test/retcodes_test.py | 2 +- 5 files changed, 45 insertions(+), 42 deletions(-) diff --git a/doc/configuration.rst b/doc/configuration.rst index 862af38cca..dbead798ce 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -479,34 +479,37 @@ We recommend that you copy this set of exit codes to your ``luigi.cfg`` file: # They are in increasing level of severity (for most applications) already_running=10 missing_data=20 + not_run=25 task_failed=30 scheduling_error=35 - unknown_reason=38 or 0 unhandled_exception=40 -unhandled_exception - For internal Luigi errors. Defaults to 4, since this type of error - probably will not recover over time. -unknown_reason - For when a task does not run successfully because of an unknown reason. Despite - this case can be expected in an error free execution, it does not guarantee - completeness of the root task. Return code 38 is advised only for cases in which - Luigi return code 0 is used by the user to guarantee root tasks' completeness. +already_running + This can happen in two different cases. Either the local lock file was taken + at the time the invocation starts up. Or, the central scheduler have reported + that some tasks could not have been run, because other workers are already + running the tasks. missing_data For when an :py:class:`~luigi.task.ExternalTask` is not complete, and this caused the worker to give up. As an alternative to fiddling with this, see the [worker] keep_alive option. -scheduling_error - For when a task's ``complete()`` or ``requires()`` method fails with an - exception, or when the limit number of tasks is reached. +not_run + For when a task is not granted run permission by the scheduler. Typically + because of lack of resources, because the task has been already run by + another worker or because the attempted task is in DISABLED state. + Connectivity issues with the central scheduler might also cause this. + This does not include the cases for which a run is not allowed due to missing + dependencies (missing_data) or due to the fact that another worker is currently + running the task (already_running). task_failed For signaling that there were last known to have failed. Typically because some exception have been raised. -already_running - This can happen in two different cases. Either the local lock file was taken - at the time the invocation starts up. Or, the central scheduler have reported - that some tasks could not have been run, because other workers are already - running the tasks. +scheduling_error + For when a task's ``complete()`` or ``requires()`` method fails with an + exception, or when the limit number of tasks is reached. +unhandled_exception + For internal Luigi errors. Defaults to 4, since this type of error + probably will not recover over time. If you customize return codes, prefer to set them in range 128 to 255 to avoid conflicts. Return codes in range 0 to 127 are reserved for possible future use diff --git a/luigi/execution_summary.py b/luigi/execution_summary.py index 5bee57ffcc..27efbcb66f 100644 --- a/luigi/execution_summary.py +++ b/luigi/execution_summary.py @@ -54,7 +54,7 @@ def _partition_tasks(worker): set_tasks["upstream_missing_dependency"] = set() set_tasks["upstream_run_by_other_worker"] = set() set_tasks["upstream_scheduling_error"] = set() - set_tasks["unknown_reason"] = set() + set_tasks["not_run"] = set() return set_tasks @@ -67,7 +67,7 @@ def _root_task(worker): def _populate_unknown_statuses(set_tasks): """ - Add the "upstream_*" and "unknown_reason" statuses my mutating set_tasks. + Add the "upstream_*" and "not_run" statuses my mutating set_tasks. """ visited = set() for task in set_tasks["still_pending_not_ext"]: @@ -102,7 +102,7 @@ def _depth_first_search(set_tasks, current_task, visited): if not upstream_failure and not upstream_missing_dependency and \ not upstream_run_by_other_worker and not upstream_scheduling_error and \ current_task not in set_tasks["run_by_other_worker"]: - set_tasks["unknown_reason"].add(current_task) + set_tasks["not_run"].add(current_task) def _get_str(task_dict, extra_indent): @@ -270,7 +270,7 @@ def _get_comments(group_tasks): "upstream_missing_dependency", "upstream_run_by_other_worker", "upstream_scheduling_error", - "unknown_reason", + "not_run", ) _PENDING_SUB_STATUSES = set(_ORDERED_STATUSES[_ORDERED_STATUSES.index("still_pending_ext"):]) _COMMENTS = set(( @@ -285,7 +285,7 @@ def _get_comments(group_tasks): ("upstream_missing_dependency", 'had missing external dependencies'), ("upstream_run_by_other_worker", 'had dependencies that were being run by other worker'), ("upstream_scheduling_error", 'had dependencies whose scheduling failed'), - ("unknown_reason", 'did not run successfully because of unknown reason'), + ("not_run", 'was not granted run permission by the scheduler'), )) @@ -385,9 +385,9 @@ def _summary_format(set_tasks, worker): elif set_tasks["scheduling_error"]: smiley = ":(" reason = "there were tasks whose scheduling failed" - elif set_tasks["unknown_reason"]: + elif set_tasks["not_run"]: smiley = ":|" - reason = "there were tasks that did not run successfully because of unknown reason" + reason = "there were tasks that were not granted run permission by the scheduler" elif set_tasks["still_pending_ext"]: smiley = ":|" reason = "there were missing external dependencies" diff --git a/luigi/retcodes.py b/luigi/retcodes.py index 9117c1ce4a..1b9c778c41 100644 --- a/luigi/retcodes.py +++ b/luigi/retcodes.py @@ -52,9 +52,9 @@ class retcode(luigi.Config): or task-limit reached''' ) # default value inconsistent with doc/configuration.rst for backwards compatibility reasons - unknown_reason = IntParameter(default=0, - description="For when a task does not run successfully because of unknown reason." - ) + not_run = IntParameter(default=0, + description="For when a task is not granted run permission by the scheduler." + ) def run_with_retcodes(argv): @@ -93,13 +93,13 @@ def has(status): (retcodes.task_failed, has('failed')), (retcodes.already_running, has('run_by_other_worker')), (retcodes.scheduling_error, has('scheduling_error')), - (retcodes.unknown_reason, has('unknown_reason')), + (retcodes.not_run, has('not_run')), ) expected_ret_code = max(code * (1 if cond else 0) for code, cond in codes_and_conds) if expected_ret_code == 0 and \ root_task not in task_sets["completed"] and \ root_task not in task_sets["already_done"]: - sys.exit(retcodes.unknown_reason) + sys.exit(retcodes.not_run) else: sys.exit(expected_ret_code) diff --git a/test/execution_summary_test.py b/test/execution_summary_test.py index 22d1781057..ea4530804f 100644 --- a/test/execution_summary_test.py +++ b/test/execution_summary_test.py @@ -113,7 +113,7 @@ def requires(self): self.assertEqual({Foo()}, d['still_pending_not_ext']) self.assertEqual({Foo()}, d['upstream_scheduling_error']) self.assertEqual({Bar()}, d['scheduling_error']) - self.assertFalse(d['unknown_reason']) + self.assertFalse(d['not_run']) self.assertFalse(d['already_done']) self.assertFalse(d['completed']) self.assertFalse(d['failed']) @@ -142,7 +142,7 @@ def requires(self): for i, line in enumerate(result): self.assertEqual(line, expected[i]) - def test_unknown_reason_error(self): + def test_not_run_error(self): class Bar(luigi.Task): def complete(self): return True @@ -159,7 +159,7 @@ def new_func(*args, **kwargs): d = self.summary_dict() self.assertEqual({Foo()}, d['still_pending_not_ext']) - self.assertEqual({Foo()}, d['unknown_reason']) + self.assertEqual({Foo()}, d['not_run']) self.assertEqual({Bar()}, d['already_done']) self.assertFalse(d['upstream_scheduling_error']) self.assertFalse(d['scheduling_error']) @@ -177,11 +177,11 @@ def new_func(*args, **kwargs): '* 1 present dependencies were encountered:', ' - 1 Bar()', '* 1 were left pending, among these:', - " * 1 did not run successfully because of unknown reason:", + " * 1 was not granted run permission by the scheduler:", ' - 1 Foo()', '', 'Did not run any tasks', - 'This progress looks :| because there were tasks that did not run successfully because of unknown reason', + 'This progress looks :| because there were tasks that were not granted run permission by the scheduler', '', '===== Luigi Execution Summary =====', ''] @@ -207,7 +207,7 @@ def requires(self): d = self.summary_dict() self.assertEqual({Foo()}, d['scheduling_error']) self.assertFalse(d['upstream_scheduling_error']) - self.assertFalse(d['unknown_reason']) + self.assertFalse(d['not_run']) self.assertFalse(d['already_done']) self.assertFalse(d['completed']) self.assertFalse(d['failed']) @@ -399,10 +399,10 @@ def new_func(*args, **kwargs): d = self.summary_dict() self.assertFalse(d['already_done']) self.assertFalse(d['completed']) - self.assertFalse(d['unknown_reason']) + self.assertFalse(d['not_run']) self.assertEqual({AlreadyRunningTask()}, d['run_by_other_worker']) - def test_unknown_reason(self): + def test_not_run(self): class AlreadyRunningTask(luigi.Task): def run(self): pass @@ -423,12 +423,12 @@ def new_func(*args, **kwargs): self.assertFalse(d['already_done']) self.assertFalse(d['completed']) self.assertFalse(d['run_by_other_worker']) - self.assertEqual({AlreadyRunningTask()}, d['unknown_reason']) + self.assertEqual({AlreadyRunningTask()}, d['not_run']) s = self.summary() self.assertIn('\nScheduled 1 tasks of which:\n' '* 1 were left pending, among these:\n' - ' * 1 did not run successfully because of unknown reason:\n' + ' * 1 was not granted run permission by the scheduler:\n' ' - 1 AlreadyRunningTask()\n', s) self.assertNotIn('\n\n\n', s) @@ -447,7 +447,7 @@ class SomeTask(RunOnceTask): self.assertFalse(d['already_done']) self.assertFalse(d['completed']) self.assertFalse(d['run_by_other_worker']) - self.assertEqual({SomeTask()}, d['unknown_reason']) + self.assertEqual({SomeTask()}, d['not_run']) def test_somebody_else_disables_task(self): class SomeTask(luigi.Task): @@ -468,7 +468,7 @@ def run(self): self.assertFalse(d['already_done']) self.assertFalse(d['completed']) self.assertFalse(d['run_by_other_worker']) - self.assertEqual({SomeTask()}, d['unknown_reason']) + self.assertEqual({SomeTask()}, d['not_run']) def test_larger_tree(self): diff --git a/test/retcodes_test.py b/test/retcodes_test.py index 63c469039d..c16b67d147 100644 --- a/test/retcodes_test.py +++ b/test/retcodes_test.py @@ -169,4 +169,4 @@ def new_func(*args, **kwargs): with mock.patch('luigi.scheduler.CentralPlannerScheduler.add_task', new_func): self.run_and_expect('RequiringTask', 0) - self.run_and_expect('RequiringTask --retcode-unknown-reason 5', 5) + self.run_and_expect('RequiringTask --retcode-not-run 5', 5)