Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

my.emfit: cleanup and pass cpu pool #333

Merged
merged 1 commit into from
Oct 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 45 additions & 25 deletions my/emfit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,30 @@
'git+https://github.com/karlicoss/emfitexport',
]

from contextlib import contextmanager
import dataclasses
from datetime import datetime, time, timedelta
import inspect
from pathlib import Path
from typing import Dict, List, Iterable, Any, Optional

from ..core import get_files
from ..core.common import mcachew
from ..core.cachew import cache_dir
from ..core.error import Res, set_error_datetime, extract_error_datetime
from ..core.pandas import DataFrameT
from typing import Any, Dict, Iterable, Iterator, List, Optional

from my.core import (
get_files,
stat,
Res,
Stats,
)
from my.core.common import mcachew
from my.core.cachew import cache_dir
from my.core.error import set_error_datetime, extract_error_datetime
from my.core.pandas import DataFrameT

from my.config import emfit as config


import emfitexport.dal as dal
# todo ugh. need to make up my mind on log vs logger naming... I guess logger makes more sense
logger = dal.log
Emfit = dal.Emfit


Emfit = dal.Emfit


# TODO move to common?
Expand All @@ -39,13 +47,22 @@ def _cachew_depends_on():
# TODO take __file__ into account somehow?
@mcachew(cache_path=cache_dir() / 'emfit.cache', depends_on=_cachew_depends_on)
def datas() -> Iterable[Res[Emfit]]:
import dataclasses

# data from emfit is coming in UTC. There is no way (I think?) to know the 'real' timezone, and local times matter more for sleep analysis
# TODO actually this is wrong?? check this..
# TODO actually this is wrong?? there is some sort of local offset in the export
emfit_tz = config.timezone

for x in dal.sleeps(config.export_path):
## backwards compatibility (old DAL didn't have cpu_pool argument)
cpu_pool_arg = 'cpu_pool'
pass_cpu_pool = cpu_pool_arg in inspect.signature(dal.sleeps).parameters
if pass_cpu_pool:
from my.core._cpu_pool import get_cpu_pool

kwargs = {cpu_pool_arg: get_cpu_pool()}
else:
kwargs = {}
##

for x in dal.sleeps(config.export_path, **kwargs):
if isinstance(x, Exception):
yield x
else:
Expand All @@ -54,13 +71,15 @@ def datas() -> Iterable[Res[Emfit]]:
continue
# TODO maybe have a helper to 'patch up' all datetimes in a namedtuple/dataclass?
# TODO do the same for jawbone data?
# fmt: off
x = dataclasses.replace(
x,
start =x.start .astimezone(emfit_tz),
end =x.end .astimezone(emfit_tz),
sleep_start=x.sleep_start.astimezone(emfit_tz),
sleep_end =x.sleep_end .astimezone(emfit_tz),
)
# fmt: on
yield x


Expand All @@ -78,7 +97,7 @@ def flush() -> Iterable[Res[Emfit]]:
yield r
else:
err = RuntimeError(f'Multiple sleeps per night, not supported yet: {g}')
set_error_datetime(err, dt=g[0].date)
set_error_datetime(err, dt=datetime.combine(g[0].date, time.min))
g.clear()
yield err

Expand All @@ -94,15 +113,14 @@ def flush() -> Iterable[Res[Emfit]]:


def dataframe() -> DataFrameT:
from datetime import timedelta
dicts: List[Dict[str, Any]] = []
last: Optional[Emfit] = None
for s in pre_dataframe():
d: Dict[str, Any]
if isinstance(s, Exception):
edt = extract_error_datetime(s)
d = {
'date' : edt,
'date': edt,
'error': str(s),
}
else:
Expand All @@ -117,6 +135,7 @@ def dataframe() -> DataFrameT:

# todo ugh. get rid of hardcoding, just generate the schema automatically
# TODO use 'workdays' provider....
# fmt: off
d = {
'date' : dd,

Expand All @@ -133,25 +152,24 @@ def dataframe() -> DataFrameT:
'hrv_change' : hrv_change,
'respiratory_rate_avg': s.respiratory_rate_avg,
}
last = s # meh
# fmt: on
last = s # meh
dicts.append(d)


import pandas

return pandas.DataFrame(dicts)


from ..core import stat, Stats
def stats() -> Stats:
    """Return statistics for this module.

    Delegates to the ``stat`` helper imported from ``my.core``, handing it the
    ``pre_dataframe`` function itself (not its result), so ``stat`` decides
    how and when to call it.
    """
    return stat(pre_dataframe)


from contextlib import contextmanager
from typing import Iterator
@contextmanager
def fake_data(nights: int=500) -> Iterator:
def fake_data(nights: int = 500) -> Iterator:
from my.core.cfg import tmp_config
from tempfile import TemporaryDirectory

with TemporaryDirectory() as td:
tdir = Path(td)
gen = dal.FakeData()
Expand All @@ -168,5 +186,7 @@ class emfit:
# TODO remove/deprecate it? I think used by timeline
def get_datas() -> List[Emfit]:
    """Return everything produced by ``datas()`` as a list ordered by ``start``.

    Returns:
        A new list holding the items yielded by ``datas()``, sorted ascending
        by their ``start`` attribute.

    NOTE(review): ``datas()`` can also yield ``Exception`` objects; those
    presumably have no ``start`` attribute, so sorting would fail if any are
    present -- confirm against callers (hence the ``type: ignore`` below).
    """
    # todo ugh. run lint properly
    # sorted() already returns a fresh list, so no extra list() wrapper is needed.
    return sorted(datas(), key=lambda e: e.start)  # type: ignore


# TODO move away old entries if there is a diff??