Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: load with constructor, use piping instead of temp_files #72

Merged
merged 5 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 24 additions & 39 deletions hotpdf/hotpdf.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import math
import os
import tempfile
import xml.etree.ElementTree as ET
from collections import defaultdict
from typing import Optional, Union

Expand All @@ -16,20 +14,33 @@ class HotPdf:
def __init__(
self,
extraction_tolerance: int = 4,
source: Union[str, bytes, None] = None,
callegarimattia marked this conversation as resolved.
Show resolved Hide resolved
password: str = "",
drop_duplicate_spans: bool = True,
first_page: int = 0,
last_page: int = 0,
) -> None:
"""Initialize the HotPdf class.

Args:
extraction_tolerance (int, optional): Tolerance value used during text extraction
to adjust the bounding box for capturing text. Defaults to 4.
source (str | bytes, optional): The path to the PDF file to be loaded, or a bytes object.
password (str, optional): Password to use to unlock the pdf
drop_duplicate_spans (bool, optional): Drop duplicate spans when loading. Defaults to True.
first_page (int, optional): The first page to load. Defaults to 0.
last_page (int, optional): The last page to load. Defaults to 0.

Raises:
ValueError: If the page range is invalid.
FileNotFoundError: If the file is not found.
PermissionError: If the file is encrypted or the password is wrong.
RuntimeError: If an unknown error is generated by Ghostscript.
"""
self.pages: list[MemoryMap] = []
self.extraction_tolerance: int = extraction_tolerance
self.xml_file_path: Optional[str] = None

def __delete_file(self, path: Union[str, None]) -> None:
if path and os.path.exists(path):
os.remove(path)
if source:
self.load(source, password, drop_duplicate_spans, first_page, last_page)

def __check_file_exists(self, pdf_file: str) -> None:
if not os.path.exists(pdf_file):
Expand All @@ -47,14 +58,14 @@ def __check_page_range(self, first_page: int, last_page: int) -> None:
if first_page > last_page or first_page < 0 or last_page < 0:
raise ValueError("Invalid page range")

def __prechecks(self, pdf_file: Union[str, bytes], first_page: int, last_page: int) -> None:
if isinstance(pdf_file, str):
self.__check_file_exists(pdf_file)
def __prechecks(self, source: Union[str, bytes], first_page: int, last_page: int) -> None:
    """Validate the load arguments before the PDF is processed.

    Args:
        source (str | bytes): Path to the PDF file, or the PDF content as bytes.
        first_page (int): The first page to load.
        last_page (int): The last page to load.

    Raises:
        ValueError: If the page range is invalid.
    """
    # Only a path can be checked on disk; bytes input has no file to look up.
    # isinstance (rather than an exact type comparison) also accepts str subclasses.
    if isinstance(source, str):
        self.__check_file_exists(source)
    self.__check_page_range(first_page, last_page)

