Enables running of multiple tasks in batches #1784

Merged · 9 commits · Aug 12, 2016
34 changes: 16 additions & 18 deletions doc/luigi_patterns.rst
@@ -131,25 +131,24 @@ setting the batch_method to max, like so:

.. code-block:: python

-    class ExampleTask(luigi.Task):
+    class A(luigi.Task):
date = luigi.DateParameter(batch_method=max)

- What's exciting about this is that if you send multiple ExampleTasks to
- the scheduler, it can combine them and return one. So if
- ExampleTask(date=2016-07-28), ExampleTask(date=2016-07-29) and
- ExampleTask(date=2016-07-30) are all ready to run, you will start running
- ExampleTask(date=2016-07-30). While this is running, the scheduler will
- show ExampleTask(date=2016-07-28), ExampleTask(date=2016-07-29) as batch
- running while ExampleTask(date=2016-07-30) is running. When
- ExampleTask(date=2016-07-30) is done running and becomes FAILED or DONE,
- the other two tasks will be updated to the same status.

- If you want to limit how big a batch can get, simply set batch_size.
+ What's exciting about this is that if you send multiple As to the
+ scheduler, it can combine them and return one. So if A(date=2016-07-28),
+ A(date=2016-07-29) and A(date=2016-07-30) are all ready to run, you will
+ start running A(date=2016-07-30). While this is running, the scheduler
+ will show A(date=2016-07-28), A(date=2016-07-29) as batch running while
+ A(date=2016-07-30) is running. When A(date=2016-07-30) is done running
+ and becomes FAILED or DONE, the other two tasks will be updated to the
+ same status.

+ If you want to limit how big a batch can get, simply set max_batch_size.
So if you have

.. code-block:: python

-    class ExampleTask(luigi.Task):
+    class A(luigi.Task):
date = luigi.DateParameter(batch_method=max)

max_batch_size = 10
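
A minimal end-to-end sketch of the date example above (assuming a Luigi
build that includes this change; the driver code and dates are
illustrative):

.. code-block:: python

    import datetime
    import luigi

    class A(luigi.Task):
        date = luigi.DateParameter(batch_method=max)

        def run(self):
            # With batching, a single run() can cover all three dates;
            # self.date is then the max of the combined dates (2016-07-30).
            print('running A for', self.date)

    if __name__ == '__main__':
        # Sends A(2016-07-28), A(2016-07-29) and A(2016-07-30) to the scheduler.
        luigi.build([A(date=datetime.date(2016, 7, d)) for d in (28, 29, 30)])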
@@ -165,15 +164,14 @@ be aggregated separately. So if you have a class like

.. code-block:: python

-    class ExampleTask(luigi.Task):
+    class A(luigi.Task):
p1 = luigi.IntParameter(batch_method=max)
p2 = luigi.IntParameter(batch_method=max)
p3 = luigi.IntParameter()

- and you create tasks ExampleTask(p1=1, p2=2, p3=0),
- ExampleTask(p1=2, p2=3, p3=0), ExampleTask(p1=3, p2=4, p3=1), you'll get
- them batched as ExampleTask(p1=2, p2=3, p3=0) and
- ExampleTask(p1=3, p2=4, p3=1).
+ and you create tasks A(p1=1, p2=2, p3=0), A(p1=2, p2=3, p3=0),
+ A(p1=3, p2=4, p3=1), you'll get them batched as A(p1=2, p2=3, p3=0) and
+ A(p1=3, p2=4, p3=1).
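
The grouping rule can be sketched in plain Python (illustrative code
under assumed semantics, not Luigi internals): tasks are grouped by
their unbatched parameters, and each batch method is applied within a
group.

.. code-block:: python

    from itertools import groupby

    # Hypothetical stand-ins for the three A tasks above.
    tasks = [dict(p1=1, p2=2, p3=0), dict(p1=2, p2=3, p3=0), dict(p1=3, p2=4, p3=1)]

    # Group on the unbatched parameter p3, then combine p1 and p2 with max.
    for p3, group in groupby(sorted(tasks, key=lambda t: t['p3']), key=lambda t: t['p3']):
        group = list(group)
        print(dict(p1=max(t['p1'] for t in group),
                   p2=max(t['p2'] for t in group),
                   p3=p3))
    # Prints {'p1': 2, 'p2': 3, 'p3': 0}, then {'p1': 3, 'p2': 4, 'p3': 1}.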

Tasks that regularly overwrite the same data source
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
10 changes: 7 additions & 3 deletions luigi/parameter.py
@@ -140,6 +140,10 @@ def __init__(self, default=_no_value, is_global=False, significant=True, descrip
``positional=False`` for abstract base classes and similar cases.
:param bool always_in_help: For the --help option in the command line
parsing. Set true to always show in --help.
+ :param function(iterable[A]) -> A batch_method: Method to combine an iterable of parsed
+     parameter values into a single value. Used
+     when receiving batched parameter lists from
+     the scheduler.
Contributor:

I just created #1814, which should be able to help you.

As for referencing a section in the documentation, see this example:

"""
self._default = default
self._batch_method = batch_method
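
Any callable with that shape can serve as a batch_method; a short
sketch (the Letters task here is hypothetical):

.. code-block:: python

    import luigi

    class Letters(luigi.Task):
        # ','.join maps an iterable of strings to one string, so several
        # batched Letters tasks would run once with the comma-joined value.
        letters = luigi.Parameter(batch_method=','.join)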
@@ -231,13 +235,13 @@ def parse_list(self, xs):
"""
Parse a list of values from the scheduler.

- Only possible if this parameter has a batch method. This will combine the list into a single
- parameter value using batch method.
+ Only possible if is_batchable() is True. This will combine the list into a single
+ parameter value using the batch method. This should never need to be overridden.

:param xs: list of values to parse and combine
:return: the combined parsed values
"""
Contributor:

Can we also add in the docs that, in most cases, this does not need to be overridden? (Or am I wrong about this?)

Contributor Author:
You're right about that. Added to the docs.

-     if self._batch_method is None:
+     if not self.is_batchable():
raise NotImplementedError('No batch method found')
elif not xs:
raise ValueError('Empty parameter list passed to parse_list')
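
A hedged illustration of that contract, assuming parse_list parses each
element and then applies the batch method:

.. code-block:: python

    import luigi

    p = luigi.IntParameter(batch_method=max)
    print(p.parse_list(['1', '2', '3']))  # each value is parsed, then max -> 3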
28 changes: 14 additions & 14 deletions luigi/scheduler.py
@@ -232,6 +232,8 @@ def __init__(self, task_id, status, deps, resources=None, priority=0, family='',
def __repr__(self):
return "Task(%r)" % vars(self)

+ # TODO(2017-08-10) replace this function with direct calls to batchable
+ # this only exists for backward compatibility
def is_batchable(self):
Contributor:
The long-term plan is just to replace this with self.batchable, right? Can you write a comment that this is only for backward compatibility (and perhaps include a date)?

Contributor Author:
Right, added a TODO comment for a year from now since it'll probably be a while before this makes it to most end users.

try:
return self.batchable
@@ -380,13 +382,11 @@ def get_active_tasks(self, status=None):
yield task

def get_batch_running_tasks(self, batch_id):
-     if batch_id is None:
-         return []
-     else:
-         return [
-             task for task in self.get_active_tasks(BATCH_RUNNING)
-             if task.batch_id == batch_id
-         ]
+     assert batch_id is not None
Contributor:
I really like this style. :)

+     return [
+         task for task in self.get_active_tasks(BATCH_RUNNING)
+         if task.batch_id == batch_id
+     ]

def get_running_tasks(self):
return six.itervalues(self._status_tasks[RUNNING])
@@ -396,8 +396,6 @@ def get_pending_tasks(self):
for status in [PENDING, RUNNING])

def set_batcher(self, worker_id, family, batcher_args, max_batch_size):
-     if max_batch_size is None:
-         max_batch_size = float('inf')
self._task_batchers.setdefault(worker_id, {})
self._task_batchers[worker_id][family] = (batcher_args, max_batch_size)

@@ -645,7 +643,7 @@ def _update_priority(self, task, prio, worker):
self._update_priority(t, prio, worker)

@rpc_method()
- def add_task_batcher(self, worker, task_family, batched_args, max_batch_size=None):
+ def add_task_batcher(self, worker, task_family, batched_args, max_batch_size=float('inf')):
self._state.set_batcher(worker, task_family, batched_args, max_batch_size)

@rpc_method()
@@ -694,8 +692,9 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,

if tracking_url is not None or task.status != RUNNING:
task.tracking_url = tracking_url
-     for batch_task in self._state.get_batch_running_tasks(task.batch_id):
-         batch_task.tracking_url = tracking_url
+     if task.batch_id is not None:
+         for batch_task in self._state.get_batch_running_tasks(task.batch_id):
+             batch_task.tracking_url = tracking_url

if batchable is not None:
Contributor:
Can we avoid tribool logic here? Right now we have 3 possible values (None, True, False). Could the default value for batchable be set to False?

Contributor Author:
batchable is False by default when a new task is created. It'll never be None in the internal representation in the scheduler.

This just says not to override batchable when an add_task call doesn't specify it. add_task calls for changing job statuses don't always contain every argument, so we don't want to change task.batchable if it isn't supplied. It's the same pattern we use for other parameters nearby in the same function.
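
The pattern described here, sketched abstractly (hypothetical helper,
not scheduler code): None marks "argument not supplied", so the stored
value is only touched when the caller passes an explicit boolean.

.. code-block:: python

    def update_task(task, batchable=None):
        # None means "not specified in this add_task call": keep the old value.
        if batchable is not None:
            task.batchable = batchable
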

task.batchable = batchable
@@ -705,8 +704,9 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,

if expl is not None:
task.expl = expl
-     for batch_task in self._state.get_batch_running_tasks(task.batch_id):
-         batch_task.expl = expl
+     if task.batch_id is not None:
+         for batch_task in self._state.get_batch_running_tasks(task.batch_id):
+             batch_task.expl = expl

if not (task.status in (RUNNING, BATCH_RUNNING) and status == PENDING) or new_deps:
# don't allow re-scheduling of task while it is running, it must either fail or succeed first
4 changes: 2 additions & 2 deletions luigi/task.py
@@ -146,8 +146,8 @@ class MyTask(luigi.Task):
#: Only works when using multiple workers.
worker_timeout = None

- #: Maximum number of tasks to run together as a batch. If None, there is no limit.
- max_batch_size = None
+ #: Maximum number of tasks to run together as a batch. Infinite by default.
+ max_batch_size = float('inf')

@property
def batchable(self):
6 changes: 3 additions & 3 deletions luigi/worker.py
@@ -390,7 +390,7 @@ def __init__(self, scheduler=None, worker_id=None, worker_processes=1, assistant
self._scheduled_tasks = {}
self._suspended_tasks = {}
self._batch_running_tasks = {}
-     self._batch_classes_sent = set()
+     self._batch_families_sent = set()

self._first_task = None

@@ -582,7 +582,7 @@ def add(self, task, multiprocess=False):

def _add_task_batcher(self, task):
family = task.task_family
-     if family not in self._batch_classes_sent:
+     if family not in self._batch_families_sent:
task_class = type(task)
batch_param_names = task_class.batch_param_names()
if batch_param_names:
@@ -592,7 +592,7 @@
batched_args=batch_param_names,
max_batch_size=task.max_batch_size,
)
-     self._batch_classes_sent.add(family)
+     self._batch_families_sent.add(family)

def _add(self, task, is_complete):
if self._config.task_limit is not None and len(self._scheduled_tasks) >= self._config.task_limit:
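
A hedged sketch of what gets registered, assuming batch_param_names()
returns only the parameters that carry a batch_method:

.. code-block:: python

    import luigi

    class A(luigi.Task):
        date = luigi.DateParameter(batch_method=max)  # batchable
        kind = luigi.Parameter()                      # not batchable

    # Assumed to print ['date']; only these names are sent to the
    # scheduler, once per task family, via add_task_batcher.
    print(A.batch_param_names())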