[IMP] queue_job: Add split method
paradoxxxzero committed Nov 19, 2024
1 parent 0e39a39 commit f929b64
Showing 7 changed files with 236 additions and 3 deletions.
32 changes: 32 additions & 0 deletions queue_job/README.rst
@@ -241,6 +241,38 @@
Note: ``delay()`` must be called on the delayable, chain, or group which is at the top
of the graph. In the example above, if it was called on ``group_a``, then ``group_b``
would never be delayed (but a warning would be shown).

It is also possible to split a job into several jobs, each one processing a part of the
work. This can be useful to avoid very long jobs, to parallelize some tasks, and to get
more specific errors. Usage is as follows:

.. code-block:: python

    def button_split_delayable(self):
        (
            self  # Can be a big recordset, let's say 1000 records
            .delayable()
            .generate_thumbnail((50, 50))
            .set(priority=30)
            .set(description=_("generate xxx"))
            .split(50)  # Split the job into 20 jobs of 50 records each
            .delay()
        )

The ``split()`` method takes a ``chain`` boolean keyword argument. If set to
True, the jobs will be chained, meaning that the next job will only start when the previous
one is done:

.. code-block:: python

    def button_increment_var(self):
        (
            self
            .delayable()
            .increment_counter()
            .split(1, chain=True)  # Will execute the jobs one after the other
            .delay()
        )

Enqueing Job Options
--------------------
46 changes: 46 additions & 0 deletions queue_job/delay.py
@@ -526,6 +526,52 @@ def delay(self):
    """Delay the whole graph"""
    self._graph.delay()

def split(self, size, chain=False):
    """Split the Delayable into batches of ``size`` records.

    Return a ``DelayableGroup``, or a ``DelayableChain`` if ``chain`` is
    True, containing one Delayable per batch.
    """
    if not self._job_method:
        raise ValueError("No method set on the Delayable")

    total_records = len(self.recordset)

    delayables = []
    for index in range(0, total_records, size):
        recordset = self.recordset[index : index + size]
        delayable = Delayable(
            recordset,
            priority=self.priority,
            eta=self.eta,
            max_retries=self.max_retries,
            description=self.description,
            channel=self.channel,
            identity_key=self.identity_key,
        )
        # Rebind the job method so its __self__ is the sliced recordset
        delayable._job_method = getattr(recordset, self._job_method.__name__)
        delayable._job_args = self._job_args
        delayable._job_kwargs = self._job_kwargs

        delayables.append(delayable)

    description = self.description or (
        self._job_method.__doc__.splitlines()[0].strip()
        if self._job_method.__doc__
        else "{}.{}".format(self.recordset._name, self._job_method.__name__)
    )
    for index, delayable in enumerate(delayables):
        delayable.set(
            description="%s (split %s/%s)"
            % (description, index + 1, len(delayables))
        )

    # Prevent warning on deletion
    self._generated_job = True

    return (DelayableChain if chain else DelayableGroup)(*delayables)

def _build_job(self):
    if self._generated_job:
        return self._generated_job
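For context, here is a minimal usage sketch of the new ``split()`` method as called from a model
method; it simply mirrors the README example above. The method name ``enqueue_thumbnails`` and the
``generate_thumbnail`` job are illustrative assumptions, not part of this commit:

# Hedged sketch: assumes an Odoo environment with queue_job installed and
# `generate_thumbnail` declared as a delayable job method on the model.
def enqueue_thumbnails(self):
    records = self.search([])  # e.g. 1000 records

    # split(50) returns a DelayableGroup of 20 Delayables (50 records each);
    # each one gets a description suffixed "(split i/20)", and delay()
    # enqueues one independent job per batch, which may run in parallel.
    records.delayable().generate_thumbnail((50, 50)).split(50).delay()

    # With chain=True a DelayableChain is returned instead, so each batch
    # only starts once the previous one has finished.
    records.delayable().generate_thumbnail((50, 50)).split(50, chain=True).delay()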
32 changes: 32 additions & 0 deletions queue_job/readme/USAGE.rst
@@ -104,6 +104,38 @@
Note: ``delay()`` must be called on the delayable, chain, or group which is at the top
of the graph. In the example above, if it was called on ``group_a``, then ``group_b``
would never be delayed (but a warning would be shown).

It is also possible to split a job into several jobs, each one processing a part of the
work. This can be useful to avoid very long jobs, to parallelize some tasks, and to get
more specific errors. Usage is as follows:

.. code-block:: python

    def button_split_delayable(self):
        (
            self  # Can be a big recordset, let's say 1000 records
            .delayable()
            .generate_thumbnail((50, 50))
            .set(priority=30)
            .set(description=_("generate xxx"))
            .split(50)  # Split the job into 20 jobs of 50 records each
            .delay()
        )

The ``split()`` method takes a ``chain`` boolean keyword argument. If set to
True, the jobs will be chained, meaning that the next job will only start when the previous
one is done:

.. code-block:: python

    def button_increment_var(self):
        (
            self
            .delayable()
            .increment_counter()
            .split(1, chain=True)  # Will execute the jobs one after the other
            .delay()
        )

Enqueing Job Options
--------------------
28 changes: 28 additions & 0 deletions queue_job/static/description/index.html
@@ -584,6 +584,34 @@ <h3><a class="toc-backref" href="#toc-entry-5">Delaying jobs</a></h3>
<p>Note: <tt class="docutils literal">delay()</tt> must be called on the delayable, chain, or group which is at the top
of the graph. In the example above, if it was called on <tt class="docutils literal">group_a</tt>, then <tt class="docutils literal">group_b</tt>
would never be delayed (but a warning would be shown).</p>
<p>It is also possible to split a job into several jobs, each one processing a part of the
work. This can be useful to avoid very long jobs, parallelize some task and get more specific
errors. Usage is as follows:</p>
<pre class="code python literal-block">
<span class="k">def</span> <span class="nf">button_split_delayable</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><span class="w">
</span> <span class="p">(</span><span class="w">
</span> <span class="bp">self</span> <span class="c1"># Can be a big recordset, let's say 1000 records</span><span class="w">
</span> <span class="o">.</span><span class="n">delayable</span><span class="p">()</span><span class="w">
</span> <span class="o">.</span><span class="n">generate_thumbnail</span><span class="p">((</span><span class="mi">50</span><span class="p">,</span> <span class="mi">50</span><span class="p">))</span><span class="w">
</span> <span class="o">.</span><span class="n">set</span><span class="p">(</span><span class="n">priority</span><span class="o">=</span><span class="mi">30</span><span class="p">)</span><span class="w">
</span> <span class="o">.</span><span class="n">set</span><span class="p">(</span><span class="n">description</span><span class="o">=</span><span class="n">_</span><span class="p">(</span><span class="s2">&quot;generate xxx&quot;</span><span class="p">))</span><span class="w">
</span> <span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="mi">50</span><span class="p">)</span> <span class="c1"># Split the job into 20 jobs of 50 records each</span><span class="w">
</span> <span class="o">.</span><span class="n">delay</span><span class="p">()</span><span class="w">
</span> <span class="p">)</span>
</pre>
<p>The <tt class="docutils literal">split()</tt> method takes a <tt class="docutils literal">chain</tt> boolean keyword argument. If set to
True, the jobs will be chained, meaning that the next job will only start when the previous
one is done:</p>
<pre class="code python literal-block">
<span class="k">def</span> <span class="nf">button_increment_var</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span><span class="w">
</span> <span class="p">(</span><span class="w">
</span> <span class="bp">self</span><span class="w">
</span> <span class="o">.</span><span class="n">delayable</span><span class="p">()</span><span class="w">
</span> <span class="o">.</span><span class="n">increment_counter</span><span class="p">()</span><span class="w">
</span> <span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="n">chain</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span> <span class="c1"># Will execute the jobs one after the other</span><span class="w">
</span> <span class="o">.</span><span class="n">delay</span><span class="p">()</span><span class="w">
</span> <span class="p">)</span>
</pre>
</div>
<div class="section" id="enqueing-job-options">
<h3><a class="toc-backref" href="#toc-entry-6">Enqueing Job Options</a></h3>
1 change: 1 addition & 0 deletions queue_job/tests/__init__.py
@@ -1,6 +1,7 @@
from . import test_runner_channels
from . import test_runner_runner
from . import test_delayable
from . import test_delayable_split
from . import test_json_field
from . import test_model_job_channel
from . import test_model_job_function
6 changes: 3 additions & 3 deletions queue_job/tests/test_delayable.py
@@ -1,15 +1,15 @@
# copyright 2019 Camptocamp
# license agpl-3.0 or later (http://www.gnu.org/licenses/agpl.html)

-import unittest
-
import mock

+from odoo.tests import common
+
# pylint: disable=odoo-addons-relative-import
from odoo.addons.queue_job.delay import Delayable, DelayableGraph


-class TestDelayable(unittest.TestCase):
+class TestDelayable(common.BaseCase):
    def setUp(self):
        super().setUp()
        self.recordset = mock.MagicMock(name="recordset")
94 changes: 94 additions & 0 deletions queue_job/tests/test_delayable_split.py
@@ -0,0 +1,94 @@
# Copyright 2024 Akretion (http://www.akretion.com).
# @author Florian Mounier <florian.mounier@akretion.com>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).

from odoo.tests import common

# pylint: disable=odoo-addons-relative-import
from odoo.addons.queue_job.delay import Delayable


class TestDelayableSplit(common.BaseCase):
    def setUp(self):
        super().setUp()

        class FakeRecordSet(list):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)
                self._name = "recordset"

            def __getitem__(self, key):
                if isinstance(key, slice):
                    return FakeRecordSet(super().__getitem__(key))
                return super().__getitem__(key)

            def method(self, arg, kwarg=None):
                """Method to be called"""
                return arg, kwarg

        self.FakeRecordSet = FakeRecordSet

    def test_delayable_split_no_method_call_beforehand(self):
        dl = Delayable(self.FakeRecordSet(range(20)))
        with self.assertRaises(ValueError):
            dl.split(3)

    def test_delayable_split_10_3(self):
        dl = Delayable(self.FakeRecordSet(range(10)))
        dl.method("arg", kwarg="kwarg")
        group = dl.split(3)
        self.assertEqual(len(group._delayables), 4)
        delayables = sorted(list(group._delayables), key=lambda x: x.description)
        self.assertEqual(delayables[0].recordset, self.FakeRecordSet([0, 1, 2]))
        self.assertEqual(delayables[1].recordset, self.FakeRecordSet([3, 4, 5]))
        self.assertEqual(delayables[2].recordset, self.FakeRecordSet([6, 7, 8]))
        self.assertEqual(delayables[3].recordset, self.FakeRecordSet([9]))
        self.assertEqual(delayables[0].description, "Method to be called (split 1/4)")
        self.assertEqual(delayables[1].description, "Method to be called (split 2/4)")
        self.assertEqual(delayables[2].description, "Method to be called (split 3/4)")
        self.assertEqual(delayables[3].description, "Method to be called (split 4/4)")
        self.assertNotEqual(delayables[0]._job_method, dl._job_method)
        self.assertNotEqual(delayables[1]._job_method, dl._job_method)
        self.assertNotEqual(delayables[2]._job_method, dl._job_method)
        self.assertNotEqual(delayables[3]._job_method, dl._job_method)
        self.assertEqual(delayables[0]._job_method.__name__, dl._job_method.__name__)
        self.assertEqual(delayables[1]._job_method.__name__, dl._job_method.__name__)
        self.assertEqual(delayables[2]._job_method.__name__, dl._job_method.__name__)
        self.assertEqual(delayables[3]._job_method.__name__, dl._job_method.__name__)
        self.assertEqual(delayables[0]._job_args, ("arg",))
        self.assertEqual(delayables[1]._job_args, ("arg",))
        self.assertEqual(delayables[2]._job_args, ("arg",))
        self.assertEqual(delayables[3]._job_args, ("arg",))
        self.assertEqual(delayables[0]._job_kwargs, {"kwarg": "kwarg"})
        self.assertEqual(delayables[1]._job_kwargs, {"kwarg": "kwarg"})
        self.assertEqual(delayables[2]._job_kwargs, {"kwarg": "kwarg"})
        self.assertEqual(delayables[3]._job_kwargs, {"kwarg": "kwarg"})

    def test_delayable_split_10_5(self):
        dl = Delayable(self.FakeRecordSet(range(10)))
        dl.method("arg", kwarg="kwarg")
        group = dl.split(5)
        self.assertEqual(len(group._delayables), 2)
        delayables = sorted(list(group._delayables), key=lambda x: x.description)
        self.assertEqual(delayables[0].recordset, self.FakeRecordSet([0, 1, 2, 3, 4]))
        self.assertEqual(delayables[1].recordset, self.FakeRecordSet([5, 6, 7, 8, 9]))
        self.assertEqual(delayables[0].description, "Method to be called (split 1/2)")
        self.assertEqual(delayables[1].description, "Method to be called (split 2/2)")

    def test_delayable_split_10_10(self):
        dl = Delayable(self.FakeRecordSet(range(10)))
        dl.method("arg", kwarg="kwarg")
        group = dl.split(10)
        self.assertEqual(len(group._delayables), 1)
        delayables = sorted(list(group._delayables), key=lambda x: x.description)
        self.assertEqual(delayables[0].recordset, self.FakeRecordSet(range(10)))
        self.assertEqual(delayables[0].description, "Method to be called (split 1/1)")

    def test_delayable_split_10_20(self):
        dl = Delayable(self.FakeRecordSet(range(10)))
        dl.method("arg", kwarg="kwarg")
        group = dl.split(20)
        self.assertEqual(len(group._delayables), 1)
        delayables = sorted(list(group._delayables), key=lambda x: x.description)
        self.assertEqual(delayables[0].recordset, self.FakeRecordSet(range(10)))
        self.assertEqual(delayables[0].description, "Method to be called (split 1/1)")