def load(
self,
pdf_file: Union[str, bytes],
source: Union[str, bytes],
callegarimattia marked this conversation as resolved.
Show resolved Hide resolved
password: str = "",
drop_duplicate_spans: bool = True,
first_page: int = 0,
Expand All @@ -75,34 +86,8 @@ def load(
PermissionError: If the file is encrypted or the password is wrong.
RuntimeError: If an unknown error is generated by Ghostscript.
"""
self.__prechecks(pdf_file, first_page, last_page)
_bytes_file_received: bool = False
if isinstance(pdf_file, bytes):
_temp_pdf_file = tempfile.NamedTemporaryFile(delete=False)
_temp_pdf_file.write(pdf_file)
_temp_pdf_file_name = _temp_pdf_file.name
_bytes_file_received = True
else:
_temp_pdf_file_name = pdf_file
self.xml_file_path = processor.generate_xml_file(_temp_pdf_file_name, password, first_page, last_page)
tree_iterator = ET.iterparse(self.xml_file_path, events=("start", "end"))
event: str
root: ET.Element
event, root = next(tree_iterator)

element: ET.Element
for event, element in tree_iterator:
if event == "end" and element.tag == "page":
parsed_page: MemoryMap = MemoryMap()
parsed_page.build_memory_map()
parsed_page.load_memory_map(page=element, drop_duplicate_spans=drop_duplicate_spans)
self.pages.append(parsed_page)
element.clear()
root.clear()

self.__delete_file(self.xml_file_path)
if _bytes_file_received:
self.__delete_file(_temp_pdf_file_name)
self.__prechecks(source, first_page, last_page)
self.pages = processor.process(source, password, drop_duplicate_spans, first_page, last_page)

def __extract_full_text_span(
self,
Expand Down
80 changes: 58 additions & 22 deletions hotpdf/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
import re
import subprocess
import tempfile
import xml.etree.ElementTree as ET
callegarimattia marked this conversation as resolved.
Show resolved Hide resolved
from enum import Enum
from pathlib import Path
from typing import Union

from hotpdf.helpers import nanoid
from hotpdf.memory_map import MemoryMap


class Result(Enum):
Expand All @@ -14,7 +20,15 @@ class Result(Enum):
UNKNOWN_ERROR = 3


def generate_xml_file(file_path: str, password: str, first_page: int, last_page: int) -> str:
def process(
    source: Union[str, bytes], password: str, drop_duplicate_spans: bool, first_page: int, last_page: int
) -> list[MemoryMap]:
    """Turn a PDF into a list of per-page memory maps.

    The PDF is first converted to an XML file, which is then parsed into pages.

    Args:
        source (str | bytes): Path to the PDF file, or the PDF content as bytes.
        password (str): Password forwarded to the XML generation step.
        drop_duplicate_spans (bool): Forwarded to the XML parsing step.
        first_page (int): First page forwarded to the XML generation step.
        last_page (int): Last page forwarded to the XML generation step.

    Returns:
        list[MemoryMap]: The parsed pages.
    """
    return __parse_xml(
        __generate_xml_file(source, password, first_page, last_page),
        drop_duplicate_spans,
    )


def __generate_xml_file(source: Union[str, bytes], password: str, first_page: int, last_page: int) -> Path:
callegarimattia marked this conversation as resolved.
Show resolved Hide resolved
"""Generate XML notation of PDF File.

Args:
Expand All @@ -28,19 +42,14 @@ def generate_xml_file(file_path: str, password: str, first_page: int, last_page:
Returns:
Path: XML file path.
"""
temp_xml_file_name = tempfile.NamedTemporaryFile(delete=False).name

result = __call_ghostscript(file_path, temp_xml_file_name, password, first_page, last_page)

temp_xml_file_path = Path(tempfile.gettempdir(), nanoid.generate_nano_id() + ".xml")
result = __call_ghostscript(source, temp_xml_file_path, password, first_page, last_page)
__handle_gs_result(result)

__clean_xml(temp_xml_file_name)

return temp_xml_file_name
return __clean_xml(temp_xml_file_path)


def __call_ghostscript(
file_path: str, temp_xml_file_name: str, password: str, first_page: int, last_page: int
source: Union[str, bytes], temp_xml_file_path: Path, password: str, first_page: int, last_page: int
) -> Result:
ghostscript = "gs" if os.name != "nt" else "gswin64c"
command_line_args = [ghostscript, "-dNOPAUSE", "-dBATCH", "-dSAFER", "-dTextFormat=1", "-sDEVICE=txtwrite"]
Expand All @@ -52,13 +61,19 @@ def __call_ghostscript(
if last_page:
command_line_args.append(f"-dLastPage={last_page}")

command_line_args.extend([f'-sOutputFile="{temp_xml_file_name}"', f'"{file_path}"'])
command_line_args.append(f'-sOutputFile="{temp_xml_file_path}"')

# Uses gs in pipe mode
pipe = type(source) is bytes
if pipe:
command_line_args.append("-")
else:
command_line_args.append(str(source))
callegarimattia marked this conversation as resolved.
Show resolved Hide resolved

gs_call = " ".join(command_line_args)

try:
output = subprocess.check_output(gs_call, shell=ghostscript == "gs", stderr=subprocess.STDOUT).decode(
errors="ignore"
)
output = subprocess.run(gs_call, shell=ghostscript == "gs", input=source if pipe else None, capture_output=True)
status = __validate_gs_output(output)
except subprocess.CalledProcessError as err:
status = Result.UNKNOWN_ERROR
Expand All @@ -67,15 +82,15 @@ def __call_ghostscript(
return status


def __clean_xml(temporary_xml_file_name: str) -> None:
def __clean_xml(temporary_xml_path: Path) -> Path:
"""
Clean the raw XML file generated by Ghostscript.
Apply changes directly to the temporary file.

Args:
temporary_xml_path (Path): the temporary file outputted by ghostscript
"""
with open(temporary_xml_file_name, "r+", encoding="utf-8") as f:
with open(temporary_xml_path, "r+", encoding="utf-8") as f:
raw_xml = f.read()
raw_xml = re.sub(r"(&#x[0-9]+;)", "", raw_xml)
raw_xml = re.sub(r"(&quot;)", "'", raw_xml)
Expand All @@ -89,16 +104,18 @@ def __clean_xml(temporary_xml_file_name: str) -> None:
f.seek(0)
f.write(raw_xml)
f.truncate()
return temporary_xml_path


def __validate_gs_output(output: str) -> Result:
if "This file requires a password for access" in output:
def __validate_gs_output(output: subprocess.CompletedProcess[bytes]) -> Result:
if output.returncode != 0:
return Result.UNKNOWN_ERROR
err = output.stderr.decode(errors="ignore")
if "This file requires a password for access" in err:
return Result.LOCKED
if "Password did not work" in output:
if "Password did not work" in err:
return Result.WRONG_PASSWORD
if "Page" in output:
return Result.LOADED
return Result.UNKNOWN_ERROR
return Result.LOADED


def __handle_gs_result(status: Result) -> None:
Expand All @@ -117,3 +134,22 @@ def __handle_gs_result(status: Result) -> None:
if status == Result.UNKNOWN_ERROR:
logging.error("GS: UNKNOWN ERROR")
raise RuntimeError("Unknown error in processing")


def __parse_xml(xml_file_path: Path, drop_duplicate_spans: bool) -> list[MemoryMap]:
    """Parse the generated XML file into one MemoryMap per page.

    The file is read incrementally and is removed afterwards, whether or not
    parsing succeeded, so a failed load does not leave a temporary file behind.

    Args:
        xml_file_path (Path): Path of the XML file to parse. It is deleted by this call.
        drop_duplicate_spans (bool): Forwarded to ``MemoryMap.load_memory_map``.

    Returns:
        list[MemoryMap]: The parsed pages, in the order they appear in the XML.
    """
    pages: list[MemoryMap] = []
    try:
        # Own the file handle so it is guaranteed to be closed before the file is
        # removed, even when parsing stops early on an error.
        with open(xml_file_path, "rb") as xml_stream:
            tree_iterator = ET.iterparse(xml_stream, events=("start", "end"))
            # The first event is the start of the root element; keep a handle on it
            # so finished pages can be detached from the tree.
            root: ET.Element
            _, root = next(tree_iterator)
            event: str
            element: ET.Element
            for event, element in tree_iterator:
                if event == "end" and element.tag == "page":
                    parsed_page: MemoryMap = MemoryMap()
                    parsed_page.build_memory_map()
                    parsed_page.load_memory_map(page=element, drop_duplicate_spans=drop_duplicate_spans)
                    pages.append(parsed_page)
                    # Release the processed page so the tree does not grow with the document.
                    element.clear()
                    root.clear()
    finally:
        if os.path.exists(xml_file_path):
            os.remove(xml_file_path)
    return pages
8 changes: 4 additions & 4 deletions tests/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ def test_duplicate_spans_not_removed(mock_hotpdf_bank_file_name):
hot_pdf_object = HotPdf()
hot_pdf_object_with_dup_span = HotPdf()
with patch(
"hotpdf.processor.generate_xml_file",
"hotpdf.processor.__generate_xml_file",
return_value=xml_copy_file_name("tests/resources/xml/hotpdf_bank_dup_span.xml"),
):
hot_pdf_object_with_dup_span.load(mock_hotpdf_bank_file_name, drop_duplicate_spans=False)
with patch(
"hotpdf.processor.generate_xml_file",
"hotpdf.processor.__generate_xml_file",
return_value=xml_copy_file_name("tests/resources/xml/hotpdf_bank_dup_span.xml"),
):
hot_pdf_object.load(mock_hotpdf_bank_file_name)
Expand All @@ -114,15 +114,15 @@ def test_duplicate_spans_not_removed(mock_hotpdf_bank_file_name):
def test_load_negative_coordinates(mock_hotpdf_bank_file_name):
QUERY = "HOTPDF BANK"
with patch(
"hotpdf.processor.generate_xml_file",
"hotpdf.processor.__generate_xml_file",
return_value=xml_copy_file_name("tests/resources/xml/hotpdf_bank_negative_coords.xml"),
):
hot_pdf_object = HotPdf()
hot_pdf_object.load(mock_hotpdf_bank_file_name)
assert not hot_pdf_object.find_text(QUERY)[0], "Expected string to be empty"
# For sanity: The following file is same as above, except the coords are normal
with patch(
"hotpdf.processor.generate_xml_file",
"hotpdf.processor.__generate_xml_file",
return_value=xml_copy_file_name("tests/resources/xml/hotpdf_bank_normal_coords.xml"),
):
hot_pdf_object_normal = HotPdf()
Expand Down
Loading