Skip to content

Commit

Permalink
feat(pbar): add a progress bar for downloading and parsing data
Browse files Browse the repository at this point in the history
  • Loading branch information
luabida committed Sep 19, 2023
1 parent 58fadde commit 8cd691c
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 33 deletions.
54 changes: 35 additions & 19 deletions pysus/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@
from pyreaddbc import dbc2dbf


def dbc_to_dbf(dbc: str) -> str:
def dbc_to_dbf(dbc: str, _pbar = None) -> str:
path = Path(dbc)

if path.suffix.lower() != ".dbc":
raise ValueError(f"Not a DBC file: {path}")

dbf = path.with_suffix(".dbf")

if _pbar:
_pbar.reset(total=1)
_pbar.set_description(f"{dbf.name}")

_parquet = path.with_suffix(".parquet")
if _parquet.exists():
path.unlink(missing_ok=True)
Expand All @@ -30,6 +34,9 @@ def dbc_to_dbf(dbc: str) -> str:
dbc2dbf(str(path), str(dbf))
path.unlink()

if _pbar:
_pbar.update(1)

return str(dbf)


Expand All @@ -48,36 +55,45 @@ def stream_dbf(dbf, chunk_size=30000):
yield data


def dbf_to_parquet(dbf: str) -> str:
def dbf_to_parquet(dbf: str, _pbar = None) -> str:
path = Path(dbf)

if path.suffix.lower() != ".dbf":
raise ValueError(f"Not a DBF file: {path}")

parquet = path.with_suffix(".parquet")

approx_final_size = os.path.getsize(path) / 200 # TODO: not best approx size
if _pbar:
_pbar.unit = "B"
_pbar.unit_scale = True
_pbar.reset(total=approx_final_size)
_pbar.set_description(f"{parquet.name}")

if parquet.exists():
if _pbar:
_pbar.update(approx_final_size - _pbar.n)
return str(parquet)

parquet.absolute().mkdir()

approx_final_size = os.path.getsize(path) / 200 # TODO: not best approx size
with tqdm(total=approx_final_size, unit='B', unit_scale=True) as pbar:
pbar.set_description("DBF to Parquets")
try:
chunk_size = 30_000
for chunk in stream_dbf(
DBF(path, encoding="iso-8859-1", raw=True), chunk_size
):
chunk_df = pd.DataFrame(chunk)
table = pa.Table.from_pandas(chunk_df)
pq.write_to_dataset(table, root_path=str(parquet))
pbar.update(chunk_size)
except Exception as exc:
parquet.absolute().unlink()
raise exc

pbar.update(approx_final_size - pbar.n)
try:
chunk_size = 30_000
for chunk in stream_dbf(
DBF(path, encoding="iso-8859-1", raw=True), chunk_size
):
if _pbar:
_pbar.update(chunk_size)

chunk_df = pd.DataFrame(chunk)
table = pa.Table.from_pandas(chunk_df)
pq.write_to_dataset(table, root_path=str(parquet))
except Exception as exc:
parquet.absolute().unlink()
raise exc

if _pbar:
_pbar.update(approx_final_size - _pbar.n)

path.unlink()

Expand Down
14 changes: 7 additions & 7 deletions pysus/data/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ class ParquetSet:
__path__: Union[PurePosixPath, PureWindowsPath]
info: Dict

def __init__(self, path: str) -> None:
def __init__(self, path: str, _pbar = None) -> None:
info = {}
path = Path(path)

if path.suffix.lower() not in [".parquet", ".dbc", ".dbf"]:
raise NotImplementedError(f"Unknown file type: {path.suffix}")

if path.suffix.lower() == ".dbc":
path = Path(dbc_to_dbf(path))
path = Path(dbc_to_dbf(path, _pbar=_pbar))

if path.suffix.lower() == ".dbf":
path = Path(dbf_to_parquet(path))
path = Path(dbf_to_parquet(path, _pbar=_pbar))

