-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Rewrite
Builder
without dask dependencies (#19)
- Loading branch information
1 parent
36ffcf2
commit c8287c2
Showing
7 changed files
with
226 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
import fnmatch | ||
import itertools | ||
import pathlib | ||
import typing | ||
|
||
import joblib | ||
import pandas as pd | ||
import pydantic | ||
|
||
# Sentinel column names used to flag assets that failed parsing; rows carrying
# these columns are split out of the catalog by Builder.clean_dataframe().
INVALID_ASSET = 'INVALID_ASSET'
TRACEBACK = 'TRACEBACK'
|
||
|
||
@pydantic.dataclasses.dataclass
class Builder:
    """
    Generates a catalog from a list of files.

    Parameters
    ----------
    root_path : str
        Path of root directory.
    extension : str, optional
        File extension, by default '*.nc'.
    depth : int, optional
        Recursion depth. Recursively crawl `root_path` up to a specified depth, by default 0
    exclude_patterns : list, optional
        Directory, file patterns to exclude during catalog generation, by default None
    parsing_func : callable, optional
        Function applied to each file to produce a catalog entry (dict), by default None
    njobs : int, optional
        The maximum number of concurrently running jobs, by default -1 (all CPUs)
    """

    root_path: pydantic.types.DirectoryPath
    extension: str = '*.nc'
    depth: int = 0
    # Optional[...] makes the None defaults explicit instead of relying on
    # pydantic's implicit "default None implies optional" behavior.
    exclude_patterns: typing.Optional[typing.List[str]] = None
    parsing_func: typing.Optional[typing.Callable] = None
    njobs: int = -1

    def __post_init__(self):
        # Mutable state populated by the fluent pipeline:
        # get_directories() -> get_filelist() -> parse() -> clean_dataframe()
        self.df = pd.DataFrame()
        self.invalid_assets = pd.DataFrame()
        self.dirs = None
        self.filelist = None
        self.entries = None

    def get_directories(self):
        """Collect directories `depth` levels below `root_path`; fall back to the root itself."""
        pattern = '*/' * (self.depth + 1)
        dirs = [x for x in self.root_path.glob(pattern) if x.is_dir()]
        if not dirs:
            dirs = [self.root_path]
        self.dirs = dirs
        return self

    def get_filelist(self):
        """Get a list of files from a list of directories."""

        def _filter_files(filelist):
            # Keep only paths that match none of the exclude patterns.
            return not any(
                fnmatch.fnmatch(filelist, pat=exclude_pattern)
                for exclude_pattern in self.exclude_patterns
            )

        def _glob_dir(directory, extension):
            return list(directory.rglob(f'{extension}'))

        filelist = joblib.Parallel(n_jobs=self.njobs, verbose=5)(
            joblib.delayed(_glob_dir)(directory, self.extension) for directory in self.dirs
        )
        filelist = itertools.chain(*filelist)
        if self.exclude_patterns:
            filelist = list(filter(_filter_files, filelist))
        self.filelist = list(filelist)
        return self

    def parse(self, parsing_func: typing.Callable = None):
        """Apply `parsing_func` (or the instance default) to every file, in parallel.

        Raises
        ------
        ValueError
            If no parsing function was supplied here or at construction time.
        """
        func = parsing_func or self.parsing_func
        if func is None:
            # Fixed message typo: "must a valid" -> "must be a valid".
            raise ValueError(f'`parsing_func` must be a valid Callable. Got {type(func)}')
        entries = joblib.Parallel(n_jobs=self.njobs, verbose=5)(
            joblib.delayed(func)(file) for file in self.filelist
        )
        self.entries = entries
        self.df = pd.DataFrame(entries)
        return self

    def clean_dataframe(self):
        """Split rows flagged as invalid out of `self.df` into `self.invalid_assets`."""
        if INVALID_ASSET in self.df.columns:
            invalid_assets = self.df[self.df[INVALID_ASSET].notnull()][[INVALID_ASSET, TRACEBACK]]
            df = self.df[self.df[INVALID_ASSET].isnull()].drop(columns=[INVALID_ASSET, TRACEBACK])
            self.invalid_assets = invalid_assets
            self.df = df
        return self

    def save(
        self,
        catalog_file: typing.Union[pathlib.Path, str],
        **kwargs,
    ):
        """Write the catalog to `catalog_file` as CSV.

        Invalid assets (if any) are written to a sibling
        `invalid_assets_<name>` report file. Extra `kwargs` are forwarded to
        `DataFrame.to_csv`.
        """
        catalog_file = pathlib.Path(catalog_file)
        # BUG FIX: kwargs.pop('index') raised KeyError whenever 'index' was not
        # supplied; give it a default so `index` is genuinely optional.
        index = kwargs.pop('index', False)
        self.df.to_csv(catalog_file, index=index, **kwargs)
        if not self.invalid_assets.empty:
            invalid_assets_report_file = (
                catalog_file.parent / f'invalid_assets_{catalog_file.parts[-1]}'
            )
            self.invalid_assets.to_csv(invalid_assets_report_file, index=False)
        print(f'Saved catalog location: {catalog_file}')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,8 @@ | ||
cf_xarray | ||
dask[bag] | ||
dask[delayed] | ||
ncar-jobqueue | ||
joblib | ||
netCDF4 | ||
rich | ||
typer | ||
xarray | ||
pyyaml | ||
pydantic | ||
pandas |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
import os | ||
import pathlib | ||
import traceback | ||
|
||
import pandas as pd | ||
import pydantic | ||
import pytest | ||
|
||
from ecgtools import INVALID_ASSET, TRACEBACK, Builder | ||
|
||
# Fixture root: the `sample_data` directory that is a sibling of this test module's parent.
sample_data_dir = pathlib.Path(os.path.dirname(__file__)).parent / 'sample_data'
|
||
|
||
def parsing_func(file):
    """Minimal parser used by the tests: record only the file path."""
    return dict(path=file)
|
||
|
||
def parsing_func_errors(file):
    """Parser that always fails, returning an invalid-asset record with the traceback."""
    try:
        file.is_valid()
    except Exception:  # was a bare `except:` — don't swallow SystemExit/KeyboardInterrupt
        return {INVALID_ASSET: file.as_posix(), TRACEBACK: traceback.format_exc()}
|
||
|
||
def test_root_path_error():
    """A nonexistent root path must be rejected by pydantic validation."""
    pytest.raises(pydantic.ValidationError, Builder, 'test_directory')
|
||
|
||
@pytest.mark.parametrize(
    'root_path',
    [sample_data_dir / part for part in ('cmip/CMIP6', 'cmip/cmip5', 'cesm')],
)
def test_init(root_path):
    """Builder construction succeeds for each sample data tree."""
    Builder(root_path)
|
||
|
||
@pytest.mark.parametrize(
    'root_path',
    [sample_data_dir / part for part in ('cmip/CMIP6', 'cmip/cmip5', 'cesm')],
)
def test_get_filelist(root_path):
    """Directory discovery then globbing populates `dirs` and `filelist` with Paths."""
    builder = Builder(
        root_path,
        exclude_patterns=['*/files/*', '*/latest/*'],
    ).get_directories()
    assert builder.dirs
    assert isinstance(builder.dirs[0], pathlib.Path)

    builder = builder.get_filelist()
    assert builder.filelist
    assert isinstance(builder.filelist[0], pathlib.Path)
|
||
|
||
def test_parse_error():
    """Calling parse() without any parsing function raises ValueError."""
    builder = Builder(sample_data_dir / 'cesm').get_directories().get_filelist()
    with pytest.raises(ValueError):
        builder.parse()
|
||
|
||
@pytest.mark.parametrize(
    'root_path',
    [sample_data_dir / part for part in ('cmip/CMIP6', 'cmip/cmip5', 'cesm')],
)
def test_parse(root_path):
    """The full pipeline yields per-file dict entries and a non-empty dataframe."""
    builder = Builder(
        root_path,
        exclude_patterns=['*/files/*', '*/latest/*'],
        parsing_func=parsing_func,
    )
    builder = builder.get_directories().get_filelist().parse()
    assert builder.entries
    assert isinstance(builder.entries[0], dict)
    assert isinstance(builder.df, pd.DataFrame)
    assert not builder.df.empty
|
||
|
||
def test_parse_invalid_assets():
    """Failed parses are separated into `invalid_assets` with asset + traceback columns."""
    builder = Builder(sample_data_dir / 'cesm')
    builder = (
        builder.get_directories()
        .get_filelist()
        .parse(parsing_func=parsing_func_errors)
        .clean_dataframe()
    )

    assert not builder.invalid_assets.empty
    assert set(builder.invalid_assets.columns) == {INVALID_ASSET, TRACEBACK}