Skip to content

Commit a40dde7

Browse files
committed
[IMP] orm: add optional parallelism to iter_browse.create()
Like the same support added to `__attr__` in the parent commit, this can only be used by callers when it is known that database modifications will be distinct, not causing concurrency issues or side-effects on the results. `create` returns an `iter_browse` object for the caller to browse created records. With the multiprocessing strategy, we make the following changes to it: - To support vast amounts of created records in multiprocessing strategy, we process values in a generator and initialize the returned `iter_browse` object with it. As this requires the caller of `create` to always consume/iterate the result (otherwise records will not be created), it is not applied to the other strategies as it would break existing API. - make __iter__ yield chunks if strategy is multiprocessing. This way, a caller can process chunks of freshly created records `for records in util.iter_browse(strategy="multiprocessing").create(SQLStr)` and since everything from input to output is a generator, will be perfectly memory efficient. - do not pass the logger to the returned `iter_browse` object from `create`, if the strategy is multiprocessing, because it will lead to interleaved logging from the input generator and this one when the caller iterates it.
1 parent 4a56a32 commit a40dde7

File tree

1 file changed

+60
-18
lines changed

1 file changed

+60
-18
lines changed

src/util/orm.py

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,12 @@ def _mp_iter_browse_cb(ids_or_values, params):
367367
getattr(
368368
me.env[params["model_name"]].with_context(params["context"]).browse(ids_or_values), params["attr_name"]
369369
)(*params["args"], **params["kwargs"])
370+
if params["mode"] == "create":
371+
new_ids = me.env[params["model_name"]].with_context(params["context"]).create(ids_or_values).ids
370372
me.env.cr.commit()
373+
if params["mode"] == "create":
374+
return new_ids
375+
return None
371376

372377

373378
class iter_browse(object):
@@ -522,8 +527,12 @@ def __iter__(self):
522527
raise RuntimeError("%r ran twice" % (self,))
523528

524529
it = chain.from_iterable(self._it)
530+
sz = self._size
531+
if self._strategy == "multiprocessing":
532+
it = self._it
533+
sz = (self._size + self._chunk_size - 1) // self._chunk_size
525534
if self._logger:
526-
it = log_progress(it, self._logger, qualifier=self._model._name, size=self._size)
535+
it = log_progress(it, self._logger, qualifier=self._model._name, size=sz)
527536
self._it = None
528537
return chain(it, self._end())
529538

@@ -593,31 +602,64 @@ def create(self, values, **kw):
593602
if self._size:
594603
raise ValueError("`create` can only called on empty `browse_record` objects.")
595604

596-
ids = []
605+
if self._strategy == "multiprocessing" and not multi:
606+
raise ValueError("The multiprocessing strategy only supports the multi version of `create`")
607+
597608
size = len(values)
598-
it = chunks(values, self._chunk_size, fmt=list)
609+
chunk_size = self._superchunk_size if self._strategy == "multiprocessing" else self._chunk_size
610+
it = chunks(values, chunk_size, fmt=list)
599611
if self._logger:
600-
sz = (size + self._chunk_size - 1) // self._chunk_size
601-
qualifier = "env[%r].create([:%d])" % (self._model._name, self._chunk_size)
612+
sz = (size + chunk_size - 1) // chunk_size
613+
qualifier = "env[%r].create([:%d])" % (self._model._name, chunk_size)
602614
it = log_progress(it, self._logger, qualifier=qualifier, size=sz)
603615

604-
self._patch = no_selection_cache_validation()
605-
for sub_values in it:
616+
def mp_create():
617+
params = {
618+
"dbname": self._model.env.cr.dbname,
619+
"model_name": self._model._name,
620+
# convert to dict for pickle. Will still break if any value in the context is not pickleable
621+
"context": dict(self._model.env.context),
622+
"mode": "create",
623+
}
624+
self._model.env.cr.commit()
606625
self._patch.start()
626+
extrakwargs = {"mp_context": multiprocessing.get_context("fork")} if sys.version_info >= (3, 7) else {}
627+
with ProcessPoolExecutor(max_workers=get_max_workers(), **extrakwargs) as executor:
628+
for sub_values in it:
629+
for task_result in executor.map(
630+
_mp_iter_browse_cb, chunks(sub_values, self._chunk_size, fmt=tuple), repeat(params)
631+
):
632+
self._model.env.cr.commit() # make task_result visible on main cursor before yielding ids
633+
for new_id in task_result:
634+
yield new_id
635+
next(self._end(), None)
607636

608-
if multi:
609-
ids += self._model.create(sub_values).ids
610-
elif not self._cr_uid:
611-
ids += [self._model.create(sub_value).id for sub_value in sub_values]
612-
else:
613-
# old API, `create` directly return the id
614-
ids += [self._model.create(*(self._cr_uid + (sub_value,))) for sub_value in sub_values]
637+
self._patch = no_selection_cache_validation()
638+
if self._strategy == "multiprocessing":
639+
ids = mp_create()
640+
else:
641+
ids = []
642+
for sub_values in it:
643+
self._patch.start()
644+
645+
if multi:
646+
ids += self._model.create(sub_values).ids
647+
elif not self._cr_uid:
648+
ids += [self._model.create(sub_value).id for sub_value in sub_values]
649+
else:
650+
# old API, `create` directly return the id
651+
ids += [self._model.create(*(self._cr_uid + (sub_value,))) for sub_value in sub_values]
652+
653+
next(self._end(), None)
615654

616-
next(self._end(), None)
617655
args = self._cr_uid + (ids,)
618-
return iter_browse(
619-
self._model, *args, chunk_size=self._chunk_size, logger=self._logger, strategy=self._strategy
620-
)
656+
kwargs = {
657+
"size": size,
658+
"chunk_size": self._chunk_size,
659+
"logger": None if self._strategy == "multiprocessing" else self._logger,
660+
"strategy": self._strategy,
661+
}
662+
return iter_browse(self._model, *args, **kwargs)
621663

622664

623665
@contextmanager

0 commit comments

Comments
 (0)