Enables running of multiple tasks in batches
Sometimes it's more efficient to run a group of tasks all at once rather than
one at a time. With luigi, it's difficult to take advantage of this because your
batch size will also be the minimum granularity you're able to compute. So if
you have a job that runs hourly, you can't combine the hourly runs into one
computation when many of them get backlogged. And if you have a task that runs
daily, you can't get hourly runs.

For the user, batching is accomplished by setting batch_method in the parameters
that you wish to batch. This takes a list of parameter values (already parsed)
and combines them into a single parameter. You can limit the number of tasks
allowed to run simultaneously in a single batch by setting the class variable
batch_size. If you want to exempt a specific task from being part of a batch,
simply set its is_batchable property to False.

Unlike #1538, this does batch combining on the worker side, allowing arbitrary
batch methods and keeping better track of task ids for things like execution
summary.
daveFNbuck committed Jul 28, 2016
1 parent fd18bd6 commit 4761172
Showing 10 changed files with 921 additions and 27 deletions.
92 changes: 92 additions & 0 deletions doc/luigi_patterns.rst
Expand Up @@ -118,6 +118,98 @@ can have implications in how the scheduler and visualizer handle task instances.
luigi RangeDaily --of MyTask --start 2014-10-31 --MyTask-my-param 123
Batching multiple parameter values into a single run
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Sometimes it'll be faster to run multiple jobs together as a single
batch rather than running them each individually. When this is the case,
you can mark some parameters with a batch_method in their constructor
to tell the worker how to combine multiple values. One common way to do
this is by aggregating the items as a tuple. You can use this inline,
like

.. code-block:: python

    class ExampleTask(luigi.Task):
        my_param = batch_tuple_parameter(luigi.IntParameter)()

or you can use it as a decorator to create a batch parameter like

.. code-block:: python

    @batch_tuple_parameter
    class IntListParameter(luigi.IntParameter):
        pass

    class ExampleTask(luigi.Task):
        my_param = IntListParameter()

In either case, ExampleTask.my_param will be a tuple of integers. This
means that if you send multiple ExampleTasks to the scheduler, it can
combine them and return one. So if ExampleTask(my_param=1),
ExampleTask(my_param=2) and ExampleTask(my_param=3) are all ready to
run, the scheduler will create a new task ExampleTask(my_param=1,2,3)
whose my_param parameter has the value (1, 2, 3). While the combined
task is running, the scheduler will show ExampleTask(my_param=1),
ExampleTask(my_param=2) and ExampleTask(my_param=3) as batch running.
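
To make the combining step concrete, here is a minimal standalone sketch in plain Python. It is not luigi's actual code path; it only mimics the CSV parsing, tuple flattening and CSV serialization that a batch tuple parameter performs. The helper names ``parse_int_tuple``, ``flatten`` and ``serialize_tuple`` are illustrative assumptions.

```python
import csv
import io


def parse_int_tuple(csv_str):
    """Parse a CSV string such as '1,2' into a tuple of ints."""
    reader = csv.reader([csv_str])
    return tuple(int(field) for field in next(reader))


def flatten(tuples):
    """Concatenate several tuples into one, preserving order."""
    return tuple(item for tup in tuples for item in tup)


def serialize_tuple(values):
    """Join values into a single CSV string such as '1,2,3'."""
    buf = io.StringIO()
    csv.writer(buf, lineterminator='').writerow([str(v) for v in values])
    return buf.getvalue()


# Three pending tasks, each carrying one serialized value (hypothetical input).
pending = ['1', '2', '3']
combined = flatten(parse_int_tuple(s) for s in pending)
print(combined)                   # (1, 2, 3)
print(serialize_tuple(combined))  # 1,2,3
```

Each single value parses to a one-element tuple, so flattening the parsed values gives the combined tuple that the batched task would see.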

If you want to limit how big a batch can get, simply set batch_size.
So if you have

.. code-block:: python

    class ExampleTask(luigi.Task):
        my_param = batch_tuple_parameter(luigi.IntParameter)()
        max_batch_size = 10

then the scheduler will batch at most 10 jobs together.
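
As a rough illustration of what a size cap implies for a backlog, the following sketch splits pending values into consecutive batches. The function ``chunk_batches`` and its ``max_size`` argument are hypothetical; how luigi actually picks which tasks go into which batch may differ.

```python
def chunk_batches(values, max_size):
    """Split pending values into consecutive batches of at most max_size."""
    if max_size < 1:
        raise ValueError('max_size must be at least 1')
    return [tuple(values[i:i + max_size])
            for i in range(0, len(values), max_size)]


# 25 backlogged values with a cap of 10 (hypothetical numbers).
batches = chunk_batches(list(range(1, 26)), max_size=10)
print([len(b) for b in batches])  # [10, 10, 5]
```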

If you have two batch tuple parameters, you'll get a tuple of values
for both of them. Parameters that don't have a batch method are not
combined: only tasks that agree on all of those values are batched
together. So if you have a class like

.. code-block:: python

    class ExampleTask(luigi.Task):
        p1 = batch_tuple_parameter(luigi.IntParameter)()
        p2 = batch_tuple_parameter(luigi.IntParameter)()
        p3 = luigi.IntParameter()

and you create tasks ExampleTask(p1=1, p2=2, p3=0),
ExampleTask(p1=2, p2=3, p3=0), ExampleTask(p1=3, p2=4, p3=1), you'll get
them batched as ExampleTask(p1=1,2, p2=2,3, p3=0) and
ExampleTask(p1=3, p2=4, p3=1).
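
The grouping in this example can be sketched in plain Python. This is an illustration under assumptions, not luigi's implementation: tasks are modelled as dictionaries, and ``group_into_batches`` is a hypothetical helper that groups by the parameters without a batch method and collects the batched ones into tuples.

```python
def group_into_batches(tasks, batched, unbatched):
    """Group tasks by their unbatched values and tuple up the batched ones."""
    groups = {}
    for task in tasks:
        # Tasks land in the same batch only if all unbatched values match.
        key = tuple(task[name] for name in unbatched)
        groups.setdefault(key, []).append(task)

    result = []
    for key, members in groups.items():
        combined = dict(zip(unbatched, key))
        for name in batched:
            combined[name] = tuple(member[name] for member in members)
        result.append(combined)
    return result


tasks = [
    {'p1': 1, 'p2': 2, 'p3': 0},
    {'p1': 2, 'p2': 3, 'p3': 0},
    {'p1': 3, 'p2': 4, 'p3': 1},
]
batches = group_into_batches(tasks, batched=['p1', 'p2'], unbatched=['p3'])
for batch in batches:
    print(batch)
```

The first two tasks share ``p3=0`` and end up in one batch; the third has ``p3=1`` and forms a batch of its own.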

Tasks that regularly overwrite the same data source
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

One common case where a simpler form of batching can help is daily
overwriting of the same data source. You can do this by setting
batch_method to max and setting a unique resource:

.. code-block:: python

    class DailyOverwriteTask(luigi.Task):
        date = luigi.DateParameter(batch_method=max)
        resources = {'overwrite_resource': 1}

Now if you have multiple tasks such as
DailyOverwriteTask(date=2016-06-01),
DailyOverwriteTask(date=2016-06-02),
DailyOverwriteTask(date=2016-06-03), the scheduler will just tell you to
run the one with the latest date and mark the earlier ones as
batch_running. Using a unique resource will prevent multiple tasks from
writing to the same location at the same time if a new one becomes
available while others are running.
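
The effect of using ``max`` as the combining function can be shown with a small standalone sketch. It only assumes that each pending date is parsed first and the parsed values are then passed to ``max``; the helper ``combine_with_max`` is hypothetical and not part of luigi.

```python
import datetime


def combine_with_max(serialized_dates):
    """Parse each ISO date string, then keep only the latest date."""
    parsed = [datetime.datetime.strptime(s, '%Y-%m-%d').date()
              for s in serialized_dates]
    return max(parsed)


pending = ['2016-06-01', '2016-06-02', '2016-06-03']
latest = combine_with_max(pending)
print(latest)  # 2016-06-03
```

Only the latest date survives the combination, which is why a single run is enough to cover the earlier backlogged dates when each run overwrites the same data.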

As with tuple batching, you can use multiple of these parameters
together, along with batch tuple parameters and normal parameters.

Monitoring task pipelines
~~~~~~~~~~~~~~~~~~~~~~~~~

54 changes: 53 additions & 1 deletion luigi/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
'''

import abc
import contextlib
import csv
import datetime
import warnings
import json
Expand All @@ -30,6 +32,8 @@
import functools
from ast import literal_eval

from luigi.six import StringIO

try:
from ConfigParser import NoOptionError, NoSectionError
except ImportError:
Expand Down Expand Up @@ -72,6 +76,33 @@ class DuplicateParameterException(ParameterException):
pass


def _flatten_tuples(tuples):
    return tuple(item for tuple in tuples for item in tuple)


def _csv_join(values):
    """ Join all values as a csv """
    with contextlib.closing(StringIO()) as io_obj:
        writer = csv.writer(io_obj, lineterminator='')
        writer.writerow(list(values))
        return io_obj.getvalue()


def batch_tuple_parameter(parameter_class):
    class BatchListParameter(parameter_class):
        def __init__(self, *args, **kwargs):
            super(BatchListParameter, self).__init__(*args, batch_method=_flatten_tuples, **kwargs)

        def parse(self, csv_str):
            reader = csv.reader([csv_str])
            return tuple(map(super(BatchListParameter, self).parse, next(reader)))

        def serialize(self, x):
            return _csv_join(map(super(BatchListParameter, self).serialize, x))

    return BatchListParameter


class Parameter(object):
"""
An untyped Parameter
Expand Down Expand Up @@ -116,7 +147,7 @@ def run(self):
_counter = 0 # non-atomically increasing counter used for ordering parameters.

def __init__(self, default=_no_value, is_global=False, significant=True, description=None,
config_path=None, positional=True, always_in_help=False):
config_path=None, positional=True, always_in_help=False, batch_method=None):
"""
:param default: the default value for this parameter. This should match the type of the
Parameter, i.e. ``datetime.date`` for ``DateParameter`` or ``int`` for
Expand All @@ -141,6 +172,7 @@ def __init__(self, default=_no_value, is_global=False, significant=True, descrip
parsing. Set true to always show in --help.
"""
self._default = default
self._batch_method = batch_method
if is_global:
warnings.warn("is_global support is removed. Assuming positional=False",
DeprecationWarning,
Expand Down Expand Up @@ -210,6 +242,9 @@ def task_value(self, task_name, param_name):
else:
return self.normalize(value)

    def is_batchable(self):
        return self._batch_method is not None

    def parse(self, x):
        """
        Parse an individual value from the input.
Expand All @@ -222,6 +257,23 @@ def parse(self, x):
        """
        return x  # default impl

    def parse_list(self, xs):
        """
        Parse a list of values from the scheduler.
        Only possible if this parameter has a batch method. This will combine the list into a single
        parameter value using batch method.
        :param xs: list of values to parse and combine
        :return: the combined parsed values
        """
        if self._batch_method is None:
            raise NotImplementedError('No batch method found')
        elif not xs:
            raise ValueError('Empty parameter list passed to parse_list')
        else:
            return self._batch_method(map(self.parse, xs))

    def serialize(self, x):
        """
        Opposite of :py:meth:`parse`.