Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for naming tasks in @requires #3077

Merged
merged 3 commits into from
Aug 28, 2021

Conversation

IanCal
Copy link
Contributor

@IanCal IanCal commented Apr 22, 2021

Description

This change lets a user name the tasks in the requires method like so:

@requires(daily=DailySummaryTask, weekly=WeeklySummaryTask)
class SummaryTask(Task):
    def run(self):
        with self.input()["daily"].open() as daily_results:
            ...

Motivation and Context

When we use the requires decorator, we can save a lot of time and hassle passing parameters around. One issue we have hit is that the run method only sees a list of targets, with no context as to which earlier task generated the results.

Luigi already supports named inputs, by returning a dictionary from the requires function. This change just allows passing in kwargs to the decorator and creates a requires method using those. I added this to the inherits decorator as well,

There wasn't a sensible option in my mind if you try and pass in both, so I made it throw an error in that case.

This doesn't include an update to the documentation but I can add some docs if the approach is OK.

This doesn't handle any fancier nested versions, just a simple dictionary.

Have you tested this? If so, how?

I added some basic unit tests, but can expand these, and have put an example program below.

It's largely taken from demo code we're working on, and is I hope a relatively simple change.

Example:

from luigi.util import requires
from luigi import Task, Parameter, build
from luigi.local_target import LocalTarget


class DependencyA(Task):
    def run(self):
        with self.output().open("w") as f_out:
            f_out.write("DependencyA file output")

    def output(self):
        return LocalTarget(f"output/{self.get_task_family()}")


class DependencyB(Task):
    def run(self):
        with self.output().open("w") as f_out:
            f_out.write("DependencyB file output")

    def output(self):
        return LocalTarget(f"output/{self.get_task_family()}")


@requires(dep_a=DependencyA, dep_b=DependencyB)
class ExampleTask(Task):
    def run(self):
        dep_a_data = self.input()["dep_a"].open().read()
        dep_b_data = self.input()["dep_b"].open().read()
        print("Data", dep_a_data, dep_b_data)

build([
    ExampleTask()
], local_scheduler=True)

When using @requires the requires method is auto-generated. However, as
it just takes a list the tasks run method needs to identify which input
is which.

This adds support for named requirements using luigis existing support
for returning a dictionary from the requires function.

Usage:

    class Parent1(luigi.Task):
        ...
    class Parent2(luigi.Task):
        ...

    @requires(first_parent=Parent1, second_parent=Parent2)
    class Child(luigi.Task):
        def run(self):
           first_parent_target = self.input()["first_parent"]
           second_parent_target = self.input()["second_parent"]
@IanCal IanCal requested review from dlstadther, Tarrasch and a team as code owners April 22, 2021 13:09
super(requires, self).__init__()
if not tasks_to_require:
raise TypeError("tasks_to_require cannot be empty")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error isn't required as inherits will be called, and inherits throws the same errors.

@dlstadther dlstadther merged commit 00aa83a into spotify:master Aug 28, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants