
Enables running of multiple tasks in batches #1784

Merged: 9 commits, Aug 12, 2016

Conversation

daveFNbuck (Contributor):

Description

Implements batching with worker-side batch methods.

Motivation and Context

Sometimes it's more efficient to run a group of tasks all at once rather than
one at a time. With luigi, it's difficult to take advantage of this because your
batch size also becomes the minimum granularity you're able to compute. If you
have a job that runs hourly, you can't combine the computation of many
backlogged hours into one run; if you instead make the task daily, you lose the
ability to run it hourly.

For the user, batching is accomplished by setting batch_method on the parameters
you wish to batch. It takes a list of parameter values (already parsed) and
combines them into a single value. You can limit the number of tasks allowed to
run simultaneously in a single batch by setting the class variable batch_size.
To exempt a specific task from batching, set its is_batchable property to False.
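
As a minimal sketch of that API (the task and its use of max are illustrative, not taken from this PR; batch_method and batch_size are the knobs named above):

    import luigi

    class ProcessHour(luigi.Task):
        # max :: [int] -> int, so a backlog of queued hours collapses
        # into a single run over the latest hour.
        hour = luigi.IntParameter(batch_method=max)

        # Cap how many queued tasks may be combined into one batch,
        # per the class variable named in the description above.
        batch_size = 12

        def run(self):
            # After batching, self.hour is the combined value for the
            # whole batch (here, the maximum of the batched hours).
            ...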

Unlike #1538, this does batch combining on the worker side, allowing arbitrary
batch methods and keeping better track of task ids for things like execution
summary.

Have you tested this? If so, how?

Added many unit tests. Will test this in production if I get an okay on the idea.

@@ -746,13 +820,18 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None,
if assistant:
self.add_worker(worker_id, [('assistant', assistant)])

batched_params, unbatched_params, batched_tasks, max_batch_size = None, None, [], 1
Contributor:

Would using {} make more sense than None?

Contributor:

Oh, it's a dict(), not a set(). You see how easy it is to get confused? ^^

daveFNbuck (Author):

{} is a dict, not a set.

I think it's less clear to set it to an empty dict than to set it to None. An empty dict looks like something you're going to fill rather than replace. If you think it's more clear, I'm happy to change it though.

Tarrasch (Contributor) commented Jul 27, 2016:

Ok. This looks more promising with every iteration.

What caught my attention most is that batch_method=my_pure_python_func seems strictly better than the decorator method you also created. Why wouldn't one just use something like batch_method=(lambda x: x) or batch_method=list?

daveFNbuck (Author):

Instead of batch_method=list, you'd need something like batch_method=flatten, as otherwise you'd have a list of lists. You'd then also need to implement parsing and serialization to handle lists, and worry about escaping your separators, etc. I agree that just setting batch_method=function would be better, but unfortunately list parameters are tricky. Maybe we should just leave this feature out?

daveFNbuck (Author):

It's basically the same problem we had previously with list parameters: it's difficult to handle with just a single argument.

elif not xs:
raise ValueError('Empty parameter list passed to parse_list')
else:
return self._batch_method(map(self.parse, xs))
Contributor:

About whether batch_method=list will work:

@daveFNbuck, you've really got me confused. Here it really looks like xs is one iterable of values (ignoring the parse step). If we have xs=iterable(1,2,3), then as I see it:

  • batch_method=max ==> 3
  • batch_method=min ==> 1
  • batch_method=','.join ==> '1,2,3' (string)
  • batch_method=list ==> [1,2,3] (list of int)
  • batch_method=(lambda x: x) ==> iterable(1,2,3)

If it were this simple, it would be fantastic, because we wouldn't need batch_tuple_parameter. But that's not the case, huh?
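
A quick sanity check of those cases in plain Python (a sketch only; strings are used so that join works without first mapping to str):

    xs = ['1', '2', '3']   # pretend these are already-parsed values

    max(xs)                # '3'
    min(xs)                # '1'
    ','.join(xs)           # '1,2,3' (a single string)
    list(xs)               # ['1', '2', '3'] (type changed: values -> list)
    (lambda x: x)(xs)      # ['1', '2', '3'] (the input, unchanged)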

daveFNbuck (Author):

Suppose you have the following code:

class ExampleTask(luigi.Task):
    param = luigi.IntParameter(batch_method=list)

Now ExampleTask(1).param is an int, ExampleTask(2).param is an int, and so on. But if I send these both to the scheduler and get back a batch of both of them, I get ExampleTask([1, 2]). ExampleTask([1, 2]).param is a list of ints. If I call serialize on it, it's going to fail. I'll also have to write weird code that handles both ints and lists of ints.

To make this work cleanly, I'll need to change the type of this parameter from int to list of ints. I'll need to change the parse and serialize functions. Now batch_method=list won't work because if I batch ExampleTask([1]) and ExampleTask([2]), I get ExampleTask([[1, 2]]). Again, the types are wrong because list changes the type from X to list<X>. So I also need to import or write a flatten function to return a list.

So in order to get a list batch method, I need to write custom parse and serialize methods and find a flatten function. The parse and serialize methods might also need to handle weird escaping in case the parameter value could include the list separator. The only way I can see to make this all easy and likely to succeed for the average user is to provide a well-tested decorator. Otherwise, they're likely to fall into the same trap you did (and I did in some of my unit tests before getting this far) and just set batch_method=list.
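
To make that burden concrete, here is a rough sketch of what a list-valued, batchable parameter would require (every name here is illustrative; none of this is part of the PR):

    import itertools

    import luigi

    class CommaListParameter(luigi.Parameter):
        """A parameter whose value is always a list of strings."""

        def parse(self, s):
            # Naive split: breaks if a value itself contains ','.
            return s.split(',')

        def serialize(self, value):
            return ','.join(value)

    def flatten(value_lists):
        # Plain `list` would produce a list of lists here; batching
        # needs [[A]] -> [A] to keep the parameter's type stable.
        return list(itertools.chain.from_iterable(value_lists))

    class ExampleTask(luigi.Task):
        values = CommaListParameter(batch_method=flatten)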

Contributor:

Ah, I see. I understand the issue.

If so, I suggest the following:

  • We start simple: let's not aim to support lists yet, as it's not as easy as one might think.
  • Clearly say in the docs that <func> in batch_method=<func> must be a pure Python function with signature <func> :: [A] -> A (or however you would explain this in simple terms).
  • Also add a runtime check that the type returned by <func> is correct.

Does that make sense @daveFNbuck?

daveFNbuck (Author):

The runtime check could be tricky, as you could have [A] -> B where A and B are both subclasses of the same superclass. Unfortunately, that's true for any pair of new-style classes, since object is always a common superclass. We probably don't want to make people explicitly pass the class. We could do a simple check such as: B can only be iterable if A is iterable.
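
That simple check might look something like this (a hypothetical sketch, not code from this PR):

    def check_batch_result(inputs, result):
        # Heuristic: the combined value may only be iterable if the
        # individual values were, which catches e.g. batch_method=list
        # applied to a scalar parameter.
        def is_iterable(x):
            try:
                iter(x)
                return True
            except TypeError:
                return False

        if is_iterable(result) and not all(is_iterable(x) for x in inputs):
            raise ValueError('batch_method turned scalar values into an iterable')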

daveFNbuck (Author):

Example: u','.join can take a list of str and return a unicode.

daveFNbuck (Author):

I guess I can't think of a reason this would actually happen with realistic code though.

Contributor:

I rely on you to come up with a simple compromise here :)

daveFNbuck (Author):

I can move this into a separate PR.

daveFNbuck (Author):

Removed from this PR, not sure why the tests aren't re-running.

daveFNbuck (Author):

Seems like it doesn't get triggered on pushes anymore.

daveFNbuck force-pushed the batch_tasks_2 branch 2 times, most recently from 4761172 to b9249f3 on August 1, 2016 18:24
daveFNbuck and others added 3 commits August 1, 2016 23:26

daveFNbuck (Author):

Are there any more issues with this? I think I've addressed them all.


What's exciting about this is that if you send multiple ExampleTasks to
the scheduler, it can combine them and return one. So if
ExampleTask(date=2016-07-28), ExampleTask(date=2016-07-29) and
Contributor:

Perhaps you could format this as code text?

I'm not sure, but perhaps just calling the task A would make it more readable, as ExampleTask is quite long and draws the reader's attention away from what's important.

Contributor:

By the way, this is how it looks rendered (almost; it's not identical to readthedocs): https://github.com/Houzz/luigi/blob/985522850ecfc8d84b9e56cbeb67bde95042dfcc/doc/luigi_patterns.rst

daveFNbuck (Author):

Everything else in the docs uses code-block. I'm having trouble finding code-text in the documentation for reStructuredText (it's not very searchable). Do I just replace code-block with code-text or did you mean to leave a space there? What's the difference between this and code-block? I think it looks fine in the link...

I changed ExampleTask to A.

Contributor:

Sorry, my previous comment was confusing. I just meant to turn A(date=2016-07-28) into inline code, ``A(date=2016-07-28)``. That is done using double backticks (``) in RST.

daveFNbuck (Author):

Got it, thanks. Updated.

Tarrasch (Contributor):

Starting to look good. I would like to get this in and then release 2.4.0. There are a lot of cool new features added recently. :)

daveFNbuck (Author):

Thanks so much for all the comments! I'm very excited that we're finally going to get this feature into Luigi. It'll be very nice to end the ~2 years of having a significantly different fork.

:param function(iterable[A]) -> A batch_method: Method to combine an iterable of parsed
                                                parameter values into a single value. Used
                                                when receiving batched parameter lists from
                                                the scheduler.
Contributor:

I just created #1814, which should help you.

As for referencing a section in the documentation, see this example:
Tarrasch merged commit aa07946 into spotify:master Aug 12, 2016
Tarrasch (Contributor):

Hi @daveFNbuck, I was hoping to start using this, but in my case I would actually need something like the batch_tuple_parameter you described here. I have an idea for a slightly different, hopefully even better API that I'm willing to implement, but I couldn't find your original implementation. I see the docs in a commit here, but I don't see the code in that commit. Do you still have the code lying around somewhere? Please push a branch if you have anything; otherwise I can re-implement it.

My thought for the API could be something like:

    class MyTask(luigi.Task):
        city, cities = batch_tuple_parameter(ChoiceParameter)(choices=('stockholm', 'hcmc'))

where you'd still do luigi MyTask --city stockholm and MyTask(city='hcmc'), but within run you aren't supposed to touch self.city, only self.cities.

Would this make sense? Perhaps we should even recommend _cities, to make clear that the ability to optimize your task by batching shouldn't affect its API.
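
Until a helper like that exists, the intent can be roughly approximated with the merged batch_method API, at the cost of the parse/serialize boilerplate discussed earlier (a sketch; CsvCitiesParameter is hypothetical and omits ChoiceParameter-style validation):

    import luigi

    class CsvCitiesParameter(luigi.Parameter):
        """Holds a tuple of city names, serialized as 'a,b'."""

        def parse(self, s):
            return tuple(s.split(','))

        def serialize(self, value):
            return ','.join(value)

    def merge_batches(batches):
        # [('stockholm',), ('hcmc',)] -> ('stockholm', 'hcmc')
        return tuple(city for batch in batches for city in batch)

    class MyTask(luigi.Task):
        # Always a tuple, even for a single run, so run() can loop
        # over self.cities uniformly, much as proposed above.
        cities = CsvCitiesParameter(batch_method=merge_batches)

        def run(self):
            for city in self.cities:
                ...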

daveFNbuck deleted the batch_tasks_2 branch December 15, 2016 22:45
daveFNbuck (Author):

@Tarrasch sorry I didn't notice your comment until now. I don't think I have the implementation anymore. I like your idea, as it would simplify an issue I have where I can't batch tasks that have standardized parameters.
