Skip to content

Commit

Permalink
retcodes: Understand successful retries (spotify#1951)
Browse files Browse the repository at this point in the history
Prior to this commit, luigi's return codes did not account for a task that failed once but then succeeded on a later retry. This commit makes both the execution summary and the return code correct in that case. This fixes spotify#1932.
  • Loading branch information
bwtakacy authored and kreczko committed Mar 28, 2017
1 parent 3140d2b commit 4f9a076
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 9 deletions.
24 changes: 15 additions & 9 deletions luigi/execution_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ def _partition_tasks(worker):
set_tasks["completed"] = {task for (task, status, ext) in task_history if status == 'DONE' and task in pending_tasks}
set_tasks["already_done"] = {task for (task, status, ext) in task_history
if status == 'DONE' and task not in pending_tasks and task not in set_tasks["completed"]}
set_tasks["failed"] = {task for (task, status, ext) in task_history if status == 'FAILED'}
set_tasks["ever_failed"] = {task for (task, status, ext) in task_history if status == 'FAILED'}
set_tasks["failed"] = set_tasks["ever_failed"] - set_tasks["completed"]
set_tasks["scheduling_error"] = {task for(task, status, ext) in task_history if status == 'UNKNOWN'}
set_tasks["still_pending_ext"] = {task for (task, status, ext) in task_history
if status == 'PENDING' and task not in set_tasks["failed"] and task not in set_tasks["completed"] and not ext}
if status == 'PENDING' and task not in set_tasks["ever_failed"] and task not in set_tasks["completed"] and not ext}
set_tasks["still_pending_not_ext"] = {task for (task, status, ext) in task_history
if status == 'PENDING' and task not in set_tasks["failed"] and task not in set_tasks["completed"] and ext}
if status == 'PENDING' and task not in set_tasks["ever_failed"] and task not in set_tasks["completed"] and ext}
set_tasks["run_by_other_worker"] = set()
set_tasks["upstream_failure"] = set()
set_tasks["upstream_missing_dependency"] = set()
Expand Down Expand Up @@ -87,7 +88,7 @@ def _depth_first_search(set_tasks, current_task, visited):
for task in current_task._requires():
if task not in visited:
_depth_first_search(set_tasks, task, visited)
if task in set_tasks["failed"] or task in set_tasks["upstream_failure"]:
if task in set_tasks["ever_failed"] or task in set_tasks["upstream_failure"]:
set_tasks["upstream_failure"].add(current_task)
upstream_failure = True
if task in set_tasks["still_pending_ext"] or task in set_tasks["upstream_missing_dependency"]:
Expand Down Expand Up @@ -261,6 +262,7 @@ def _get_comments(group_tasks):
_ORDERED_STATUSES = (
"already_done",
"completed",
"ever_failed",
"failed",
"scheduling_error",
"still_pending",
Expand Down Expand Up @@ -377,11 +379,15 @@ def _summary_format(set_tasks, worker):
str_output += 'Did not run any tasks'
smiley = ""
reason = ""
if set_tasks["failed"]:
smiley = ":("
reason = "there were failed tasks"
if set_tasks["scheduling_error"]:
reason += " and tasks whose scheduling failed"
if set_tasks["ever_failed"]:
if not set_tasks["failed"]:
smiley = ":)"
reason = "there were failed tasks but they all suceeded in a retry"
else:
smiley = ":("
reason = "there were failed tasks"
if set_tasks["scheduling_error"]:
reason += " and tasks whose scheduling failed"
elif set_tasks["scheduling_error"]:
smiley = ":("
reason = "there were tasks whose scheduling failed"
Expand Down
31 changes: 31 additions & 0 deletions test/execution_summary_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1086,3 +1086,34 @@ class Bar(RunOnceTask):
self.assertIn('Luigi Execution Summary', s)
self.assertNotIn('00:00:00', s)
self.assertNotIn('\n\n\n', s)

def test_status_with_task_retry(self):
    """
    Test that a task once crashing and then succeeding should be counted as no failure.

    The task is run twice: the first run raises, the second succeeds.
    The summary must list the task under both 'completed' and
    'ever_failed', leave 'failed' empty, and report a ':)' outcome.
    """
    class Foo(luigi.Task):
        # Class-level default; ``self.run_count += 1`` rebinds it on the
        # instance, so the counter is tracked per task instance.
        run_count = 0

        def run(self):
            self.run_count += 1
            # Fail only on the very first attempt.
            if self.run_count == 1:
                raise ValueError()

        def complete(self):
            return self.run_count > 0

    # NOTE(review): this relies on both Foo() calls referring to the same
    # task object so that run_count carries over -- presumably luigi's
    # instance caching guarantees that; confirm.
    self.run_task(Foo())
    self.run_task(Foo())

    d = self.summary_dict()
    self.assertEqual({Foo()}, d['completed'])
    self.assertEqual({Foo()}, d['ever_failed'])
    self.assertFalse(d['failed'])
    self.assertFalse(d['upstream_failure'])
    self.assertFalse(d['upstream_missing_dependency'])
    self.assertFalse(d['run_by_other_worker'])
    self.assertFalse(d['still_pending_ext'])

    s = self.summary()
    self.assertIn('Scheduled 1 task', s)
    self.assertIn('Luigi Execution Summary', s)
    # The 'ever_failed' bucket must not be printed as its own section.
    self.assertNotIn('ever failed', s)
    # The spelling "suceeded" matches the message emitted by
    # _summary_format verbatim; do not correct it here alone.
    self.assertIn('\n\nThis progress looks :) because there were failed tasks but they all suceeded in a retry', s)
19 changes: 19 additions & 0 deletions test/retcodes_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,22 @@ def new_func(*args, **kwargs):
with mock.patch('luigi.scheduler.Scheduler.add_task', new_func):
self.run_and_expect('RequiringTask', 0)
self.run_and_expect('RequiringTask --retcode-not-run 5', 5)

def test_retry_sucess_task(self):
    """
    Test that a task once crashing and then succeeding should be counted as no failure.

    Every invocation below is expected to exit with return code 0, even
    when a non-zero code is configured for failed tasks.
    """
    class Foo(luigi.Task):
        # Class-level default; ``self.run_count += 1`` rebinds it on the
        # instance, so the counter is tracked per task instance.
        run_count = 0

        def run(self):
            self.run_count += 1
            # Fail only on the very first attempt.
            if self.run_count == 1:
                raise ValueError()

        def complete(self):
            return self.run_count > 0

    # A retry delay of 0 lets the failed first attempt be retried within the same run.
    self.run_and_expect('Foo --scheduler-retry-delay=0', 0)
    # A custom failure retcode must not be used once the retry succeeded.
    self.run_and_expect('Foo --scheduler-retry-delay=0 --retcode-task-failed=5', 0)
    # Same expectation when the failure retcode is supplied through config.
    self.run_with_config(dict(task_failed='3'), 'Foo', 0)

0 comments on commit 4f9a076

Please sign in to comment.