Merge pull request #225 from elfkuzco/fix/use-pathlib
Replace usage of os.path and path.py with pathlib
benoit74 committed Apr 30, 2024
2 parents a30026e + 3a8250c commit f344086
Showing 11 changed files with 265 additions and 246 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -15,6 +15,7 @@ as of 2.0.0.
 ### Changed
 
 - Insert as few rsync URLs as possible in DB when a book selection is made (#220)
+- Replace usage of os.path and path.py with pathlib.Path (#195)
 
 ### Fixed
 
@@ -102,7 +103,7 @@ as of 2.0.0.
 ## [1.1.6]
 
 - removed duplicate dependencies
-- Added tag _category:gutenberg which was missing
+- Added tag \_category:gutenberg which was missing
 - docker-only release with updated zimwriterfs (2.1.0-1)
 
 ## [1.1.5]
2 changes: 1 addition & 1 deletion src/gutenberg2zim/constants.py
@@ -21,4 +21,4 @@
 logger = getLogger(NAME, level=logging.INFO)
 
 TMP_FOLDER = "tmp"
-TMP_FOLDER_PATH = pathlib.Path(TMP_FOLDER)
+TMP_FOLDER_PATH = pathlib.Path(TMP_FOLDER).resolve()
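The `.resolve()` call makes the constant absolute (and expands symlinks) at import time, so its meaning no longer depends on whatever the working directory happens to be when it is used. A minimal sketch of the difference, reusing the same `tmp` name:

import pathlib

relative = pathlib.Path("tmp")            # meaning depends on cwd at use time
resolved = pathlib.Path("tmp").resolve()  # pinned to an absolute path now

print(relative)  # tmp
print(resolved)  # e.g. /home/user/gutenberg/tmp, depending on where this runs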
2 changes: 1 addition & 1 deletion src/gutenberg2zim/database.py
@@ -211,7 +211,7 @@ def load_fixtures(model):
         logger.debug(f"[fixtures] Created {f}")
 
 
-def setup_database(*, wipe=False):
+def setup_database(*, wipe: bool = False) -> None:
     logger.info("Setting up the database")
 
     for model in (License, Author, Book, BookFormat, Url):
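Functionally this hunk only adds type hints; the bare `*` already made `wipe` keyword-only. A self-contained sketch (with a stand-in body, not the real one) of what the signature enforces:

def setup_database(*, wipe: bool = False) -> None:
    # Illustration only: the real function creates/wipes the DB tables.
    print(f"wipe={wipe}")

setup_database(wipe=True)  # OK: the flag must be named at the call site
# setup_database(True)     # TypeError: takes 0 positional arguments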
94 changes: 48 additions & 46 deletions src/gutenberg2zim/download.py
@@ -1,17 +1,15 @@
-import os
-import pathlib
 import shutil
 import tempfile
 import zipfile
 from multiprocessing.dummy import Pool
+from pathlib import Path
 from pprint import pprint as pp
 
 import apsw
 import backoff
 from kiwixstorage import KiwixStorage
-from path import Path
 
-from gutenberg2zim.constants import TMP_FOLDER, logger
+from gutenberg2zim.constants import TMP_FOLDER_PATH, logger
 from gutenberg2zim.database import Book, BookFormat
 from gutenberg2zim.export import fname_for, get_list_of_filtered_books
 from gutenberg2zim.s3 import download_from_cache
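`path.py` and `pathlib` both export a `Path` class, but with different APIs, so most of this file is mechanical substitution. A sketch of the correspondences used below, with the `path.py` spellings taken from the removed lines of this diff:

from pathlib import Path
import shutil

p = Path("images/1234-cover.jpg")

assert p.name == "1234-cover.jpg"                  # path.py: p.basename()
assert p.suffix == ".jpg"                          # path.py: p.ext
assert Path("a") / "b" == Path("a").joinpath("b")  # both spellings exist in pathlib

# path.py's rmtree_p()/mkdir_p() have no pathlib method equivalent; the diff
# swaps in shutil.rmtree(..., ignore_errors=True) and
# Path.mkdir(parents=True, exist_ok=True) instead.
shutil.rmtree("no-such-dir", ignore_errors=True)  # no-op when missing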
@@ -36,24 +34,24 @@
 # return False
 
 
-def handle_zipped_epub(zippath, book, dst_dir: pathlib.Path):
+def handle_zipped_epub(zippath: Path, book: Book, dst_dir: Path) -> bool:
     def clfn(fn):
-        return os.path.join(*os.path.split(fn)[1:])
+        return Path(fn).name
 
     def is_safe(fname):
-        fname = ensure_unicode(clfn(fname))
-        if Path(fname).basename() == fname:
+        name = ensure_unicode(clfn(fname))
+        if Path(fname).name == name:
             return True
-        return fname == os.path.join("images", Path(fname).splitpath()[-1])
+        return fname == f"images/{Path(fname).name}"
 
     zipped_files = []
     # create temp directory to extract to
-    tmpd = tempfile.mkdtemp(dir=TMP_FOLDER)
+    tmpd = tempfile.mkdtemp(dir=TMP_FOLDER_PATH)
     try:
         with zipfile.ZipFile(zippath, "r") as zf:
             # check that there is no insecure data (absolute names)
             if sum([1 for n in zf.namelist() if not is_safe(ensure_unicode(n))]):
-                Path(tmpd).rmtree_p()
+                shutil.rmtree(tmpd, ignore_errors=True)
                 return False
             # zipped_files = [clfn(fn) for fn in zf.namelist()]
             zipped_files = zf.namelist()
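The safety check admits only ZIP members that are bare filenames or sit directly under `images/`; absolute paths, `..` traversal, and deeper trees all fail and abort extraction. The predicate in isolation, assuming plain `str` member names:

from pathlib import Path

def is_safe(member: str) -> bool:
    if Path(member).name == member:  # a bare filename
        return True
    return member == f"images/{Path(member).name}"  # exactly one level deep

assert is_safe("1234.html")
assert is_safe("images/cover.jpg")
assert not is_safe("/etc/passwd")
assert not is_safe("../../etc/passwd")
assert not is_safe("images/deep/cover.jpg")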
@@ -64,7 +62,7 @@ def is_safe(fname):
         # file is not a zip file when it should be.
         # don't process it anymore as we don't know what to do.
         # could this be due to an incorrect/incomplete download?
-        return
+        return False
 
     # is there multiple HTML files in ZIP ? (rare)
     mhtml = (
@@ -73,25 +71,26 @@ def is_safe(fname):
     # move all extracted files to proper locations
     for zipped_file in zipped_files:
         # skip folders
-        if not Path(zipped_file).ext:
+        if not Path(zipped_file).is_file():
             continue
 
-        src = os.path.join(tmpd, zipped_file)
-        if os.path.exists(src):
-            fname = Path(zipped_file).basename()
+        src = Path(tmpd) / zipped_file
+        if src.exists():
+            fname = Path(zipped_file).name
 
             if fname.endswith(".html") or fname.endswith(".htm"):
                 if mhtml:
                     if fname.startswith(f"{book.id}-h."):
-                        dst = dst_dir.joinpath(f"{book.id}.html")
+                        dst = dst_dir / f"{book.id}.html"
                     else:
-                        dst = dst_dir.joinpath(f"{book.id}_{fname}")
+                        dst = dst_dir / f"{book.id}_{fname}"
                 else:
-                    dst = dst_dir.joinpath(f"{book.id}.html")
+                    dst = dst_dir / f"{book.id}.html"
             else:
-                dst = dst_dir.joinpath(f"{book.id}_{fname}")
+                dst = dst_dir / f"{book.id}_{fname}"
+            dst = dst.resolve()
             try:
-                Path(src).move(str(dst))
+                src.rename(dst)
             except Exception as e:
                 import traceback
 
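One semantic difference hides in `src.rename(dst)`: `path.py`'s `move()` reportedly wraps `shutil.move()`, while `Path.rename()` maps to `os.rename()`, which raises `OSError` (`EXDEV`) when source and destination sit on different filesystems. That is fine while the temp dir and the destination share a mount; a cross-device-safe variant would look like this sketch (the `move_path` helper is illustrative, not what the diff does):

import shutil
from pathlib import Path

def move_path(src: Path, dst: Path) -> None:
    # Rename when possible; fall back to copy+delete across mounts.
    try:
        src.rename(dst)
    except OSError:
        shutil.move(str(src), str(dst))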
@@ -100,14 +99,14 @@ def is_safe(fname):
                 raise
 
     # delete temp directory and zipfile
-    if Path(zippath).exists():
-        os.unlink(zippath)
-    Path(tmpd).rmtree_p()
+    zippath.unlink(missing_ok=True)
+    shutil.rmtree(tmpd, ignore_errors=True)
+    return True
 
 
 def download_book(
     book: Book,
-    download_cache: str,
+    download_cache: Path,
     formats: list[str],
     *,
     force: bool,
@@ -124,13 +123,15 @@ def download_book(
     if "html" not in formats:
         formats.append("html")
 
-    book_dir = pathlib.Path(download_cache).joinpath(str(book.id))
-    optimized_dir = book_dir.joinpath("optimized")
-    unoptimized_dir = book_dir.joinpath("unoptimized")
+    book_dir = download_cache / str(book.id)
+    optimized_dir = book_dir / "optimized"
+    unoptimized_dir = book_dir / "unoptimized"
 
     unsuccessful_formats = []
     for book_format in formats:
-        unoptimized_fpath = unoptimized_dir.joinpath(fname_for(book, book_format))
-        optimized_fpath = optimized_dir.joinpath(archive_name_for(book, book_format))
+        unoptimized_fpath = unoptimized_dir / fname_for(book, book_format)
+        optimized_fpath = optimized_dir / archive_name_for(book, book_format)
 
         # check if already downloaded
         if (unoptimized_fpath.exists() or optimized_fpath.exists()) and not force:
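`joinpath()` and the `/` operator are interchangeable in `pathlib`; the diff standardizes on the operator. Both accept strings or further `Path` segments:

from pathlib import Path

book_dir = Path("dl-cache") / "1234"
assert book_dir / "optimized" == book_dir.joinpath("optimized")
assert Path("dl-cache").joinpath("1234", "optimized") == book_dir / "optimized"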
@@ -141,12 +142,10 @@ def download_book(
         if book_format == "html":
             for fpath in book_dir.iterdir():
                 if fpath.is_file() and fpath.suffix not in [".pdf", ".epub"]:
-                    fpath.unlink()
+                    fpath.unlink(missing_ok=True)
         else:
-            if unoptimized_fpath.exists():
-                unoptimized_fpath.unlink()
-            if optimized_fpath.exists():
-                optimized_fpath.unlink()
+            unoptimized_fpath.unlink(missing_ok=True)
+            optimized_fpath.unlink(missing_ok=True)
         # delete dirs which are empty
         for dir_name in [optimized_dir, unoptimized_dir]:
             if not dir_name.exists():
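`unlink(missing_ok=True)` (available since Python 3.8) collapses the exists-then-delete pair into one call and removes the race where the file vanishes between the check and the unlink. In isolation:

from pathlib import Path

fpath = Path("does-not-exist.txt")
fpath.unlink(missing_ok=True)  # silently does nothing

# The pre-3.8 idiom this replaces:
if fpath.exists():
    fpath.unlink()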
@@ -233,7 +232,7 @@ def download_book(
 
         # HTML files are *sometime* available as ZIP files
         if url.endswith(".zip"):
-            zpath = unoptimized_dir.joinpath(f"{fname_for(book, book_format)}.zip")
+            zpath = unoptimized_dir / f"{fname_for(book, book_format)}.zip"
 
             etag = get_etag_from_url(url)
             if s3_storage:
@@ -254,7 +253,11 @@ def download_book(
                 book.html_etag = etag  # type: ignore
                 book.save()
             # extract zipfile
-            handle_zipped_epub(zippath=zpath, book=book, dst_dir=unoptimized_dir)
+            handle_zipped_epub(
+                zippath=zpath,
+                book=book,
+                dst_dir=unoptimized_dir,
+            )
         else:
             if (
                 url.endswith(".htm")
@@ -329,10 +332,9 @@ def download_cover(book, book_dir, s3_storage, optimizer_version):
         etag = get_etag_from_url(url)
         downloaded_from_cache = False
         cover = f"{book.id}_cover_image.jpg"
-        if (
-            book_dir.joinpath("optimized").joinpath(cover).exists()
-            or book_dir.joinpath("unoptimized").joinpath(cover).exists()
-        ):
+        if (book_dir / "optimized" / cover).exists() or (
+            book_dir / "unoptimized" / cover
+        ).exists():
             logger.debug(f"Cover already exists for book #{book.id}")
             return
         if s3_storage:
@@ -343,25 +345,25 @@ def download_cover(book, book_dir, s3_storage, optimizer_version):
                 book=book,
                 etag=etag,
                 book_format="cover",
-                dest_dir=book_dir.joinpath("optimized"),
+                dest_dir=book_dir / "optimized",
                 s3_storage=s3_storage,
                 optimizer_version=optimizer_version,
             )
         if not downloaded_from_cache:
             logger.debug(f"Downloading {url}")
-            if download_file(url, book_dir.joinpath("unoptimized").joinpath(cover)):
+            if download_file(url, book_dir / "unoptimized" / cover):
                 book.cover_etag = etag
                 book.save()
     else:
         logger.debug(f"No Book Cover found for Book #{book.id}")
 
 
 def download_all_books(
-    download_cache: str,
+    download_cache: Path,
     concurrency: int,
     languages: list[str],
     formats: list[str],
-    only_books: list[str],
+    only_books: list[int],
     *,
     force: bool,
     s3_storage: KiwixStorage | None,
@@ -372,7 +374,7 @@ def download_all_books(
     )
 
     # ensure dir exist
-    Path(download_cache).mkdir_p()
+    download_cache.mkdir(parents=True, exist_ok=True)
 
     def backoff_busy_error_hdlr(details):
         logger.warning(
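`mkdir(parents=True, exist_ok=True)` is the `pathlib` spelling of `mkdir -p` (and of `path.py`'s `mkdir_p()`): create any missing parents and succeed if the directory already exists. For example:

from pathlib import Path

cache = Path("dl-cache") / "1234" / "unoptimized"
cache.mkdir(parents=True, exist_ok=True)  # creates the whole chain
cache.mkdir(parents=True, exist_ok=True)  # second call is a no-op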
24 changes: 15 additions & 9 deletions src/gutenberg2zim/entrypoint.py
@@ -1,9 +1,8 @@
 import logging
-import os
 import sys
+from pathlib import Path
 
 from docopt import docopt
-from path import Path
 
 from gutenberg2zim.checkdeps import check_dependencies
 from gutenberg2zim.constants import TMP_FOLDER_PATH, VERSION, logger
@@ -94,7 +93,12 @@ def main():
         arguments.get("--rdf-url")
         or "http://www.gutenberg.org/cache/epub/feeds/rdf-files.tar.bz2"
     )
-    dl_cache = arguments.get("--dl-folder") or os.path.join("dl-cache")
+
+    if dl_folder := arguments.get("--dl-folder"):
+        dl_cache = Path(dl_folder).resolve()
+    else:
+        dl_cache = Path("dl-cache").resolve()
+
     books_csv = arguments.get("--books") or ""
     zim_title = arguments.get("--zim-title")
     zim_desc = arguments.get("--zim-desc")
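The walrus form spells out the CLI fallback; the same behavior also fits the old one-liner shape, as in this equivalent sketch (the `pick_dl_cache` helper is illustrative, assuming docopt yields a string or None for --dl-folder):

from pathlib import Path

def pick_dl_cache(arg: str | None) -> Path:
    return Path(arg or "dl-cache").resolve()

assert pick_dl_cache(None) == Path("dl-cache").resolve()
assert pick_dl_cache("/tmp/cache") == Path("/tmp/cache").resolve()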
@@ -141,7 +145,7 @@ def main():
         }
     )
 
-    books = []
+    books: list[int] = []
     try:
         books_csv = books_csv.split(",")
 
@@ -151,7 +155,7 @@ def f(x):
         for i in books_csv:
             blst = f(i)
             if len(blst) > 1:
-                blst = range(blst[0], blst[1] + 1)
+                blst = list(range(blst[0], blst[1] + 1))
             books.extend(blst)
         books_csv = list(set(books))
     except Exception as e:
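`list(range(...))` keeps `blst` a real `list[int]` now that `books` is annotated, rather than a lazy `range`. For context, the surrounding loop expands selections like "3,10-12,99"; a self-contained sketch of that logic, with a hypothetical `f` standing in for the helper whose body this excerpt does not show:

def parse_books_csv(spec: str) -> list[int]:
    def f(x: str) -> list[int]:  # assumed behavior: "10-12" -> [10, 12]
        return [int(n) for n in x.split("-")]

    books: list[int] = []
    for item in spec.split(","):
        bounds = f(item)
        if len(bounds) > 1:
            books.extend(range(bounds[0], bounds[1] + 1))
        else:
            books.extend(bounds)
    return sorted(set(books))

assert parse_books_csv("3,10-12,99") == [3, 10, 11, 12, 99]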
@@ -219,20 +223,22 @@ def f(x):
     if do_zim:
         logger.info("BUILDING ZIM dynamically")
         build_zimfile(
-            output_folder=Path(one_lang_one_zim_folder or ".").abspath(),
+            output_folder=Path(one_lang_one_zim_folder).resolve()
+            if one_lang_one_zim_folder
+            else Path(".").resolve(),
             download_cache=dl_cache,
             concurrency=concurrency,
             languages=zim_lang,
             formats=formats,
             only_books=books,
-            force=force,
-            title_search=title_search,
-            add_bookshelves=bookshelves,
             s3_storage=s3_storage,
             optimizer_version=optimizer_version,
             zim_name=Path(zim_name).name if zim_name else None,
             title=zim_title,
             description=zim_desc,
             stats_filename=stats_filename,
             publisher=publisher,
+            force=force,
+            title_search=title_search,
+            add_bookshelves=bookshelves,
         )