Skip to content

Commit 866b0fe

Browse files
committed
[IMP] orm: add optional // attr call to iter_browse
In some cases, e.g. if it is known that calling a certain method on the model will only trigger inserts, or it is clear that the updates will be disjoint, such method calls can be done in parallel.
1 parent 909274f commit 866b0fe

File tree

1 file changed

+75
-8
lines changed

1 file changed

+75
-8
lines changed

src/util/orm.py

Lines changed: 75 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@
99
on this module work along the ORM of *all* supported versions.
1010
"""
1111

12+
import collections
1213
import logging
14+
import os
1315
import re
16+
from concurrent.futures import ProcessPoolExecutor
1417
from contextlib import contextmanager
1518
from functools import wraps
1619
from itertools import chain
@@ -27,9 +30,9 @@
2730
except ImportError:
2831
from odoo import SUPERUSER_ID
2932
from odoo import fields as ofields
30-
from odoo import modules, release
33+
from odoo import modules, release, sql_db
3134
except ImportError:
32-
from openerp import SUPERUSER_ID, modules, release
35+
from openerp import SUPERUSER_ID, modules, release, sql_db
3336

3437
try:
3538
from openerp import fields as ofields
@@ -41,8 +44,8 @@
4144
from .const import BIG_TABLE_THRESHOLD
4245
from .exceptions import MigrationError
4346
from .helpers import table_of_model
44-
from .misc import chunks, log_progress, version_between, version_gte
45-
from .pg import column_exists, format_query, get_columns, named_cursor
47+
from .misc import chunks, log_progress, str2bool, version_between, version_gte
48+
from .pg import column_exists, format_query, get_columns, get_max_workers, named_cursor
4649

4750
# python3 shims
4851
try:
@@ -52,6 +55,8 @@
5255

5356
_logger = logging.getLogger(__name__)
5457

58+
UPG_PARALLEL_ITER_BROWSE = str2bool(os.environ.get("UPG_PARALLEL_ITER_BROWSE", "0"))
59+
5560

5661
def env(cr):
5762
"""
@@ -338,6 +343,31 @@ def get_ids():
338343
invalidate(records)
339344

340345

346+
def _mp_iter_browse_cb(ids_or_values):
    """Worker-side callback executed in a pool process for one task of `iter_browse`.

    Depending on the configured mode, either calls the configured attribute on the
    browsed records or creates new records from the given values, then commits.
    State (dbname, model, attribute, args, mode) is carried as attributes set on
    this function by :func:`_init_mp_iter_browse_cb` before the pool is spawned.
    """
    cb = _mp_iter_browse_cb
    # Lazy per-process initialization on first call. Done here instead of in the
    # executor's `initializer` callback because Python 3.6 doesn't support it.
    if not hasattr(cb, "env"):
        # Children cannot borrow from copies of the same pool, it will cause
        # protocol errors; drop the inherited pool before opening a connection.
        sql_db._Pool = None
        cb.env = env(sql_db.db_connect(cb.dbname).cursor())
        cb.env.clear()
    # Process the task for this chunk of ids (browse mode) or values (create mode).
    if cb.mode == "browse":
        records = cb.env[cb.model_name].browse(ids_or_values)
        getattr(records, cb.attr_name)(*cb.args, **cb.kwargs)
    elif cb.mode == "create":
        cb.env[cb.model_name].create(ids_or_values)
    cb.env.cr.commit()
360+
361+
362+
def _init_mp_iter_browse_cb(dbname, model_name, attr_name, args, kwargs, mode):
    """Prime :func:`_mp_iter_browse_cb` with the static parameters of a parallel run.

    The values are stored as attributes on the callback function itself so that
    forked worker processes inherit them along with the function object.
    """
    task_params = {
        "dbname": dbname,
        "model_name": model_name,
        "attr_name": attr_name,
        "args": args,
        "kwargs": kwargs,
        "mode": mode,
    }
    for name, value in task_params.items():
        setattr(_mp_iter_browse_cb, name, value)
