From d6a7cefa817467266ad70872d8102b8abaa57ce5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?=
Date: Tue, 5 Sep 2023 11:05:38 -0300
Subject: [PATCH] Finish CNES

---
 pysus/ftp/__init__.py                |  32 +++-
 pysus/ftp/databases.py               | 250 ++++++++++++++++-----------
 pysus/tests/test_data/test_sinasc.py |   1 +
 3 files changed, 172 insertions(+), 111 deletions(-)

diff --git a/pysus/ftp/__init__.py b/pysus/ftp/__init__.py
index 938d3c45..4638d5cb 100644
--- a/pysus/ftp/__init__.py
+++ b/pysus/ftp/__init__.py
@@ -3,7 +3,7 @@
 from datetime import datetime
 from ftplib import FTP
 from functools import lru_cache
-from typing import List, Optional, Set, Union
+from typing import Any, List, Optional, Set, Union
 
 from aioftp import Client
 from loguru import logger
@@ -13,6 +13,11 @@
 )
 
 
+def to_list(ite: Any) -> list:
+    """Parse any builtin data type into a list"""
+    return [ite] if type(ite) in [str, float, int] else list(ite)
+
+
 class File:
     """
     FTP File class. This class will contain methods for interacting with
@@ -191,6 +196,7 @@ def list_path(path: str) -> List[Union[Directory, File]]:
     ftp.connect()
     ftp.login()
     ftp.cwd(path)
+
     def line_file_parser(file_line):
         info = {}
         if "<DIR>" in file_line:
@@ -202,7 +208,9 @@ def line_file_parser(file_line):
                 " ".join([date, time]), "%m-%d-%y %I:%M%p"
             )
             info["modify"] = modify
-            xpath = path + name if path.endswith("/") else path + "/" + name
+            xpath = (
+                path + name if path.endswith("/") else path + "/" + name
+            )
             content.append(Directory(xpath, info))
         else:
             date, time, size, name = str(file_line).strip().split()
@@ -260,29 +268,34 @@ def __repr__(self) -> str:
         return f'{self.name} - {self.metadata["long_name"]}'
 
     @property
-    @lru_cache
     def content(self) -> List[Union[Directory, File]]:
         """
-        Lists Database content. The `paths` will be loaded if this property is
+        Lists Database content. The `paths` will be loaded if this property is
         called or if explicty using `load()`. To add specific Directory inside
         content, `load()` the directory and call `content` again.
         """
         if not self.__content__:
-            logger.info("content is not loaded, use `load()` to load default paths")
+            logger.info(
+                "content is not loaded, use `load()` to load default paths"
+            )
             return []
         return sorted(list(self.__content__), key=str)
 
     @property
-    @lru_cache
     def files(self) -> List[File]:
         """
         Lists Files inside content. To load a specific Directory inside
         content, just `load()` this directory and list files again.
""" if not self.__content__: - logger.info("content is not loaded, use `load()` to load default paths") + logger.info( + "content is not loaded, use `load()` to load default paths" + ) return [] - return sorted(list(filter(lambda f: isinstance(f, File), self.__content__)), key=str) + return sorted( + list(filter(lambda f: isinstance(f, File), self.__content__)), + key=str, + ) def load(self, paths: Optional[List[str]] = None) -> None: """ @@ -292,6 +305,9 @@ def load(self, paths: Optional[List[str]] = None) -> None: if not paths: paths = self.paths + if not isinstance(paths, list): + raise ValueError("paths must a list") + content = [] for path in paths: content.extend(list_path(str(path))) diff --git a/pysus/ftp/databases.py b/pysus/ftp/databases.py index e9d33db3..7c1543a6 100644 --- a/pysus/ftp/databases.py +++ b/pysus/ftp/databases.py @@ -1,17 +1,13 @@ import datetime from itertools import product -from typing import Any, List, Optional, Union +from typing import List, Optional, Union import humanize -from pysus.ftp import Database, File +from loguru import logger +from pysus.ftp import Database, Directory, File, list_path, to_list from pysus.utilities.brasil import MONTHS, UFs -def to_list(ite: Any) -> list: - """Parse any builtin data type into a list""" - return [ite] if type(ite) in [str, float, int] else list(ite) - - def zfill_year(year: Union[str, int]) -> int: """ Formats a len(2) year into len(4) with the correct year preffix @@ -30,7 +26,7 @@ def parse_UFs(UF: Union[list[str], str]) -> list: E.g: ['SC', 'mt', 'ba'] -> ['SC', 'MT', 'BA'] """ ufs = [uf.upper() for uf in to_list(UF)] - if not all([uf in list(UFs) for uf in ufs]): + if not all(uf in list(UFs) for uf in ufs): raise ValueError(f"Unknown UF(s): {set(ufs).difference(list(UFs))}") return ufs @@ -64,57 +60,57 @@ class SINAN(Database): ), ) - diseases = dict( - ACBI="Acidente de trabalho com material biológico", - ACGR="Acidente de trabalho", - ANIM="Acidente por Animais Peçonhentos", - ANTR="Atendimento Antirrabico", - BOTU="Botulismo", - CANC="Cancêr relacionado ao trabalho", - CHAG="Doença de Chagas Aguda", - CHIK="Febre de Chikungunya", - COLE="Cólera", - COQU="Coqueluche", - DENG="Dengue", - DERM="Dermatoses ocupacionais", - DIFT="Difteria", - ESQU="Esquistossomose", - EXAN="Doença exantemáticas", - FMAC="Febre Maculosa", - FTIF="Febre Tifóide", - HANS="Hanseníase", - HANT="Hantavirose", - HEPA="Hepatites Virais", - IEXO="Intoxicação Exógena", - INFL="Influenza Pandêmica", - LEIV="Leishmaniose Visceral", - LEPT="Leptospirose", - LERD="LER/Dort", - LTAN="Leishmaniose Tegumentar Americana", - MALA="Malária", - MENI="Meningite", - MENT="Transtornos mentais relacionados ao trabalho", - NTRA="Notificação de Tracoma", - PAIR="Perda auditiva por ruído relacionado ao trabalho", - PEST="Peste", - PFAN="Paralisia Flácida Aguda", - PNEU="Pneumoconioses realacionadas ao trabalho", - RAIV="Raiva", - SDTA="Surto Doenças Transmitidas por Alimentos", - SIFA="Sífilis Adquirida", - SIFC="Sífilis Congênita", - SIFG="Sífilis em Gestante", - SRC="Síndrome da Rubéola Congênia", - TETA="Tétano Acidental", - TETN="Tétano Neonatal", - TOXC="Toxoplasmose Congênita", - TOXG="Toxoplasmose Gestacional", - TRAC="Inquérito de Tracoma", - TUBE="Tuberculose", - VARC="Varicela", - VIOL="Violência doméstica, sexual e/ou outras violências", - ZIKA="Zika Vírus", - ) + diseases = { + "ACBI": "Acidente de trabalho com material biológico", + "ACGR": "Acidente de trabalho", + "ANIM": "Acidente por Animais Peçonhentos", + "ANTR": "Atendimento Antirrabico", 
+ "BOTU": "Botulismo", + "CANC": "Cancêr relacionado ao trabalho", + "CHAG": "Doença de Chagas Aguda", + "CHIK": "Febre de Chikungunya", + "COLE": "Cólera", + "COQU": "Coqueluche", + "DENG": "Dengue", + "DERM": "Dermatoses ocupacionais", + "DIFT": "Difteria", + "ESQU": "Esquistossomose", + "EXAN": "Doença exantemáticas", + "FMAC": "Febre Maculosa", + "FTIF": "Febre Tifóide", + "HANS": "Hanseníase", + "HANT": "Hantavirose", + "HEPA": "Hepatites Virais", + "IEXO": "Intoxicação Exógena", + "INFL": "Influenza Pandêmica", + "LEIV": "Leishmaniose Visceral", + "LEPT": "Leptospirose", + "LERD": "LER/Dort", + "LTAN": "Leishmaniose Tegumentar Americana", + "MALA": "Malária", + "MENI": "Meningite", + "MENT": "Transtornos mentais relacionados ao trabalho", + "NTRA": "Notificação de Tracoma", + "PAIR": "Perda auditiva por ruído relacionado ao trabalho", + "PEST": "Peste", + "PFAN": "Paralisia Flácida Aguda", + "PNEU": "Pneumoconioses realacionadas ao trabalho", + "RAIV": "Raiva", + "SDTA": "Surto Doenças Transmitidas por Alimentos", + "SIFA": "Sífilis Adquirida", + "SIFC": "Sífilis Congênita", + "SIFG": "Sífilis em Gestante", + "SRC": "Síndrome da Rubéola Congênia", + "TETA": "Tétano Acidental", + "TETN": "Tétano Neonatal", + "TOXC": "Toxoplasmose Congênita", + "TOXG": "Toxoplasmose Gestacional", + "TRAC": "Inquérito de Tracoma", + "TUBE": "Tuberculose", + "VARC": "Varicela", + "VIOL": "Violência doméstica, sexual e/ou outras violências", + "ZIKA": "Zika Vírus", + } def describe(self, file: File) -> dict: if file.extension.upper() == ".DBC": @@ -125,7 +121,9 @@ def describe(self, file: File) -> dict: "disease": self.diseases[dis_code], "year": zfill_year(year), "size": humanize.naturalsize(file.info["size"]), - "last_update": file.info["modify"].strftime("%m-%d-%Y %I:%M%p"), + "last_update": file.info["modify"].strftime( + "%m-%d-%Y %I:%M%p" + ), } return description return {} @@ -186,12 +184,12 @@ class SIM(Database): "/dissemin/publicos/SIM/CID10/DORES", "/dissemin/publicos/SIM/CID9/DORES", ] - metadata = dict( - long_name="Sistema de Informação sobre Mortalidade", - source="http://sim.saude.gov.br", - description="", - ) - groups = dict(DO="CID10", DOR="CID9") + metadata = { + "long_name": "Sistema de Informação sobre Mortalidade", + "source": "http://sim.saude.gov.br", + "description": "", + } + groups = {"DO": "CID10", "DOR": "CID9"} def describe(self, file: File) -> dict: group, uf, year = self.format(file) @@ -227,7 +225,8 @@ def get_files( if not all([gr in list(self.groups.values()) for gr in groups]): raise ValueError( - f"Unknown group(s): {set(groups).difference(self.groups.values())}" + "Unknown group(s): " + f"{set(groups).difference(self.groups.values())}" ) targets = [] @@ -345,13 +344,13 @@ class SIH(Database): "/dissemin/publicos/SIHSUS/199201_200712/Dados", "/dissemin/publicos/SIHSUS/200801_/Dados", ] - metadata = dict( - long_name="Sistema de Informações Hospitalares", - source=( + metadata = { + "long_name": "Sistema de Informações Hospitalares", + "source": ( "https://datasus.saude.gov.br/acesso-a-informacao/morbidade-hospitalar-do-sus-sih-sus/", "https://datasus.saude.gov.br/acesso-a-informacao/producao-hospitalar-sih-sus/", ), - description=( + "description": ( "A finalidade do AIH (Sistema SIHSUS) é a de transcrever todos os " "atendimentos que provenientes de internações hospitalares que " "foram financiadas pelo SUS, e após o processamento, gerarem " @@ -363,15 +362,15 @@ class SIH(Database): "além dos valores de CNRAC, FAEC e de Hospitais Universitários – em suas " "variadas 
             "variadas formas de contrato de gestão."
         ),
-    )
-    groups = dict(
-        RD="AIH Reduzida",
-        RJ="AIH Rejeitada",
-        ER="AIH Rejeitada com erro",
-        SP="Serviços Profissionais",
-        CH="Cadastro Hospitalar",
-        CM="", # TODO
-    )
+    }
+    groups = {
+        "RD": "AIH Reduzida",
+        "RJ": "AIH Rejeitada",
+        "ER": "AIH Rejeitada com erro",
+        "SP": "Serviços Profissionais",
+        "CH": "Cadastro Hospitalar",
+        "CM": "", # TODO
+    }
 
     def describe(self, file: File) -> dict:
         if file.extension.upper() == ".DBC":
@@ -509,7 +508,7 @@ def get_files(
 
         if not all([gr in list(self.groups) for gr in groups]):
             raise ValueError(
-                f"Unknown SIH Group(s): {set(groups).difference(list(self.groups))}"
+                f"Unknown SIH Group(s): {set(groups).difference(list(self.groups))}"
             )
 
         # Fist filter files by group to reduce the files list length
@@ -530,9 +529,7 @@
 
 class CNES(Database):
     name = "CNES"
-    paths = [
-        "/dissemin/publicos/CNES/200508_/Dados"
-    ]
+    paths = ["/dissemin/publicos/CNES/200508_/Dados"]
     metadata = {
         "long_name": "Cadastro Nacional de Estabelecimentos de Saúde",
         "source": "https://cnes.datasus.gov.br/",
@@ -561,9 +558,61 @@ class CNES(Database):
         "SR": "Serviço Especializado",
         "ST": "Estabelecimentos",
     }
+    __loaded__ = []
 
-    def describe(self, file: File) -> dict:
-        if file.extension.upper() == ".DBC":
+    def load(
+        self,
+        paths: Optional[List[str]] = None,
+        groups: Optional[List[str]] = None,
+    ) -> None:
+        """
+        Loads specific paths to Database content, can receive CNES Groups as well.
+        It will convert the files found within the paths into content.
+        """
+        xpaths = []
+
+        if not paths and not groups:
+            xpaths.extend(self.paths)
+
+        if paths:
+            if not isinstance(paths, list):
+                raise ValueError("paths must a list")
+
+            xpaths.extend(paths)
+
+        if groups:
+            if not self.__content__:
+                self.load()
+
+            if not isinstance(groups, list):
+                raise ValueError("groups must a list")
+
+            if not all(
+                group in self.groups for group in [gr.upper() for gr in groups]
+            ):
+                raise ValueError(
+                    f"Unknown CNES group(s): {set(groups).difference(self.groups)}"
+                )
+
+            dirs = list(
+                filter(lambda c: isinstance(c, Directory), self.__content__)
+            )
+
+            for directory in dirs:
+                if directory.name in [gr.upper() for gr in groups]:
+                    xpaths.append(directory.path)
+                    self.__loaded__.append(directory.name)
+
+        content = []
+        for path in xpaths:
+            content.extend(list_path(str(path)))
+        self.__content__.update(set(content))
+
+    def describe(self, file: File):
+        if not isinstance(file, File):
+            return file
+
+        if file.extension.upper() in [".DBC", ".DBF"]:
             group, uf, year, month = self.format(file)
 
             description = {
@@ -573,11 +622,13 @@
                 "month": MONTHS[int(month)],
                 "year": zfill_year(year),
                 "size": humanize.naturalsize(file.info["size"]),
-                "last_update": file.info["modify"].strftime("%m-%d-%Y %I:%M%p"),
+                "last_update": file.info["modify"].strftime(
+                    "%m-%d-%Y %I:%M%p"
+                ),
             }
             return description
-        return {}
+        return file
 
     def format(self, file: File) -> tuple:
         group, uf = file.name[:2].upper(), file.name[2:4].upper()
@@ -588,30 +639,23 @@ def get_files(
         self,
         groups: Union[List[str], str],
         ufs: Union[List[str], str],
-        months: Union[list, str, int],
         years: Union[list, str, int],
+        months: Union[list, str, int],
     ) -> List[File]:
         groups = [gr.upper() for gr in to_list(groups)]
         ufs = parse_UFs(ufs)
-        months = [str(y)[-2:].zfill(2) for y in to_list(months)]
         years = [str(m)[-2:].zfill(2) for m in to_list(years)]
+        months = [str(y)[-2:].zfill(2) for y in to_list(months)]
 
         if not all([gr in list(self.groups) for gr in groups]):
             raise ValueError(
-                f"Unknown SIH Group(s): {set(groups).difference(list(self.groups))}"
+                f"Unknown CNES Group(s): {set(groups).difference(list(self.groups))}"
            )
 
-        # Fist filter files by group to reduce the files list length
-        groups_files = []
-        for file in self.files:
-            if file.name[:2] in groups:
-                groups_files.append(file)
-
-        targets = ["".join(t) for t in product(ufs, months, years)]
+        for group in groups:
+            if group not in self.__loaded__:
+                self.load(groups=groups)
 
-        files = []
-        for file in groups_files:
-            if file.name[2:] in targets:
-                files.append(file)
+        targets = ["".join(t) for t in product(groups, ufs, years, months)]
 
-        return files
+        return [f for f in self.files if f.name in targets]
diff --git a/pysus/tests/test_data/test_sinasc.py b/pysus/tests/test_data/test_sinasc.py
index 4daf6bce..cf670a38 100644
--- a/pysus/tests/test_data/test_sinasc.py
+++ b/pysus/tests/test_data/test_sinasc.py
@@ -22,6 +22,7 @@ def test_download_old(self):
         self.assertIn("IDADE_MAE", df.columns)
         self.assertGreater(len(df), 0)
 
+    @pytest.mark.skip(reason="This test takes too long")
    @pytest.mark.timeout(5)
    def test_get_available_years(self):
        files = get_available_years("AC")
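
Usage sketch for the reworked CNES database introduced by this patch. This is illustrative only and not part of the patch itself: it assumes the patch is applied and that CNES() can be instantiated without arguments, like the other Database subclasses in pysus.ftp.databases.

    from pysus.ftp.databases import CNES

    cnes = CNES()

    # Load only the "ST" (Estabelecimentos) group directories into content.
    cnes.load(groups=["ST"])

    # Select the ST files for Santa Catarina, December 2021. Targets are built
    # as group + UF + 2-digit year + 2-digit month, e.g. "STSC2112".
    files = cnes.get_files(groups="ST", ufs="SC", years=2021, months=12)

    # describe() returns a dict with group, uf, month, year, size and last update
    # for .DBC/.DBF files.
    for f in files:
        print(cnes.describe(f))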