From ac2983b156e05ac620cb917f3245134be9859aa2 Mon Sep 17 00:00:00 2001 From: Martin Thoma Date: Sat, 14 Sep 2024 23:46:25 +0200 Subject: [PATCH 1/9] STY: Minor code-style improvements for _reader.py --- pypdf/_reader.py | 125 ++++++++++++++++++++++++++--------------------- 1 file changed, 70 insertions(+), 55 deletions(-) diff --git a/pypdf/_reader.py b/pypdf/_reader.py index 58c160302..4df360c97 100644 --- a/pypdf/_reader.py +++ b/pypdf/_reader.py @@ -123,9 +123,19 @@ def __init__( self.xref_objStm: Dict[int, Tuple[Any, Any]] = {} self.trailer = DictionaryObject() - self._page_id2num: Optional[ - Dict[Any, Any] - ] = None # map page indirect_reference number to Page Number + # map page indirect_reference number to page number + self._page_id2num: Optional[Dict[Any, Any]] = None + + self._initialize_stream(stream) + + self._override_encryption = False + self._encryption: Optional[Encryption] = None + if self.is_encrypted: + self._handle_encryption(password) + elif password is not None: + raise PdfReadError("Not an encrypted file") + + def _initialize_stream(self, stream: Union[StrByteType, Path]) -> None: if hasattr(stream, "mode") and "b" not in stream.mode: logger_warning( "PdfReader stream/file object is not in binary mode. " @@ -141,31 +151,25 @@ def __init__( self.read(stream) self.stream = stream + def _handle_encryption(self, password: Optional[Union[str, bytes]]) -> None: + self._override_encryption = True + # Some documents may not have a /ID, use two empty + # byte strings instead. Solves + # https://github.com/py-pdf/pypdf/issues/608 + id_entry = self.trailer.get(TK.ID) + id1_entry = id_entry[0].get_object().original_bytes if id_entry else b"" + encrypt_entry = cast(DictionaryObject, self.trailer[TK.ENCRYPT].get_object()) + self._encryption = Encryption.read(encrypt_entry, id1_entry) + + # try empty password if no password provided + pwd = password if password is not None else b"" + if ( + self._encryption.verify(pwd) == PasswordType.NOT_DECRYPTED + and password is not None + ): + # raise if password provided + raise WrongPasswordError("Wrong password") self._override_encryption = False - self._encryption: Optional[Encryption] = None - if self.is_encrypted: - self._override_encryption = True - # Some documents may not have a /ID, use two empty - # byte strings instead. Solves - # https://github.com/py-pdf/pypdf/issues/608 - id_entry = self.trailer.get(TK.ID) - id1_entry = id_entry[0].get_object().original_bytes if id_entry else b"" - encrypt_entry = cast( - DictionaryObject, self.trailer[TK.ENCRYPT].get_object() - ) - self._encryption = Encryption.read(encrypt_entry, id1_entry) - - # try empty password if no password provided - pwd = password if password is not None else b"" - if ( - self._encryption.verify(pwd) == PasswordType.NOT_DECRYPTED - and password is not None - ): - # raise if password provided - raise WrongPasswordError("Wrong password") - self._override_encryption = False - elif password is not None: - raise PdfReadError("Not encrypted file") def __enter__(self) -> "PdfReader": return self @@ -285,28 +289,29 @@ def _get_page_number_by_indirect( self, indirect_reference: Union[None, int, NullObject, IndirectObject] ) -> Optional[int]: """ - Generate _page_id2num. + Retrieve the page number from an indirect reference. Args: - indirect_reference: + indirect_reference: The indirect reference to locate. Returns: - The page number or None + Page number or None. """ if self._page_id2num is None: self._page_id2num = { - x.indirect_reference.idnum: i for i, x in enumerate(self.pages) # type: ignore + x.indirect_reference.idnum: i + for i, x in enumerate(self.pages) + if x.indirect_reference is not None } if indirect_reference is None or isinstance(indirect_reference, NullObject): return None - if isinstance(indirect_reference, int): - idnum = indirect_reference - else: - idnum = indirect_reference.idnum - assert self._page_id2num is not None, "hint for mypy" - ret = self._page_id2num.get(idnum, None) - return ret + idnum = ( + indirect_reference + if isinstance(indirect_reference, int) + else indirect_reference.idnum + ) + return self._page_id2num.get(idnum) def _get_object_from_stream( self, indirect_reference: IndirectObject @@ -560,6 +565,12 @@ def _replace_object(self, indirect: IndirectObject, obj: PdfObject) -> PdfObject return obj def read(self, stream: StreamType) -> None: + """ + Read and process the PDF stream, extracting necessary data. + + Args: + stream (StreamType): The PDF file stream. + """ self._basic_validation(stream) self._find_eof_marker(stream) startxref = self._find_startxref_pos(stream) @@ -619,7 +630,7 @@ def read(self, stream: StreamType) -> None: stream.seek(loc, 0) # return to where it was def _basic_validation(self, stream: StreamType) -> None: - """Ensure file is not empty. Read at most 5 bytes.""" + """Ensure the stream is valid and not empty.""" stream.seek(0, os.SEEK_SET) try: header_byte = stream.read(5) @@ -801,6 +812,7 @@ def _read_standard_xref_table(self, stream: StreamType) -> None: def _read_xref_tables_and_trailers( self, stream: StreamType, startxref: Optional[int], xref_issue_nr: int ) -> None: + """Read the cross-reference tables and trailers in the PDF stream.""" self.xref = {} self.xref_free_entry = {} self.xref_objStm = {} @@ -825,21 +837,12 @@ def _read_xref_tables_and_trailers( except Exception as e: if TK.ROOT in self.trailer: logger_warning( - f"Previous trailer can not be read {e.args}", - __name__, + f"Previous trailer cannot be read: {e.args}", __name__ ) break else: - raise PdfReadError(f"trailer can not be read {e.args}") - trailer_keys = TK.ROOT, TK.ENCRYPT, TK.INFO, TK.ID, TK.SIZE - for key in trailer_keys: - if key in xrefstream and key not in self.trailer: - self.trailer[NameObject(key)] = xrefstream.raw_get(key) - if "/XRefStm" in xrefstream: - p = stream.tell() - stream.seek(cast(int, xrefstream["/XRefStm"]) + 1, 0) - self._read_pdf15_xref_stream(stream) - stream.seek(p, 0) + raise PdfReadError(f"Trailer cannot be read: {e.args}") + self._process_xref_stream(xrefstream) if "/Prev" in xrefstream: startxref = cast(int, xrefstream["/Prev"]) else: @@ -847,6 +850,18 @@ def _read_xref_tables_and_trailers( else: startxref = self._read_xref_other_error(stream, startxref) + def _process_xref_stream(self, xrefstream: DictionaryObject) -> None: + """Process and handle the xref stream.""" + trailer_keys = TK.ROOT, TK.ENCRYPT, TK.INFO, TK.ID, TK.SIZE + for key in trailer_keys: + if key in xrefstream and key not in self.trailer: + self.trailer[NameObject(key)] = xrefstream.raw_get(key) + if "/XRefStm" in xrefstream: + p = self.stream.tell() + self.stream.seek(cast(int, xrefstream["/XRefStm"]) + 1, 0) + self._read_pdf15_xref_stream(self.stream) + self.stream.seek(p, 0) + def _read_xref(self, stream: StreamType) -> Optional[int]: self._read_standard_xref_table(stream) if stream.read(1) == b"": @@ -919,7 +934,7 @@ def _read_xref_other_error( def _read_pdf15_xref_stream( self, stream: StreamType ) -> Union[ContentStream, EncodedStreamObject, DecodedStreamObject]: - # PDF 1.5+ Cross-Reference Stream + """Read the cross-reference stream for PDF 1.5+.""" stream.seek(-1, 1) idnum, generation = self.read_object_header(stream) xrefstream = cast(ContentStream, read_object(stream, self)) @@ -1047,6 +1062,7 @@ def _read_xref_subsections( get_entry: Callable[[int], Union[int, Tuple[int, ...]]], used_before: Callable[[int, Union[int, Tuple[int, ...]]], bool], ) -> None: + """Read and process the subsections of the xref.""" for start, size in self._pairs(idx_pairs): # The subsections must increase for num in range(start, start + size): @@ -1076,12 +1092,11 @@ def _read_xref_subsections( raise PdfReadError(f"Unknown xref type: {xref_type}") def _pairs(self, array: List[int]) -> Iterable[Tuple[int, int]]: + """Iterate over pairs in the array.""" i = 0 - while True: + while i + 1 < len(array): yield array[i], array[i + 1] i += 2 - if (i + 1) >= len(array): - break def decrypt(self, password: Union[str, bytes]) -> PasswordType: """ From dfa3d5ca8b672db162419652e79ddf83d7d48289 Mon Sep 17 00:00:00 2001 From: Martin Thoma Date: Sun, 15 Sep 2024 06:07:07 +0200 Subject: [PATCH 2/9] Fix tests --- pypdf/_reader.py | 1 + tests/test_encryption.py | 2 +- tests/test_reader.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pypdf/_reader.py b/pypdf/_reader.py index 5e841ee3d..e5e1ff3ca 100644 --- a/pypdf/_reader.py +++ b/pypdf/_reader.py @@ -307,6 +307,7 @@ def _get_page_number_by_indirect( if is_null_or_none(indirect_reference): return None + assert isinstance(indirect_reference, (int, IndirectObject)), "mypy" idnum = ( indirect_reference if isinstance(indirect_reference, int) diff --git a/tests/test_encryption.py b/tests/test_encryption.py index f5c494cb9..be92e40a9 100644 --- a/tests/test_encryption.py +++ b/tests/test_encryption.py @@ -205,7 +205,7 @@ def test_attempt_decrypt_unencrypted_pdf(): path = RESOURCE_ROOT / "crazyones.pdf" with pytest.raises(PdfReadError) as exc: PdfReader(path, password="nonexistent") - assert exc.value.args[0] == "Not encrypted file" + assert exc.value.args[0] == "Not an encrypted file" @pytest.mark.skipif(not HAS_AES, reason="No AES implementation") diff --git a/tests/test_reader.py b/tests/test_reader.py index 99555cd22..e495ebbc3 100644 --- a/tests/test_reader.py +++ b/tests/test_reader.py @@ -1293,7 +1293,7 @@ def test_reader(caplog): url = "https://github.com/py-pdf/pypdf/files/9464742/shiv_resume.pdf" name = "shiv_resume.pdf" reader = PdfReader(BytesIO(get_data_from_url(url, name=name))) - assert "Previous trailer can not be read" in caplog.text + assert "Previous trailer cannot be read" in caplog.text caplog.clear() # first call requires some reparations... reader.pages[0].extract_text() From 6253b4b4cac1c76f0e00797611cfb9ab96e4b1e1 Mon Sep 17 00:00:00 2001 From: Martin Thoma Date: Sun, 15 Sep 2024 14:17:59 +0200 Subject: [PATCH 3/9] Update pypdf/_reader.py Co-authored-by: pubpub-zz <4083478+pubpub-zz@users.noreply.github.com> --- pypdf/_reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pypdf/_reader.py b/pypdf/_reader.py index e5e1ff3ca..5c00b5a6a 100644 --- a/pypdf/_reader.py +++ b/pypdf/_reader.py @@ -302,7 +302,7 @@ def _get_page_number_by_indirect( self._page_id2num = { x.indirect_reference.idnum: i for i, x in enumerate(self.pages) - if x.indirect_reference is not None + if is_null_or_none(x.indirect_reference) } if is_null_or_none(indirect_reference): From bc3ae82e8bcba67a88466f420cfb4b576f2cebd1 Mon Sep 17 00:00:00 2001 From: Martin Thoma Date: Sun, 15 Sep 2024 14:27:00 +0200 Subject: [PATCH 4/9] fix doc building warning --- pypdf/_reader.py | 4 ++-- pypdf/generic/_base.py | 8 +++++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pypdf/_reader.py b/pypdf/_reader.py index 5c00b5a6a..c0564f4c2 100644 --- a/pypdf/_reader.py +++ b/pypdf/_reader.py @@ -300,7 +300,7 @@ def _get_page_number_by_indirect( """ if self._page_id2num is None: self._page_id2num = { - x.indirect_reference.idnum: i + x.indirect_reference.idnum: i # type: ignore for i, x in enumerate(self.pages) if is_null_or_none(x.indirect_reference) } @@ -571,7 +571,7 @@ def read(self, stream: StreamType) -> None: Read and process the PDF stream, extracting necessary data. Args: - stream (StreamType): The PDF file stream. + stream: The PDF file stream. """ self._basic_validation(stream) self._find_eof_marker(stream) diff --git a/pypdf/generic/_base.py b/pypdf/generic/_base.py index fd7d1a8ff..c2dd73668 100644 --- a/pypdf/generic/_base.py +++ b/pypdf/generic/_base.py @@ -28,11 +28,17 @@ import codecs import hashlib import re +import sys from binascii import unhexlify from math import log10 from struct import iter_unpack from typing import Any, Callable, ClassVar, Dict, Optional, Sequence, Union, cast +if sys.version_info[:2] >= (3, 10): + from typing import TypeGuard +else: + from typing_extensions import TypeGuard # PEP 647 + from .._codecs import _pdfdoc_encoding_rev from .._protocols import PdfObjectProtocol, PdfWriterProtocol from .._utils import ( @@ -214,7 +220,7 @@ def __repr__(self) -> str: return "NullObject" -def is_null_or_none(x: Any) -> bool: +def is_null_or_none(x: Any) -> TypeGuard[None]: """ Returns: True if x is None or NullObject. From 7a4409f540b07050380398dcd03747d275925ad2 Mon Sep 17 00:00:00 2001 From: Martin Thoma Date: Sun, 15 Sep 2024 20:41:30 +0200 Subject: [PATCH 5/9] Undo is_null_or_none --- pypdf/_reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pypdf/_reader.py b/pypdf/_reader.py index c0564f4c2..f9d23fd87 100644 --- a/pypdf/_reader.py +++ b/pypdf/_reader.py @@ -302,7 +302,7 @@ def _get_page_number_by_indirect( self._page_id2num = { x.indirect_reference.idnum: i # type: ignore for i, x in enumerate(self.pages) - if is_null_or_none(x.indirect_reference) + if x.indirect_reference is None } if is_null_or_none(indirect_reference): From dd68fa1c9a9b64db7c6125f26adcc495303bfb79 Mon Sep 17 00:00:00 2001 From: Martin Thoma Date: Sun, 15 Sep 2024 21:18:45 +0200 Subject: [PATCH 6/9] Undo --- pypdf/_reader.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pypdf/_reader.py b/pypdf/_reader.py index f9d23fd87..4b40ce0e7 100644 --- a/pypdf/_reader.py +++ b/pypdf/_reader.py @@ -300,9 +300,7 @@ def _get_page_number_by_indirect( """ if self._page_id2num is None: self._page_id2num = { - x.indirect_reference.idnum: i # type: ignore - for i, x in enumerate(self.pages) - if x.indirect_reference is None + x.indirect_reference.idnum: i for i, x in enumerate(self.pages) # type: ignore } if is_null_or_none(indirect_reference): From 7510d5404115ed2b75597f70fd5e57fb6969feff Mon Sep 17 00:00:00 2001 From: Martin Thoma Date: Sun, 15 Sep 2024 21:19:56 +0200 Subject: [PATCH 7/9] Undo --- pypdf/_reader.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pypdf/_reader.py b/pypdf/_reader.py index 4b40ce0e7..def0fc7f1 100644 --- a/pypdf/_reader.py +++ b/pypdf/_reader.py @@ -306,12 +306,13 @@ def _get_page_number_by_indirect( if is_null_or_none(indirect_reference): return None assert isinstance(indirect_reference, (int, IndirectObject)), "mypy" - idnum = ( - indirect_reference - if isinstance(indirect_reference, int) - else indirect_reference.idnum - ) - return self._page_id2num.get(idnum) + if isinstance(indirect_reference, int): + idnum = indirect_reference + else: + idnum = indirect_reference.idnum + assert self._page_id2num is not None, "hint for mypy" + ret = self._page_id2num.get(idnum, None) + return ret def _get_object_from_stream( self, indirect_reference: IndirectObject From af2c05f0e1c0feafacc57cc5b3bcdf8a13147eeb Mon Sep 17 00:00:00 2001 From: Martin Thoma Date: Thu, 19 Sep 2024 22:03:23 +0200 Subject: [PATCH 8/9] TypeGuard refinement for is_null_or_none Co-authored-by: pubpub-zz <4083478+pubpub-zz@users.noreply.github.com> --- pypdf/generic/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pypdf/generic/_base.py b/pypdf/generic/_base.py index c2dd73668..4eb059e75 100644 --- a/pypdf/generic/_base.py +++ b/pypdf/generic/_base.py @@ -220,7 +220,7 @@ def __repr__(self) -> str: return "NullObject" -def is_null_or_none(x: Any) -> TypeGuard[None]: +def is_null_or_none(x: Any) -> TypeGuard[None|NullObject|IndirectObject]: """ Returns: True if x is None or NullObject. From b10514fc48d5dfd4ce8ff8496890a7be1b1f1e91 Mon Sep 17 00:00:00 2001 From: Martin Thoma Date: Fri, 20 Sep 2024 07:58:35 +0200 Subject: [PATCH 9/9] Move function to bottom for type annotations --- pypdf/generic/_base.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pypdf/generic/_base.py b/pypdf/generic/_base.py index 4eb059e75..e05e00b39 100644 --- a/pypdf/generic/_base.py +++ b/pypdf/generic/_base.py @@ -220,16 +220,6 @@ def __repr__(self) -> str: return "NullObject" -def is_null_or_none(x: Any) -> TypeGuard[None|NullObject|IndirectObject]: - """ - Returns: - True if x is None or NullObject. - """ - return x is None or ( - isinstance(x, PdfObject) and isinstance(x.get_object(), NullObject) - ) - - class BooleanObject(PdfObject): def __init__(self, value: Any) -> None: self.value = value @@ -859,3 +849,13 @@ def encode_pdfdocencoding(unicode_string: str) -> bytes: -1, "does not exist in translation table", ) + + +def is_null_or_none(x: Any) -> TypeGuard[Union[None, NullObject, IndirectObject]]: + """ + Returns: + True if x is None or NullObject. + """ + return x is None or ( + isinstance(x, PdfObject) and isinstance(x.get_object(), NullObject) + )