369+
370+
341371
class iter_browse(object):
342372
"""
343373
Iterate over recordsets.
@@ -372,8 +402,8 @@ class iter_browse(object):
372402
:param model: the model to iterate
373403
:type model: :class:`odoo.model.Model`
374404
:param iterable(int) ids: iterable of IDs of the records to iterate
375-
:param int chunk_size: number of records to load in each iteration chunk, `200` by
376-
default
405+
:param int chunk_size: number of records to load in each iteration chunk, `200` by default
406+
:param bool multiprocessing: whether to process chunks in parallel
377407
:param logger: logger used to report the progress, by default
378408
:data:`~odoo.upgrade.util.orm._logger`
379409
:type logger: :class:`logging.Logger`
@@ -384,7 +414,17 @@ class iter_browse(object):
384414
See also :func:`~odoo.upgrade.util.orm.env`
385415
"""
386416

387-
__slots__ = ("_chunk_size", "_cr_uid", "_it", "_logger", "_model", "_patch", "_size", "_strategy")
417+
__slots__ = (
418+
"_chunk_size",
419+
"_cr_uid",
420+
"_it",
421+
"_logger",
422+
"_model",
423+
"_multiprocessing",
424+
"_patch",
425+
"_size",
426+
"_strategy",
427+
)
388428

389429
def __init__(self, model, *args, **kw):
390430
assert len(args) in [1, 3] # either (cr, uid, ids) or (ids,)
@@ -398,9 +438,24 @@ def __init__(self, model, *args, **kw):
398438
except TypeError:
399439
raise ValueError("When passing ids as a generator, the size kwarg is mandatory")
400440
self._chunk_size = kw.pop("chunk_size", 200) # keyword-only argument
441+
self._multiprocessing = kw.pop("multiprocessing", False)
401442
self._logger = kw.pop("logger", _logger)
402-
self._strategy = kw.pop("strategy", "flush")
443+
self._strategy = kw.pop("strategy", "commit" if self._multiprocessing and UPG_PARALLEL_ITER_BROWSE else "flush")
403444
assert self._strategy in {"flush", "commit"}
445+
if self._multiprocessing:
446+
if self._strategy == "flush":
447+
raise ValueError("With multiprocessing, strategy must be 'commit'")
448+
if self._size > 100000 and self._logger and not UPG_PARALLEL_ITER_BROWSE:
449+
self._logger.warning(
450+
"Browsing %d %s, which may take a long time. "
451+
"This can be sped up by setting the env variable UPG_PARALLEL_ITER_BROWSE to 1. "
452+
"If you do, be sure to examine the results carefully.",
453+
self._size,
454+
self._model._name,
455+
)
456+
if UPG_PARALLEL_ITER_BROWSE:
457+
self._task_size = self._chunk_size
458+
self._chunk_size = max(get_max_workers() * 10 * self._task_size, 1000000)
404459
if kw:
405460
raise TypeError("Unknown arguments: %s" % ", ".join(kw))
406461

@@ -452,6 +507,18 @@ def __getattr__(self, attr):
452507

453508
def caller(*args, **kwargs):
454509
args = self._cr_uid + args
510+
if self._multiprocessing:
511+
_init_mp_iter_browse_cb(self._model.env.cr.dbname, self._model._name, attr, args, kwargs, "browse")
512+
with ProcessPoolExecutor(max_workers=get_max_workers()) as executor:
513+
for chunk in it:
514+
collections.deque(
515+
executor.map(_mp_iter_browse_cb, chunks(chunk._ids, self._task_size, fmt=tuple)),
516+
maxlen=0,
517+
)
518+
next(self._end(), None)
519+
# do not return results in // mode, we expect it to be used for huge numbers of
520+
# records and thus would risk MemoryError
521+
return None
455522
return [getattr(chnk, attr)(*args, **kwargs) for chnk in chain(it, self._end())]
456523

457524
self._it = None

0 commit comments

Comments
 (0)