From cc0661d945ceec1eeae4e64a88887314dc3f7027 Mon Sep 17 00:00:00 2001
From: karlicoss
Date: Tue, 24 Oct 2023 23:18:02 +0100
Subject: [PATCH] dal: experimental support for using external CPU pool

json parsing is the heaviest bit of DAL and this gives a massive speedup

by default it will run serially, so this is backwards compatible
---
 src/rexport/dal.py   | 34 ++++++++++++++++++++++++++--------
 src/rexport/utils.py | 30 ++++++++++++++++++++++++++++++
 2 files changed, 56 insertions(+), 8 deletions(-)
 create mode 100644 src/rexport/utils.py

diff --git a/src/rexport/dal.py b/src/rexport/dal.py
index 8107d3c..6dbc289 100755
--- a/src/rexport/dal.py
+++ b/src/rexport/dal.py
@@ -1,13 +1,15 @@
 #!/usr/bin/env python3
+from concurrent.futures import Executor
 import contextlib
 from dataclasses import dataclass
 from datetime import datetime, timezone
 import json
 from pathlib import Path
-from typing import Iterator, Sequence, Set
+from typing import Iterator, Optional, Sequence, Set
 
 from .exporthelpers import dal_helper, logging_helper
-from .exporthelpers.dal_helper import PathIsh, Json, json_items, datetime_aware, pathify
+from .exporthelpers.dal_helper import PathIsh, Json, datetime_aware, pathify
+from .utils import json_items_as_list, DummyFuture
 
 logger = logging_helper.make_logger(__name__)
 
@@ -201,8 +203,9 @@ def make_dt(ts: float) -> datetime_aware:
 
 
 class DAL:
-    def __init__(self, sources: Sequence[PathIsh]) -> None:
+    def __init__(self, sources: Sequence[PathIsh], *, cpu_pool: Optional[Executor] = None) -> None:
         self.sources = list(map(pathify, sources))
+        self.cpu_pool = cpu_pool
         self.enlighten = logging_helper.get_enlighten()
 
     # not sure how useful it is, but keeping for compatibility
@@ -212,14 +215,29 @@ def raw(self):
         yield f, json.load(fo)
 
     def _raw_json(self, *, what: str) -> Iterator[Json]:
-        pbar = self.enlighten.counter(total=len(self.sources), desc=f'{__name__}[{what}]', unit='files')
-        for f in self.sources:
+        progress_bar = self.enlighten.counter(total=len(self.sources), desc=f'{__name__}[{what}]', unit='files')
+
+        cpu_pool = self.cpu_pool
+
+        futures = []
+        for path in self.sources:
             # TODO maybe if enlighten is used, this should be debug instead? so logging isn't too verbose
-            logger.info(f'processing {f}')
+            logger.info(f'processing {path}')
+
+            if cpu_pool is not None:
+                future = cpu_pool.submit(json_items_as_list, path, what)
+            else:
+                future = DummyFuture(json_items_as_list, path, what)
+
+            futures.append(future)
+
+        for f in futures:
+            res = f.result()
+            progress_bar.update()
+
             # default sort order seems to return in the reverse order of 'save time', which makes sense to preserve
             # TODO reversing should probably be responsibility of HPI?
-            yield from reversed(list(json_items(f, what)))
-            pbar.update()
+            yield from reversed(res)
 
     def _accumulate(self, *, what: str, key: str = 'id') -> Iterator[Json]:
         emitted: Set[str] = set()
diff --git a/src/rexport/utils.py b/src/rexport/utils.py
new file mode 100644
index 0000000..b09859d
--- /dev/null
+++ b/src/rexport/utils.py
@@ -0,0 +1,30 @@
+from concurrent.futures import Future
+from typing import Any, List, TYPE_CHECKING
+
+from .exporthelpers.dal_helper import json_items, Json
+
+
+# TODO move to dal_helper
+def json_items_as_list(*args, **kwargs) -> List[Json]:
+    return list(json_items(*args, **kwargs))
+
+
+# TODO move to dal helper?
+if TYPE_CHECKING:
+    # just to aid mypy -- it doesn't really behave like a proper Future in runtime
+    DummyFutureBase = Future[Any]
+else:
+    # in principle inheriting from Future in runtime also works
+    # but not sure what would happen when we start calling other Future methods
+    # so best to keep it simple for now
+    DummyFutureBase = object
+
+
+class DummyFuture(DummyFutureBase):
+    def __init__(self, fn, *args, **kwargs):
+        self.fn = fn
+        self.args = args
+        self.kwargs = kwargs
+
+    def result(self):
+        return self.fn(*self.args, **self.kwargs)