diff --git a/pysus/data/local.py b/pysus/data/local.py
index fdd0462..5ea7476 100644
--- a/pysus/data/local.py
+++ b/pysus/data/local.py
@@ -81,6 +81,8 @@ def parse_data_content(
 
         if data_path.suffix.lower() in [".dbc", ".dbf", ".parquet"]:
             content.append(ParquetSet(str(data_path), _pbar=_pbar))
+        elif data_path.suffix.lower() == ".zip":
+            content.append(str(data_path))
         else:
             continue
 
diff --git a/pysus/ftp/databases/ibge_datasus.py b/pysus/ftp/databases/ibge_datasus.py
index b6a777d..5e131f1 100644
--- a/pysus/ftp/databases/ibge_datasus.py
+++ b/pysus/ftp/databases/ibge_datasus.py
@@ -1,4 +1,5 @@
-from typing import Optional, List, Union
+from typing import Optional, List, Union, Literal
+from loguru import logger
 
 from pysus.ftp import Database, Directory, File
 from pysus.ftp.utils import zfill_year, to_list
@@ -46,24 +47,73 @@ def describe(self, file: File) -> dict:
             return description
         return {}
 
-    def format(self, file: File) -> str:
-        return file.name[-2:]
+    def format(self, file: File) -> tuple:
+        # Parenthesised so the one-element tuple is not read as a str.
+        return (file.name[-2:],)
 
     def get_files(
-            self,
-            year: Optional[Union[str, int, list]] = None,
+        self,
+        source: Literal["POP", "censo", "POPTCU", "projpop"] = "POPTCU",
+        year: Optional[Union[str, int, list]] = None,
+        *args, **kwargs
     ) -> List[File]:
-        files = [f for f in self.files if f.extension.upper(
-        ) in [".ZIP", ".DBF"] and self.describe(f)["year"] == year]
-        # files = list(filter(
-        #     lambda f: f.extension.upper() in [".ZIP"], self.files
-        # ))
+        """
+        Return the files of one IBGE source directory.
+
+        Parameters
+        ----------
+        source : str
+            Directory to list; one of "POP", "censo", "POPTCU", "projpop".
+        year : str, int or list, optional
+            Year(s) to keep, each normalised with ``zfill_year``. Only
+            applied to "POP", "censo" and "POPTCU"; for other sources a
+            warning is logged and every file is returned. ``None``, an
+            empty string and ``[]`` disable the filter; ``0`` is a year.
+        *args, **kwargs
+            Accepted but ignored.
+
+        Returns
+        -------
+        List[File]
+            The content of the selected directory, filtered by year
+            when applicable.
+
+        Raises
+        ------
+        ValueError
+            If ``source`` does not match any directory in ``self.paths``.
+        """
+        source_dir = None
 
-        if year or str(year) in ["0", "00"]:
-            years = (
-                [zfill_year(str(y)[-4:]) for y in to_list(year)]
-            )
-            files = list(filter(lambda f: zfill_year(
-                self.format(f)) in years, files))
+        if source in ["POP", "censo", "POPTCU", "projpop"]:
+            for directory in self.paths:
+                # Match whole path components: a substring test
+                # would also match "POP" against ".../POPTCU".
+                # Assumes Directory.path is "/"-separated -- TODO confirm.
+                if source in str(directory.path).strip("/").split("/"):
+                    source_dir = directory
+                    break
+
+        if not source_dir:
+            raise ValueError(f"Unknown source {source}")
+
+        files = source_dir.content
+        # 0 is falsy but is still accepted as a year.
+        has_year = bool(year) or year == 0
+
+        if has_year and source not in ["POPTCU", "censo", "POP"]:
+            logger.warning(
+                f"{source} files are not arranged in years, "
+                "returning all files for source"
+            )
+        elif has_year:
+            years = year if isinstance(year, (list, tuple, set)) else [year]
+            wanted = {str(zfill_year(y)) for y in years}
+            # describe() may return {} (no "year" key); .get()
+            # skips such files instead of raising KeyError.
+            files = [
+                f for f in files
+                if str(self.describe(f).get("year")) in wanted
+            ]
 
         return files