Skip to content

Commit c484df4

Browse files
committed
[IMP] orm: add optional // attr call to iter_browse
In some cases, e.g. if it is known that calling a certain method on the model will only trigger inserts or it is clear that updates will be disjunct, such method calls can be done in parallel.
1 parent 909274f commit c484df4

File tree

1 file changed

+62
-9
lines changed

1 file changed

+62
-9
lines changed

src/util/orm.py

Lines changed: 62 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@
99
on this module work along the ORM of *all* supported versions.
1010
"""
1111

12+
import collections
1213
import logging
14+
import os
1315
import re
16+
from concurrent.futures import ProcessPoolExecutor
1417
from contextlib import contextmanager
1518
from functools import wraps
1619
from itertools import chain
@@ -27,9 +30,9 @@
2730
except ImportError:
2831
from odoo import SUPERUSER_ID
2932
from odoo import fields as ofields
30-
from odoo import modules, release
33+
from odoo import modules, release, sql_db
3134
except ImportError:
32-
from openerp import SUPERUSER_ID, modules, release
35+
from openerp import SUPERUSER_ID, modules, release, sql_db
3336

3437
try:
3538
from openerp import fields as ofields
@@ -41,8 +44,8 @@
4144
from .const import BIG_TABLE_THRESHOLD
4245
from .exceptions import MigrationError
4346
from .helpers import table_of_model
44-
from .misc import chunks, log_progress, version_between, version_gte
45-
from .pg import column_exists, format_query, get_columns, named_cursor
47+
from .misc import chunks, log_progress, str2bool, version_between, version_gte
48+
from .pg import column_exists, format_query, get_columns, get_max_workers, named_cursor
4649

4750
# python3 shims
4851
try:
@@ -52,6 +55,8 @@
5255

5356
_logger = logging.getLogger(__name__)
5457

58+
UPG_PARALLEL_ITER_BROWSE = str2bool(os.environ.get("UPG_PARALLEL_ITER_BROWSE", "0"))
59+
5560

5661
def env(cr):
5762
"""
@@ -338,6 +343,31 @@ def get_ids():
338343
invalidate(records)
339344

340345

346+
def _mp_iter_browse_cb(ids_or_values):
347+
# init upon first call. Done here instead of initializer callback, because py3.6 doesn't have it
348+
if not hasattr(_mp_iter_browse_cb, "env"):
349+
sql_db._Pool = None # children cannot borrow from copies of the same pool, it will cause protocol error
350+
_mp_iter_browse_cb.env = env(sql_db.db_connect(_mp_iter_browse_cb.dbname).cursor())
351+
_mp_iter_browse_cb.env.clear()
352+
# process
353+
if _mp_iter_browse_cb.mode == "browse":
354+
getattr(
355+
_mp_iter_browse_cb.env[_mp_iter_browse_cb.model_name].browse(ids_or_values), _mp_iter_browse_cb.attr_name
356+
)(*_mp_iter_browse_cb.args, **_mp_iter_browse_cb.kwargs)
357+
if _mp_iter_browse_cb.mode == "create":
358+
_mp_iter_browse_cb.env[_mp_iter_browse_cb.model_name].create(ids_or_values)
359+
_mp_iter_browse_cb.env.cr.commit()
360+
361+
362+
def _init_mp_iter_browse_cb(dbname, model_name, attr_name, args, kwargs, mode):
363+
_mp_iter_browse_cb.dbname = dbname
364+
_mp_iter_browse_cb.model_name = model_name
365+
_mp_iter_browse_cb.attr_name = attr_name
366+
_mp_iter_browse_cb.args = args
367+
_mp_iter_browse_cb.kwargs = kwargs
368+
_mp_iter_browse_cb.mode = mode
369+
370+
341371
class iter_browse(object):
342372
"""
343373
Iterate over recordsets.
@@ -372,12 +402,11 @@ class iter_browse(object):
372402
:param model: the model to iterate
373403
:type model: :class:`odoo.model.Model`
374404
:param iterable(int) ids: iterable of IDs of the records to iterate
375-
:param int chunk_size: number of records to load in each iteration chunk, `200` by
376-
default
405+
:param int chunk_size: number of records to load in each iteration chunk, `200` by default
377406
:param logger: logger used to report the progress, by default
378407
:data:`~odoo.upgrade.util.orm._logger`
379408
:type logger: :class:`logging.Logger`
380-
:param str strategy: whether to `flush` or `commit` on each chunk, default is `flush`
409+
:param str strategy: whether to `flush`, `commit` or `multiprocess` each chunk, default is `flush`
381410
:return: the object returned by this class can be used to iterate, or call any model
382411
method, safely on millions of records.
383412
@@ -400,7 +429,19 @@ def __init__(self, model, *args, **kw):
400429
self._chunk_size = kw.pop("chunk_size", 200) # keyword-only argument
401430
self._logger = kw.pop("logger", _logger)
402431
self._strategy = kw.pop("strategy", "flush")
403-
assert self._strategy in {"flush", "commit"}
432+
assert self._strategy in {"flush", "commit", "multiprocessing"}
433+
if self._strategy == "multiprocessing":
434+
if UPG_PARALLEL_ITER_BROWSE:
435+
self._task_size = self._chunk_size
436+
self._chunk_size = max(get_max_workers() * 10 * self._task_size, 1000000)
437+
elif self._size > 100000 and self._logger:
438+
self._logger.warning(
439+
"Browsing %d %s, which may take a long time. "
440+
"This can be sped up by setting the env variable UPG_PARALLEL_ITER_BROWSE to 1. "
441+
"If you do, be sure to examine the results carefully.",
442+
self._size,
443+
self._model._name,
444+
)
404445
if kw:
405446
raise TypeError("Unknown arguments: %s" % ", ".join(kw))
406447

@@ -417,7 +458,7 @@ def _browse(self, ids):
417458
return self._model.browse(*args)
418459

419460
def _end(self):
420-
if self._strategy == "commit":
461+
if self._strategy in ["commit", "multiprocessing"]:
421462
self._model.env.cr.commit()
422463
else:
423464
flush(self._model)
@@ -452,6 +493,18 @@ def __getattr__(self, attr):
452493

453494
def caller(*args, **kwargs):
454495
args = self._cr_uid + args
496+
if self._strategy == "multiprocessing":
497+
_init_mp_iter_browse_cb(self._model.env.cr.dbname, self._model._name, attr, args, kwargs, "browse")
498+
with ProcessPoolExecutor(max_workers=get_max_workers()) as executor:
499+
for chunk in it:
500+
collections.deque(
501+
executor.map(_mp_iter_browse_cb, chunks(chunk._ids, self._task_size, fmt=tuple)),
502+
maxlen=0,
503+
)
504+
next(self._end(), None)
505+
# do not return results in // mode, we expect it to be used for huge numbers of
506+
# records and thus would risk MemoryError
507+
return None
455508
return [getattr(chnk, attr)(*args, **kwargs) for chnk in chain(it, self._end())]
456509

457510
self._it = None

0 commit comments

Comments
 (0)