Enables running of multiple tasks in batches #1538
Conversation
Wow, this is a lot of code. Thanks! Can you provide a concrete example of why this is useful? Trying to wrap my head around what problem this is addressing.

We have 3 concrete examples where it's useful.

Ok, that's cool. Seems useful. Let me review in the next few days.

Sorry for the delay. Have been busy at work. Will try to allocate some time for this.
No worries, we're all busy and I don't expect you to drop everything for such a big review.

Cool. I'll try to review this by the end of tomorrow or the day after (which should be your morning ^^)

Thanks!
```
@@ -61,6 +67,19 @@ class Task2(luigi.Task):
     Register._default_namespace = namespace


 def task_id_str(task_family, params):
```
Can this either be made private (start with `_`) or be given a docstring? This refactoring could happen in a separate PR to make this PR smaller. (If you have the time to refactor that.)
I don't want it private because the whole point is to expose it to the scheduler so it can apply correct ids to generated tasks. I'll add a docstring.
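For reference, the promised docstring might look something like this (a sketch; the body is elided and the wording is not taken from the patch):

```python
def task_id_str(task_family, params):
    """
    Return a canonical string used to identify a particular task.

    Deliberately public so the scheduler can apply correct ids to the
    batch tasks it generates.

    :param task_family: the task family (class name) of the task.
    :param params: a dict mapping parameter names to their serialized values.
    :return: a unique identifier for this task family and params.
    """
    ...
```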
Sorry I didn't have time for the full review yet. You don't need to care about my small nitpick comments for now. I'm a bit skeptical about how the API looks, though. Maybe I just need to reread it; give me some more time :)
Thanks for looking. Definitely take your time if it can help us get a better API. I'll look into splitting off a smaller PR today.
Thanks for splitting up your patch a bit, and thanks for this patch. I recall Spotify wanted exactly this too (at least the max aggregation) and many others will find it useful too. Even VNG, I think. But for this to be useful the API must be easy to use. In this post I'll focus on grinding on the API, so I'm putting on my critiquing hat for a moment, because I find the API hard to follow in its current form. Consider a plain task:
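(The snippet from this comment was lost in extraction; a minimal task along these lines fits the "quite simple" description that follows. The class and parameter names are illustrative.)

```python
import luigi

class MyHourlyTask(luigi.Task):
    hour = luigi.DateHourParameter()

    def run(self):
        # process a single hour of data
        ...
```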
Quite simple. Now I want to batchify this. I might have got this wrong, but batching is quite nontrivial. It feels like the API could be greatly simplified. Could something much simpler work?
Indeed, in this version you're kind of overwriting your own code. You'll always receive an iterable in the task. Let me know what you think. Once I understand the API and we're on the same page on that, I promise to continue to review the implementation. :)
This looks like a huge improvement to me. I guess this really needed some fresh eyes! Let me check that it makes sense with all of my existing batch jobs and I'll get back to you tomorrow.
Ah, I'm so happy we seem to understand each other right away. Take your time on this. :) Another thing that is a bit unclear to me is how the case of multiple batchings is grouped. The two ways that come to mind are either multiplicative or additive. For example, if you have a task with two batching parameters X and Y, and the scheduler contains 3 tasks with the parameter tuples (a, 1), (b, 2), and (c, 3). I wonder, are we actually missing out on covering any use cases if we remove everything except the list type?
I think that should be left up to the person implementing the class. If you're going to ask for a range of one thing and a multiple of another thing, it's up to you to figure out what that means. Sticking with just the list type would probably work ok, but it'll make some things more annoying to implement. Grouping dozens or hundreds of tasks together will make the visualizer unusable with huge task names and possibly actually break some components, so I'd rather just avoid using the list type where a simpler aggregate will do.
I'm not sure I understand this. What does it mean to group everything and send it back?
Isn't the implementation relatively simple if we only allow MULTIPLE? I think we can get rid of the metatask and BATCH_RUNNING status.
So if I ask for param1 to be multiple and param2 to be a range, you'd simply turn (a, 1), (b, 2), (c, 3) into ([a,b,c], 1-3). It would be up to the person who asked for this weird mix to figure out how to deal with that.
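To make that concrete, here's a plain-Python sketch (not scheduler code) of the mixed MULTIPLE/RANGE grouping just described:

```python
def combine_batch(tasks):
    """Turn [('a', 1), ('b', 2), ('c', 3)] into (['a', 'b', 'c'], '1-3'):
    MULTIPLE grouping for param1, RANGE grouping for param2."""
    firsts = [x for x, _ in tasks]
    seconds = [y for _, y in tasks]
    return firsts, '{}-{}'.format(min(seconds), max(seconds))

print(combine_batch([('a', 1), ('b', 2), ('c', 3)]))
# (['a', 'b', 'c'], '1-3')
```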
I don't think that requiring MULTIPLE makes this any easier. The scheduler would still need to know which N tasks are being run so it doesn't keep scheduling them, but can also recognize that they're all done when the batch task is done. So I think we'd still need BATCH_RUNNING. I also just realized that your suggestion would essentially bring back is_list to Parameter, as we're clearly now parsing comma-separated collections of IntParameters and getting a list in the task. I was trying to avoid this by mapping parameters to matching parameters in another class. I'll try to figure out how we can keep some of the simplicity of your suggestion without bringing back is_list.
Oh ok, it wasn't clear to me that that's what would be returned. But I think with nice docs it'll be clear.

I think I see what you mean.

Yea, I just shudder at the thought of bringing is_list back. Though I'm pretty sure this is different, and in a healthy way.
Is this a similar use case to #570? We closed it, but it seems somewhat simpler than the batching.
How about solving #570 and this PR more generally by having a new method on the task class that lets you specify how to merge tasks?

```python
class Task(luigi.Task):
    def should_be_merged(self, all_tasks_of_this_class):
        # do some merging operation
        return merged_tasks
```
The scheduler doesn't have access to task methods, so it wouldn't be able to call a method like that.
I think the batching is a lot more general than #570. #570 only covered the overwrite case.
```python
        self.test_aggregation('max', '3')

    def test_aggregation_range(self):
        self.test_aggregation('range', '1-3')
```
I think we should use types and not force the user to parse strings.
This is even more important for `csv`. I suppose I want `csv` to be renamed to `list`.
I think the name `range` will confuse people. Either call it `consecutive_range` or `inconsecutive_range`. As you can see, after one sweep of this huge patch I still don't know whether the range is consecutive or not, so users will surely be confused.
Another idea would be to not use strings as types. Why not have some enum like `luigi.parameter.Aggregations.MAX`?
Updated to use an enum.
I also removed `range`; I just included it because it was simple and seemed useful for date ranges, but it wouldn't actually be easy to use directly in that sort of case.
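For reference, a sketch of what the enum-flavored API might look like after this change; the class, the value spellings, and the parameter keyword are assumptions, not the patch itself:

```python
import luigi

class Aggregations(object):
    """Hypothetical enum of batch aggregation types (names assumed)."""
    MAX = 'max'
    LIST = 'list'

class MyHourlyTask(luigi.Task):
    # hypothetical keyword: batch queued hours by keeping only the max
    hour = luigi.DateHourParameter(aggregation=Aggregations.MAX)
```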
👍 on removing `range`, let's keep it simple in the beginning :)
```
@@ -44,6 +45,25 @@
 _no_value = object()


 def join_with_commas(values):
```
If this isn't public API, prepend an underscore: `_join_with_commas`.

As for mixing IntParameter with COMMA_LIST: either make it a list (strongly preferred) or at least make it "crash early".
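For what it's worth, the helper under discussion presumably amounts to something this small (a sketch, not the actual patch code):

```python
def _join_with_commas(values):
    """Serialize an iterable of parameter values into one
    comma-separated string."""
    return ','.join(str(v) for v in values)
```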
As for whether COMMA_LIST can be directly converted to a list for the user ... I suppose you're right that it's not trivial to make the parameter automatically appear as a list in the task. In particular we want as little magic as possible, and we should keep to (only) supporting the usual syntaxes of passing a parameter: on the command line and when constructing the task in code. Though it would be a poor user experience if the user (1) wouldn't get the value as a list and (2) we couldn't support anything except for strings. What do you think? I'll try to think of some way to implement this while introducing fewer shortcomings.
Here's roughly what I'm proposing for handling a comma list:
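(The original snippet was lost in extraction; as a hedged reconstruction in the spirit described, with guessed names, a wrapper that makes an existing parameter parse and serialize comma-separated lists:)

```python
import luigi

def comma_list(parameter):
    """Hypothetical sketch: wrap an existing parameter instance so that
    its values are parsed from, and serialized to, comma-separated lists."""
    class CommaListParameter(luigi.Parameter):
        def parse(self, s):
            # parse each comma-separated element with the wrapped parameter
            return [parameter.parse(item) for item in s.split(',')]

        def serialize(self, values):
            return ','.join(parameter.serialize(v) for v in values)

    return CommaListParameter()
```

A task could then declare something like `hours = comma_list(luigi.IntParameter())` and get a real Python list back (again, hypothetical).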
Now you can do this for any parameter type. For properly handling ints, we could convert strings to lists of alternating strings and integers.
Sorry for still not being satisfied with this, but here's an idea that maybe could make the implementation and API simpler. Basically, I'm trying to find out whether we really need the server to care about whether it's min/max/list, and whether we could instead let all aggregation be simple user-defined logic using already existing Python stuff.
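As a hedged sketch (with assumed names) of what that user-side aggregation could look like:

```python
import luigi

class MyTask(luigi.Task):
    # assumption: the worker hands the task every queued value for this
    # parameter as a plain list, instead of the scheduler aggregating
    hours = luigi.Parameter()

    def run(self):
        # aggregation becomes ordinary user-defined Python
        latest = max(self.hours)
        ...
```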
Putting the aggregation on the user side would actually be fairly difficult for me. Consider the work needed to convert a job that does daily overwrites to use max batching. Under my scheme, you just add a max batch method to the parameter. It's also less than ideal in the scheduler, as you're getting a much longer than necessary job description. If you have an hourly job that's behind a week, we could be talking about hundreds of parameters vs just one. I'm not sure whether this will break anything as the description of the job gets too large. In cases where you actually need the full list, you'll probably want to add a batch size limit, which will prevent things from getting out of control. With a max parameter, you definitely don't want that. Most weird cases should be handled in the way you describe, but I think having MAX and LIST is a good compromise between the complexity added to the code base and usability.
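Presumably the one-line change being described is along these lines (a sketch; the batch_method keyword follows the naming used later in this thread, and the task name is illustrative):

```python
class DailyOverwriteJob(luigi.Task):
    # before: date = luigi.DateParameter()
    # after: queued dates collapse so that only the largest one runs
    date = luigi.DateParameter(batch_method=max)
```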
I see, I guess the problem here is that there's too much magic. It's not clear whether `my_task.date` is a date object or a list. At the same time, I'm not a big fan of your current implementation either. We're just lucky that comparing serialized dates is equivalent to comparing the date objects themselves. I suppose you're eager to get this patch in, but I'm still hesitant, as pulling this in complicates luigi a lot and perhaps there's some easier solution we've overlooked. What do you think?
`my_task.date` will always be a date object unless you turn it into a list parameter with the provided decorator. I think by design most serialized objects that you'd want to do max on will work with string comparisons. I don't want to merge this until you're convinced it's good. So far your comments have improved it a lot. I've been using variants of this in my own fork for over a year, so there's no particular urgency to get it in right now. I'd just like to do it eventually so I don't have to deal with constant merge issues anymore. We might be able to add a client-side layer to do arbitrary batch methods from the list given to the server. We'll have to store a map containing the task ids the server is using for the batch jobs, but I think it could work. What do you think of that idea?
Sometimes it's more efficient to run a group of tasks all at once rather than one at a time. With luigi, it's difficult to take advantage of this because your batch size will also be the minimum granularity you're able to compute. So if you have a job that runs hourly, you can't combine the computation of multiple runs when they get backlogged. When you have a task that runs daily, you can't get hourly runs.

For the user, batching is accomplished by setting batch_method in the parameters that you wish to batch. This takes a list of parameter values (already parsed) and combines them into a single parameter. You can limit the number of tasks allowed to run simultaneously in a single batch by setting the class variable batch_size. If you want to exempt a specific task from being part of a batch, simply set its is_batchable property to False.

Unlike spotify#1538, this does batch combining on the worker side, allowing arbitrary batch methods and keeping better track of task ids for things like execution summary.
Enables running of multiple tasks in batches
Sometimes it's more efficient to run a group of tasks all at once rather than
one at a time. With luigi, it's difficult to take advantage of this because your
batch size will also be the minimum granularity you're able to compute. So if
you have a job that runs hourly, you can't combine the computation of multiple
runs when they get backlogged. When you have a task that runs daily, you can't get
hourly runs.
In order to gain efficiency when many jobs are queued up, this change allows
workers to provide details of how jobs can be batched to the scheduler. If you
have several hourly jobs of the same type in the scheduler, it can combine them
into a single job for the worker. We allow parameters to be combined in three
ways: we can combine all the arguments in a csv, take the min and max to form
a range, or just provide the min or max. The csv gives the most specificity,
but range and min/max are available for when that's all you need. In particular,
the max function provides an implementation of #570, allowing for jobs that
overwrite each other to be grouped by just running the largest one.
In order to implement this, the scheduler will create a new task based on the
information sent by the worker. It's possible (as in the max/min case) that the
new task already exists, but if it doesn't it will be cleaned up at the end of
the run. While this new task is running, any other tasks will be marked as
BATCH_RUNNING. When the head task becomes DONE or FAILED, the BATCH_RUNNING
tasks will also be updated accordingly. They'll also have their tracking URLs
updated to match the batch task.
This is a fairly big change to how the scheduler works, so there are a few
issues with it in the initial implementation.
For the user, batching is accomplished by setting batch_method in the parameters
that you wish to batch. You can limit the number of tasks allowed to run
simultaneously in a single batch by setting the class variable batch_size. If
you want to exempt a specific task from being part of a batch, simply set its
is_batchable property to False.
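To make this concrete, here's a hedged sketch of the user-facing API as described above; the task, the parameter name, and the exact keyword spellings are illustrative rather than taken from the patch:

```python
import luigi

class LogProcessor(luigi.Task):
    # batch_method combines the queued values for this parameter into one;
    # max means "just run the largest queued date" (per the description)
    date = luigi.DateParameter(batch_method=max)

    # at most 5 queued tasks are combined into a single batch
    batch_size = 5

    def run(self):
        ...  # e.g. process everything up to and including self.date

# Exempting a task from batching, per the description above:
class UnbatchedLogProcessor(LogProcessor):
    is_batchable = False
```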