Skip to content

Commit

Permalink
Fix error message in case of unfulfilled dependencies with single out…
Browse files Browse the repository at this point in the history
…put (spotify#3281)
  • Loading branch information
GianlucaFicarelli authored Apr 5, 2024
1 parent 25d179b commit 64d6c48
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 3 deletions.
4 changes: 2 additions & 2 deletions luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, UNKNOWN, Scheduler, RetryPolicy
from luigi.scheduler import WORKER_STATE_ACTIVE, WORKER_STATE_DISABLED
from luigi.target import Target
from luigi.task import Task, Config, DynamicRequirements
from luigi.task import Task, Config, DynamicRequirements, flatten
from luigi.task_register import TaskClassException
from luigi.task_status import RUNNING
from luigi.parameter import BoolParameter, FloatParameter, IntParameter, OptionalParameter, Parameter, TimeDeltaParameter
Expand Down Expand Up @@ -185,7 +185,7 @@ def run(self):
missing = []
for dep in self.task.deps():
if not self.check_complete(dep):
nonexistent_outputs = [output for output in dep.output() if not output.exists()]
nonexistent_outputs = [output for output in flatten(dep.output()) if not output.exists()]
if nonexistent_outputs:
missing.append(f'{dep.task_id} ({", ".join(map(str, nonexistent_outputs))})')
else:
Expand Down
37 changes: 37 additions & 0 deletions test/worker_task_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import luigi
import luigi.date_interval
import luigi.notifications
from luigi.mock import MockTarget
from luigi.worker import TaskException, TaskProcess
from luigi.scheduler import DONE, FAILED

Expand Down Expand Up @@ -106,6 +107,42 @@ def complete(self):
None
))

def test_fail_on_unfulfilled_dependencies(self):
class NeverCompleteTask(luigi.Task):
def complete(self):
return False

class A(NeverCompleteTask):
def output(self):
return []

class B(NeverCompleteTask):
def output(self):
return MockTarget("foo-B")

class C(NeverCompleteTask):
def output(self):
return [MockTarget("foo-C1"), MockTarget("foo-C2")]

class Main(NeverCompleteTask):
def requires(self):
return [A(), B(), C()]

task = Main()
result_queue = multiprocessing.Queue()
task_process = TaskProcess(task, 1, result_queue, mock.Mock())

with mock.patch.object(result_queue, 'put') as mock_put:
task_process.run()
expected_missing = [A().task_id, f"{B().task_id} (foo-B)", f"{C().task_id} (foo-C1, foo-C2)"]
mock_put.assert_called_once_with((
task.task_id,
FAILED,
StringContaining(f"Unfulfilled dependencies at run time: {', '.join(expected_missing)}"),
expected_missing,
[],
))

def test_cleanup_children_on_terminate(self):
"""
Subprocesses spawned by tasks should be terminated on terminate
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ deps =
pytest<7.0
pytest-cov>=2.0,<3.0
mock<2.0
moto>=1.3.10
moto>=1.3.10,<5.0
HTTPretty==0.8.10
docker>=2.1.0
boto>=2.42,<3.0
Expand Down

0 comments on commit 64d6c48

Please sign in to comment.