Commit d16c3f9

[IMP] orm: add optional // attr call to iter_browse
In some cases, e.g. when it is known that calling a certain method on the model will only trigger inserts, or when the updates are known to be disjoint, such method calls can be executed in parallel.
Parent: 5d515f7
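
A rough usage sketch, not part of this commit, of how a migration script might opt into the new strategy. The model, the id query and the _recompute_weights method are placeholders, and the UPG_PARALLEL_ITER_BROWSE environment variable must be set for the parallel path to actually be taken; otherwise the strategy silently downgrades to "commit" (see the __init__ changes below):

    # hypothetical migration script; model and method names are illustrative only
    from odoo.upgrade import util

    def migrate(cr, version):
        cr.execute("SELECT id FROM product_product")
        ids = [id_ for (id_,) in cr.fetchall()]
        records = util.iter_browse(
            util.env(cr)["product.product"], ids, chunk_size=200, strategy="multiprocessing"
        )
        # the call is dispatched to worker processes; no results are returned in parallel mode
        records._recompute_weights()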

src/util/orm.py

Lines changed: 89 additions & 8 deletions
@@ -9,12 +9,17 @@
 on this module work along the ORM of *all* supported versions.
 """
 
+import collections
 import logging
+import multiprocessing
+import os
 import re
+import sys
 import uuid
+from concurrent.futures import ProcessPoolExecutor
 from contextlib import contextmanager
 from functools import wraps
-from itertools import chain
+from itertools import chain, repeat
 from textwrap import dedent
 
 try:
@@ -28,9 +33,9 @@
 except ImportError:
     from odoo import SUPERUSER_ID
     from odoo import fields as ofields
-    from odoo import modules, release
+    from odoo import modules, release, sql_db
 except ImportError:
-    from openerp import SUPERUSER_ID, modules, release
+    from openerp import SUPERUSER_ID, modules, release, sql_db
 
     try:
         from openerp import fields as ofields
@@ -42,8 +47,8 @@
 from .const import BIG_TABLE_THRESHOLD
 from .exceptions import MigrationError
 from .helpers import table_of_model
-from .misc import chunks, log_progress, version_between, version_gte
-from .pg import SQLStr, column_exists, format_query, get_columns, named_cursor
+from .misc import chunks, log_progress, str2bool, version_between, version_gte
+from .pg import SQLStr, column_exists, format_query, get_columns, get_max_workers, named_cursor
 
 # python3 shims
 try:
@@ -53,6 +58,10 @@
 
 _logger = logging.getLogger(__name__)
 
+UPG_PARALLEL_ITER_BROWSE = str2bool(os.environ.get("UPG_PARALLEL_ITER_BROWSE", "0"))
+# FIXME: for CI! Remove before merge
+UPG_PARALLEL_ITER_BROWSE = True
+
 
 def env(cr):
     """
@@ -342,6 +351,23 @@ def get_ids():
     cr.execute("DROP TABLE IF EXISTS _upgrade_rf")
 
 
+def _mp_iter_browse_cb(ids_or_values, params):
+    me = _mp_iter_browse_cb
+    # init upon first call. Done here instead of initializer callback, because py3.6 doesn't have it
+    if not hasattr(me, "env"):
+        sql_db._Pool = None  # children cannot borrow from copies of the same pool, it will cause protocol error
+        me.env = env(sql_db.db_connect(params["dbname"]).cursor())
+        me.env.clear()
+    # process
+    if params["mode"] == "browse":
+        getattr(
+            me.env[params["model_name"]].with_context(params["context"]).browse(ids_or_values), params["attr_name"]
+        )(*params["args"], **params["kwargs"])
+    if params["mode"] == "create":
+        me.env[params["model_name"]].with_context(params["context"]).create(ids_or_values)
+    me.env.cr.commit()
+
+
 class iter_browse(object):
     """
     Iterate over recordsets.
@@ -389,7 +415,18 @@ class iter_browse(object):
     See also :func:`~odoo.upgrade.util.orm.env`
     """
 
-    __slots__ = ("_chunk_size", "_cr_uid", "_ids", "_it", "_logger", "_model", "_patch", "_size", "_strategy")
+    __slots__ = (
+        "_chunk_size",
+        "_cr_uid",
+        "_ids",
+        "_it",
+        "_logger",
+        "_model",
+        "_patch",
+        "_size",
+        "_strategy",
+        "_task_size",
+    )
 
     def __init__(self, model, *args, **kw):
         assert len(args) in [1, 3]  # either (cr, uid, ids) or (ids,)
@@ -400,7 +437,26 @@ def __init__(self, model, *args, **kw):
         self._chunk_size = kw.pop("chunk_size", 200)  # keyword-only argument
         self._logger = kw.pop("logger", _logger)
         self._strategy = kw.pop("strategy", "flush")
-        assert self._strategy in {"flush", "commit"}
+        assert self._strategy in {"flush", "commit", "multiprocessing"}
+        if self._strategy == "multiprocessing":
+            if UPG_PARALLEL_ITER_BROWSE:
+                self._task_size = self._chunk_size
+                self._chunk_size = max(get_max_workers() * 10 * self._task_size, 1000000)
+            else:
+                self._strategy = "commit"  # downgrade
+                if self._size > 100000:
+                    _logger.warning(
+                        "Browsing %d %s, which may take a long time. "
+                        "This can be sped up by setting the env variable UPG_PARALLEL_ITER_BROWSE to 1. "
+                        "If you do, be sure to examine the results carefully.",
+                        self._size,
+                        self._model._name,
+                    )
+                else:
+                    _logger.info(
+                        "Caller requested multiprocessing strategy, but UPG_PARALLEL_ITER_BROWSE env var is not set. "
+                        "Downgrading strategy to commit.",
+                    )
         if kw:
             raise TypeError("Unknown arguments: %s" % ", ".join(kw))
 
@@ -447,7 +503,7 @@ def _browse(self, ids):
         return self._model.browse(*args)
 
     def _end(self):
-        if self._strategy == "commit":
+        if self._strategy in ["commit", "multiprocessing"]:
             self._model.env.cr.commit()
         else:
             flush(self._model)
@@ -482,6 +538,31 @@ def __getattr__(self, attr):
 
         def caller(*args, **kwargs):
             args = self._cr_uid + args
+            if self._strategy == "multiprocessing":
+                params = {
+                    "dbname": self._model.env.cr.dbname,
+                    "model_name": self._model._name,
+                    # convert to dict for pickle. Will still break if any value in the context is not pickleable
+                    "context": dict(self._model.env.context),
+                    "attr_name": attr,
+                    "args": args,
+                    "kwargs": kwargs,
+                    "mode": "browse",
+                }
+                self._model.env.cr.commit()
+                extrakwargs = {"mp_context": multiprocessing.get_context("fork")} if sys.version_info >= (3, 7) else {}
+                with ProcessPoolExecutor(max_workers=get_max_workers(), **extrakwargs) as executor:
+                    for chunk in it:
+                        collections.deque(
+                            executor.map(
+                                _mp_iter_browse_cb, chunks(chunk._ids, self._task_size, fmt=tuple), repeat(params)
+                            ),
+                            maxlen=0,
+                        )
+                next(self._end(), None)
+                # do not return results in // mode, we expect it to be used for huge numbers of
+                # records and thus would risk MemoryError
+                return None
             return [getattr(chnk, attr)(*args, **kwargs) for chnk in chain(it, self._end())]
 
         self._it = None
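
For readers unfamiliar with the consumption pattern used in caller() above, here is a minimal standalone sketch, not part of the commit: _work and the simplified chunks helper are stand-ins for _mp_iter_browse_cb and util.misc.chunks. Draining executor.map through collections.deque(..., maxlen=0) keeps no results in memory while still re-raising any worker exception in the parent process:

    import collections
    from concurrent.futures import ProcessPoolExecutor
    from itertools import repeat

    def _work(ids, params):
        # stand-in for _mp_iter_browse_cb: process one task worth of ids
        return sum(ids) * params["factor"]

    def chunks(iterable, size):
        # simplified stand-in for util.misc.chunks: yield tuples of at most `size` items
        buf = []
        for item in iterable:
            buf.append(item)
            if len(buf) == size:
                yield tuple(buf)
                buf = []
        if buf:
            yield tuple(buf)

    if __name__ == "__main__":
        params = {"factor": 2}
        with ProcessPoolExecutor(max_workers=4) as executor:
            # maxlen=0 consumes the lazy iterator without retaining results;
            # exceptions raised in workers propagate here
            collections.deque(executor.map(_work, chunks(range(1000), 50), repeat(params)), maxlen=0)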