if path.is_dir():
info["size"] = sum(
Expand All @@ -52,7 +52,7 @@ def path(self) -> str:


def parse_data_content(
path: Union[List[str], str]
path: Union[List[str], str], _pbar = None
) -> Union[ParquetSet, List[ParquetSet]]:
if isinstance(path, str):
path = [path]
Expand All @@ -67,7 +67,7 @@ def parse_data_content(
continue

if data_path.suffix.lower() in [".dbc", ".dbf", ".parquet"]:
content.append(ParquetSet(str(data_path)))
content.append(ParquetSet(str(data_path), _pbar=_pbar))
else:
continue

Expand All @@ -86,6 +86,6 @@ class Data:
"""

def __new__(
    cls, path: Union[List[str], str], _pbar = None
) -> Union[ParquetSet, List[ParquetSet]]:
    """Factory entry point: parse *path* (one path or a list of paths)
    into ParquetSet content, forwarding an optional tqdm-like progress
    bar to the parsing layer.
    """
    # Delegate directly; this class never constructs an instance of
    # itself — it only dispatches to the content parser.
    parsed = parse_data_content(path, _pbar=_pbar)
    return parsed
41 changes: 34 additions & 7 deletions pysus/ftp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, Dict, List, Optional, Self, Set, Union

import humanize
from tqdm import tqdm
from aioftp import Client
from loguru import logger

Expand Down Expand Up @@ -87,37 +88,60 @@ def info(self):
info["modify"] = self.__info__["modify"].strftime("%Y-%m-%d %I:%M%p")
return info

def download(self, local_dir: str = CACHEPATH) -> Data:
def download(self, local_dir: str = CACHEPATH, _pbar = None) -> Data:
    """Download this remote file into ``local_dir`` and wrap it in ``Data``.

    Parameters
    ----------
    local_dir : str
        Directory to store the downloaded file (created if missing).
    _pbar : tqdm-like progress bar, optional
        When given, it is reconfigured to byte units and advanced as
        bytes arrive, then forwarded to ``Data`` for the parsing steps.

    Returns
    -------
    Data
        Wrapper around the local ``.parquet``, ``.dbf`` or raw file.
    """
    _dir = pathlib.Path(local_dir)
    _dir.mkdir(exist_ok=True, parents=True)
    filepath = _dir / self.basename
    filesize = int(self.__info__["size"])

    if _pbar:
        _pbar.unit = "B"
        _pbar.unit_scale = True
        _pbar.reset(total=filesize)

    # Reuse any previously downloaded/converted artifact instead of
    # fetching from the FTP server again.
    for cached in (
        filepath.with_suffix(".parquet"),
        filepath.with_suffix(".dbf"),
        filepath,
    ):
        if cached.exists():
            if _pbar:
                _pbar.update(filesize - _pbar.n)
            return Data(str(cached), _pbar=_pbar)

    if _pbar:
        _pbar.set_description(f"{self.basename}")

    # Bug fix: the original read `ftp = ftp = FTP(...)` and closed
    # `ftp`/`output` in a `finally` that ran even when those names were
    # never bound (raising NameError and masking the real exception).
    ftp = FTP("ftp.datasus.gov.br")
    try:
        ftp.login()
        with open(filepath, "wb") as output:

            def callback(data):
                # Write each retrieved chunk and advance the bar by its size.
                output.write(data)
                if _pbar:
                    _pbar.update(len(data))

            ftp.retrbinary(f"RETR {self.path}", callback)
    finally:
        # Always release the control connection, even on failure.
        ftp.close()

    if _pbar:
        _pbar.update(filesize - _pbar.n)
    return Data(str(filepath), _pbar=_pbar)

async def async_download(self, local_dir: str = CACHEPATH) -> Data:
# aioftp.Client.parse_list_line_custom
Expand Down Expand Up @@ -490,10 +514,13 @@ def download(
"""
Downloads a list of Files.
"""
files = to_list(files)
pbar = tqdm(total=len(files), dynamic_ncols=True)
dfiles = []
for file in files:
if isinstance(file, File):
dfiles.append(file.download(local_dir=local_dir))
dfiles.append(file.download(local_dir=local_dir, _pbar=pbar))
pbar.close()
return dfiles

async def async_download(
Expand Down

0 comments on commit 8cd691c

Please sign in to comment.