Enables running of multiple tasks in batches (#1784)
Sometimes it's more efficient to run a group of tasks all at once rather than
one at a time. With luigi, it's difficult to take advantage of this because your
batch size will also be the minimum granularity you're able to compute. So if
you have a job that runs hourly, you can't combine the computation of many runs
when they get backlogged. And if you have a task that runs daily, you can't get
hourly runs.

For the user, batching is accomplished by setting a batch_method on the parameters
that you wish to batch. See the included documentation for more information.
daveFNbuck authored and Tarrasch committed Aug 12, 2016
1 parent 396d138 commit aa07946
Showing 11 changed files with 876 additions and 31 deletions.
5 changes: 3 additions & 2 deletions doc/configuration.rst
@@ -442,6 +442,7 @@ marker-table
Table in which to store status of table updates. This table will be
created if it doesn't already exist. Defaults to "table_updates".

.. _resources-config:

[resources]
-----------
@@ -763,7 +764,7 @@ Luigi also supports defining retry-policy per task.
...
If none of the retry-policy fields is defined per task, the field value will be the **default** value defined in the luigi config file.

To make luigi stick to the given retry-policy, be sure you run the luigi worker with the ``keep_alive`` config. Please check the ``keep_alive`` config in the :ref:`worker-config` section.

@@ -774,4 +775,4 @@ The fields below are in retry-policy and they can be defined per task.

* retry_count
* disable_hard_timeout
* disable_window_seconds
85 changes: 85 additions & 0 deletions doc/luigi_patterns.rst
@@ -118,6 +118,91 @@ can have implications in how the scheduler and visualizer handle task instances.
luigi RangeDaily --of MyTask --start 2014-10-31 --MyTask-my-param 123
.. _batch_method:

Batching multiple parameter values into a single run
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Sometimes it'll be faster to run multiple jobs together as a single
batch rather than running them each individually. When this is the case,
you can give some parameters a batch_method in their constructor to tell
the worker how to combine multiple values into one. A common choice is to
simply run the maximum value. This works well for tasks where a newer run
overwrites the data written by older ones. You accomplish this by setting
the batch_method to max, like so:

.. code-block:: python

    class A(luigi.Task):
        date = luigi.DateParameter(batch_method=max)

What's exciting about this is that if you send multiple ``A`` tasks to the
scheduler, it can combine them into one. So if ``A(date=2016-07-28)``,
``A(date=2016-07-29)`` and ``A(date=2016-07-30)`` are all ready to run, you
will start running only ``A(date=2016-07-30)``. While it runs, the scheduler
will show ``A(date=2016-07-28)`` and ``A(date=2016-07-29)`` as batch running.
When ``A(date=2016-07-30)`` finishes and becomes FAILED or DONE, the other
two tasks will be updated to the same status.
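The effect of ``batch_method=max`` on those three dates can be sketched in plain
Python (a simulation of the scheduler's choice, not luigi's actual code):

```python
import datetime

# The scheduler collapses all pending parameter values with the batch
# method (max) and hands the worker a single task for the result.
pending = [datetime.date(2016, 7, 28),
           datetime.date(2016, 7, 29),
           datetime.date(2016, 7, 30)]

to_run = max(pending)  # the one task that actually runs
batch_running = [d for d in pending if d != to_run]  # shown as batch running

print(to_run)  # 2016-07-30
```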

If you want to limit how big a batch can get, simply set ``max_batch_size``.
So if you have

.. code-block:: python

    class A(luigi.Task):
        date = luigi.DateParameter(batch_method=max)

        max_batch_size = 10

then the scheduler will batch at most 10 jobs together. You probably do
not want to do this with the max batch method, but it can be helpful if
you use other methods. You can use any method that takes a list of
parameter values and returns a single parameter value.
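For instance, a hypothetical custom batch method (not from this commit) for
string parameters could join the values, so one run processes the whole batch:

```python
# Hypothetical batch method: combine several id strings into a single
# comma-separated value that one run can process all at once.
def join_csv(values):
    return ','.join(values)

# A task could then declare (sketch only; requires luigi):
# class ProcessIds(luigi.Task):
#     ids = luigi.Parameter(batch_method=join_csv)
#     max_batch_size = 10

print(join_csv(['a', 'b', 'c']))  # a,b,c
```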

If you have two parameters with a max batch method, you'll get the max values
for both of them. Parameters that don't have a batch method are used to group
tasks: only tasks that agree on all of their non-batched parameter values can
be combined. So if you have a class like

.. code-block:: python

    class A(luigi.Task):
        p1 = luigi.IntParameter(batch_method=max)
        p2 = luigi.IntParameter(batch_method=max)
        p3 = luigi.IntParameter()

and you create tasks ``A(p1=1, p2=2, p3=0)``, ``A(p1=2, p2=3, p3=0)``,
``A(p1=3, p2=4, p3=1)``, you'll get them batched as
``A(p1=2, p2=3, p3=0)`` and ``A(p1=3, p2=4, p3=1)``.
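The grouping rule can be sketched in plain Python (a simulation of the
scheduler's behavior under the stated assumptions, not luigi's actual code):

```python
from collections import defaultdict

def combine_batches(tasks, batched=('p1', 'p2'), batch_method=max):
    """Group tasks by their non-batched parameters, then collapse each
    batched parameter across the group with the batch method."""
    groups = defaultdict(list)
    for task in tasks:
        # Tasks only batch together if all non-batched parameters match.
        key = tuple((k, v) for k, v in sorted(task.items()) if k not in batched)
        groups[key].append(task)
    combined = []
    for key, group in groups.items():
        merged = dict(key)
        for p in batched:
            merged[p] = batch_method([t[p] for t in group])
        combined.append(merged)
    return combined

tasks = [
    {'p1': 1, 'p2': 2, 'p3': 0},
    {'p1': 2, 'p2': 3, 'p3': 0},
    {'p1': 3, 'p2': 4, 'p3': 1},
]
print(combine_batches(tasks))
```

The first two tasks share ``p3=0``, so they merge into ``A(p1=2, p2=3, p3=0)``;
the third stands alone, matching the example above.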

Note that batched tasks do not take up :ref:`resources-config`; only the
task that ends up running will use resources. The scheduler only checks
that there are sufficient resources for each task individually before
batching them all together.

Tasks that regularly overwrite the same data source
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you are overwriting the same data source with every run, you'll need
to ensure that two batches can't run at the same time. You can do this
pretty easily by setting the batch_method to max and declaring a unique
resource:

.. code-block:: python

    class A(luigi.Task):
        date = luigi.DateParameter(batch_method=max)

        resources = {'overwrite_resource': 1}

Now if you have multiple tasks such as ``A(date=2016-06-01)``,
``A(date=2016-06-02)``, ``A(date=2016-06-03)``, the scheduler will just
tell you to run the highest available one and mark the lower ones as
batch_running. Using a unique resource will prevent multiple tasks from
writing to the same location at the same time if a new one becomes
available while others are running.

Monitoring task pipelines
~~~~~~~~~~~~~~~~~~~~~~~~~

27 changes: 26 additions & 1 deletion luigi/parameter.py
@@ -116,7 +116,7 @@ def run(self):
_counter = 0 # non-atomically increasing counter used for ordering parameters.

def __init__(self, default=_no_value, is_global=False, significant=True, description=None,
config_path=None, positional=True, always_in_help=False):
config_path=None, positional=True, always_in_help=False, batch_method=None):
"""
:param default: the default value for this parameter. This should match the type of the
Parameter, i.e. ``datetime.date`` for ``DateParameter`` or ``int`` for
@@ -139,8 +139,13 @@ def __init__(self, default=_no_value, is_global=False, significant=True, descrip
``positional=False`` for abstract base classes and similar cases.
:param bool always_in_help: For the --help option in the command line
parsing. Set true to always show in --help.
:param function(iterable[A])->A batch_method: Method to combine an iterable of parsed
parameter values into a single value. Used
when receiving batched parameter lists from
the scheduler. See :ref:`batch_method`
"""
self._default = default
self._batch_method = batch_method
if is_global:
warnings.warn("is_global support is removed. Assuming positional=False",
DeprecationWarning,
@@ -210,6 +215,9 @@ def task_value(self, task_name, param_name):
else:
return self.normalize(value)

def _is_batchable(self):
return self._batch_method is not None

def parse(self, x):
"""
Parse an individual value from the input.
@@ -222,6 +230,23 @@ def parse(self, x):
"""
return x # default impl

def _parse_list(self, xs):
    """
    Parse a list of values from the scheduler.

    Only possible if this parameter's ``_is_batchable()`` is True. This will combine
    the list into a single parameter value using the batch method. This should never
    need to be overridden.

    :param xs: list of values to parse and combine
    :return: the combined parsed values
    """
    if not self._is_batchable():
        raise NotImplementedError('No batch method found')
    elif not xs:
        raise ValueError('Empty parameter list passed to parse_list')
    else:
        return self._batch_method(map(self.parse, xs))
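The method's behavior can be sketched standalone (an assumed-equivalent
function outside the Parameter class, with ``int`` standing in for ``parse``):

```python
# Standalone sketch of _parse_list: parse each serialized value from the
# scheduler, then collapse them with the batch method (here int + max).
def parse_list(xs, parse=int, batch_method=max):
    if batch_method is None:
        raise NotImplementedError('No batch method found')
    if not xs:
        raise ValueError('Empty parameter list passed to parse_list')
    return batch_method(map(parse, xs))

print(parse_list(['3', '10', '7']))  # 10
```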

def serialize(self, x):
"""
Opposite of :py:meth:`parse`.
