Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dal: experimental support for using external CPU pool #22

Merged
merged 1 commit into from
Oct 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions src/rexport/dal.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#!/usr/bin/env python3
from concurrent.futures import Executor
import contextlib
from dataclasses import dataclass
from datetime import datetime, timezone
import json
from pathlib import Path
from typing import Iterator, Sequence, Set
from typing import Iterator, Optional, Sequence, Set

from .exporthelpers import dal_helper, logging_helper
from .exporthelpers.dal_helper import PathIsh, Json, json_items, datetime_aware, pathify
from .exporthelpers.dal_helper import PathIsh, Json, datetime_aware, pathify
from .utils import json_items_as_list, DummyFuture


logger = logging_helper.make_logger(__name__)
Expand Down Expand Up @@ -201,8 +203,9 @@ def make_dt(ts: float) -> datetime_aware:


class DAL:
def __init__(self, sources: Sequence[PathIsh]) -> None:
def __init__(self, sources: Sequence[PathIsh], *, cpu_pool: Optional[Executor] = None) -> None:
self.sources = list(map(pathify, sources))
self.cpu_pool = cpu_pool
self.enlighten = logging_helper.get_enlighten()

# not sure how useful it is, but keeping for compatibility
Expand All @@ -212,14 +215,29 @@ def raw(self):
yield f, json.load(fo)

def _raw_json(self, *, what: str) -> Iterator[Json]:
pbar = self.enlighten.counter(total=len(self.sources), desc=f'{__name__}[{what}]', unit='files')
for f in self.sources:
progress_bar = self.enlighten.counter(total=len(self.sources), desc=f'{__name__}[{what}]', unit='files')

cpu_pool = self.cpu_pool

futures = []
for path in self.sources:
# TODO maybe if enlighten is used, this should be debug instead? so logging isn't too verbose
logger.info(f'processing {f}')
logger.info(f'processing {path}')

if cpu_pool is not None:
future = cpu_pool.submit(json_items_as_list, path, what)
else:
future = DummyFuture(json_items_as_list, path, what)

futures.append(future)

for f in futures:
res = f.result()
progress_bar.update()

# default sort order seems to return in the reverse order of 'save time', which makes sense to preserve
# TODO reversing should probably be responsibility of HPI?
yield from reversed(list(json_items(f, what)))
pbar.update()
yield from reversed(res)

def _accumulate(self, *, what: str, key: str = 'id') -> Iterator[Json]:
emitted: Set[str] = set()
Expand Down
30 changes: 30 additions & 0 deletions src/rexport/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from concurrent.futures import Future
from typing import Any, List, TYPE_CHECKING

from .exporthelpers.dal_helper import json_items, Json


# TODO move to dal_helper
def json_items_as_list(*args, **kwargs) -> List[Json]:
    """Eagerly materialize the stream produced by ``json_items`` into a list.

    ``json_items`` yields lazily; a concrete list is needed here so the whole
    result can be returned from an executor worker (see DAL._raw_json, which
    submits this function to ``cpu_pool``).
    Arguments are passed through to ``json_items`` unchanged.
    """
    return [*json_items(*args, **kwargs)]


# TODO move to dal helper?
if TYPE_CHECKING:
    # For type checkers only: pretend DummyFuture is a real Future[Any] so
    # call sites can be typed uniformly whether or not a cpu_pool is used.
    DummyFutureBase = Future[Any]
else:
    # At runtime we deliberately do NOT inherit from Future: only result()
    # is implemented, and inheriting could make other Future methods appear
    # available while actually misbehaving. Keep it simple for now.
    DummyFutureBase = object


class DummyFuture(DummyFutureBase):
    """Synchronous stand-in for :class:`concurrent.futures.Future`.

    Defers calling ``fn(*args, **kwargs)`` until :meth:`result` is invoked,
    then runs it in the calling thread. This lets code treat the
    "no executor" case the same as the real-pool case.

    The outcome is computed at most once: repeated ``result()`` calls return
    the cached value, matching ``Future`` semantics. (Exceptions raised by
    ``fn`` are not cached — a failing ``fn`` would re-run on the next call,
    but callers here invoke ``result()`` only once.)
    """

    # sentinel distinguishing "not computed yet" from a legitimate None result
    _NOT_COMPUTED = object()

    def __init__(self, fn, *args, **kwargs):
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self._result = DummyFuture._NOT_COMPUTED

    def result(self):
        # Lazily compute on first call and cache, so result() is idempotent
        # like concurrent.futures.Future.result.
        if self._result is DummyFuture._NOT_COMPUTED:
            self._result = self.fn(*self.args, **self.kwargs)
        return self._result