From e58fd48bd14f31bebd2de8259f12630ac02756d6 Mon Sep 17 00:00:00 2001 From: Andrew Baumann Date: Sun, 5 Sep 2021 14:10:49 -0700 Subject: [PATCH] annotate ccitt.py, and fix one definite bug (array.tostring was renamed tobytes) --- mypy.ini | 6 +--- pdfminer/ccitt.py | 92 ++++++++++++++++++++++++++++------------------- 2 files changed, 57 insertions(+), 41 deletions(-) diff --git a/mypy.ini b/mypy.ini index 4ae3e3b4..eaddd861 100644 --- a/mypy.ini +++ b/mypy.ini @@ -11,16 +11,12 @@ warn_return_any = True no_implicit_reexport = True strict_equality = True -# This seems impossible to leave on in a version-independent manner +# This seems impossible to turn on in a version-independent manner warn_unused_ignores = False [mypy-pdfminer.*] disallow_untyped_defs = True -[mypy-pdfminer.ccitt] -disallow_untyped_calls = False -disallow_untyped_defs = False - [mypy-cryptography.hazmat.*] ignore_missing_imports = True diff --git a/pdfminer/ccitt.py b/pdfminer/ccitt.py index e45e8252..d57f211b 100644 --- a/pdfminer/ccitt.py +++ b/pdfminer/ccitt.py @@ -13,24 +13,38 @@ import sys import array -from typing import Any, Dict +from typing import (Any, Callable, Dict, Iterator, List, MutableSequence, + Optional, Sequence, Union, cast) -def get_bytes(data): +def get_bytes(data: bytes) -> Iterator[int]: yield from data +# Workaround https://github.com/python/mypy/issues/731 +BitParserState = MutableSequence[Any] +# A better definition (not supported by mypy) would be: +# BitParserState = MutableSequence[Union["BitParserState", int, str, None]] + + class BitParser: - def __init__(self): + _state: BitParserState + + # _accept is declared Optional solely as a workaround for + # https://github.com/python/mypy/issues/708 + _accept: Optional[Callable[[Any], BitParserState]] + + def __init__(self) -> None: self._pos = 0 return @classmethod - def add(cls, root, v, bits): - p = root + def add(cls, root: BitParserState, v: Union[int, str], bits: str) -> None: + p: BitParserState = root b = None for i in range(len(bits)): if 0 < i: + assert b is not None if p[b] is None: p[b] = [None, None] p = p[b] @@ -38,16 +52,17 @@ def add(cls, root, v, bits): b = 1 else: b = 0 + assert b is not None p[b] = v return - def feedbytes(self, data): + def feedbytes(self, data: bytes) -> None: for byte in get_bytes(data): for m in (128, 64, 32, 16, 8, 4, 2, 1): self._parse_bit(byte & m) return - def _parse_bit(self, x): + def _parse_bit(self, x: Any) -> None: if x: v = self._state[1] else: @@ -56,6 +71,7 @@ def _parse_bit(self, x): if isinstance(v, list): self._state = v else: + assert self._accept is not None self._state = self._accept(v) return @@ -319,14 +335,16 @@ class InvalidData(Exception): class ByteSkip(Exception): pass - def __init__(self, width, bytealign=False): + _color: int + + def __init__(self, width: int, bytealign: bool = False): BitParser.__init__(self) self.width = width self.bytealign = bytealign self.reset() return - def feedbytes(self, data): + def feedbytes(self, data: bytes) -> None: for byte in get_bytes(data): try: for m in (128, 64, 32, 16, 8, 4, 2, 1): @@ -338,7 +356,7 @@ def feedbytes(self, data): break return - def _parse_mode(self, mode): + def _parse_mode(self, mode: Any) -> BitParserState: if mode == 'p': self._do_pass() self._flush_line() @@ -362,7 +380,7 @@ def _parse_mode(self, mode): else: raise self.InvalidData(mode) - def _parse_horiz1(self, n): + def _parse_horiz1(self, n: Any) -> BitParserState: if n is None: raise self.InvalidData self._n1 += n @@ -375,7 +393,7 @@ def _parse_horiz1(self, n): else: return self.BLACK - def _parse_horiz2(self, n): + def _parse_horiz2(self, n: Any) -> BitParserState: if n is None: raise self.InvalidData self._n2 += n @@ -390,7 +408,7 @@ def _parse_horiz2(self, n): else: return self.BLACK - def _parse_uncompressed(self, bits): + def _parse_uncompressed(self, bits: Optional[str]) -> BitParserState: if not bits: raise self.InvalidData if bits.startswith('T'): @@ -402,10 +420,10 @@ def _parse_uncompressed(self, bits): self._do_uncompressed(bits) return self.UNCOMPRESSED - def _get_bits(self): + def _get_bits(self) -> str: return ''.join(str(b) for b in self._curline[:self._curpos]) - def _get_refline(self, i): + def _get_refline(self, i: int) -> str: if i < 0: return '[]'+''.join(str(b) for b in self._refline) elif len(self._refline) <= i: @@ -415,7 +433,7 @@ def _get_refline(self, i): '['+str(self._refline[i])+']' + ''.join(str(b) for b in self._refline[i+1:])) - def reset(self): + def reset(self) -> None: self._y = 0 self._curline = array.array('b', [1]*self.width) self._reset_line() @@ -423,18 +441,18 @@ def reset(self): self._state = self.MODE return - def output_line(self, y, bits): + def output_line(self, y: int, bits: Sequence[int]) -> None: print(y, ''.join(str(b) for b in bits)) return - def _reset_line(self): + def _reset_line(self) -> None: self._refline = self._curline self._curline = array.array('b', [1]*self.width) self._curpos = -1 self._color = 1 return - def _flush_line(self): + def _flush_line(self) -> None: if self.width <= self._curpos: self.output_line(self._y, self._curline) self._y += 1 @@ -443,7 +461,7 @@ def _flush_line(self): raise self.ByteSkip return - def _do_vertical(self, dx): + def _do_vertical(self, dx: int) -> None: x1 = self._curpos+1 while 1: if x1 == 0: @@ -468,7 +486,7 @@ def _do_vertical(self, dx): self._color = 1-self._color return - def _do_pass(self): + def _do_pass(self) -> None: x1 = self._curpos+1 while 1: if x1 == 0: @@ -495,7 +513,7 @@ def _do_pass(self): self._curpos = x1 return - def _do_horizontal(self, n1, n2): + def _do_horizontal(self, n1: int, n2: int) -> None: if self._curpos < 0: self._curpos = 0 x = self._curpos @@ -512,7 +530,7 @@ def _do_horizontal(self, n1, n2): self._curpos = x return - def _do_uncompressed(self, bits): + def _do_uncompressed(self, bits: str) -> None: for c in bits: self._curline[self._curpos] = int(c) self._curpos += 1 @@ -522,31 +540,32 @@ def _do_uncompressed(self, bits): class CCITTFaxDecoder(CCITTG4Parser): - def __init__(self, width, bytealign=False, reversed=False): + def __init__(self, width: int, bytealign: bool = False, + reversed: bool = False): CCITTG4Parser.__init__(self, width, bytealign=bytealign) self.reversed = reversed self._buf = b'' return - def close(self): + def close(self) -> bytes: return self._buf - def output_line(self, y, bits): + def output_line(self, y: int, bits: Sequence[int]) -> None: bytes = array.array('B', [0]*((len(bits)+7)//8)) if self.reversed: bits = [1-b for b in bits] for (i, b) in enumerate(bits): if b: bytes[i//8] += (128, 64, 32, 16, 8, 4, 2, 1)[i % 8] - self._buf += bytes.tostring() + self._buf += bytes.tobytes() return def ccittfaxdecode(data: bytes, params: Dict[str, Any]) -> bytes: K = params.get('K') - cols = params.get('Columns') - bytealign = params.get('EncodedByteAlign') - reversed = params.get('BlackIs1') + cols = cast(int, params.get('Columns')) + bytealign = cast(bool, params.get('EncodedByteAlign')) + reversed = cast(bool, params.get('BlackIs1')) if K == -1: parser = CCITTFaxDecoder(cols, bytealign=bytealign, reversed=reversed) else: @@ -556,19 +575,20 @@ def ccittfaxdecode(data: bytes, params: Dict[str, Any]) -> bytes: # test -def main(argv): +def main(argv: List[str]) -> None: if not argv[1:]: import unittest - return unittest.main() + unittest.main() + return class Parser(CCITTG4Parser): - def __init__(self, width, bytealign=False): + def __init__(self, width: int, bytealign: bool = False): import pygame # type: ignore[import] CCITTG4Parser.__init__(self, width, bytealign=bytealign) self.img = pygame.Surface((self.width, 1000)) return - def output_line(self, y, bits): + def output_line(self, y: int, bits: Sequence[int]) -> None: for (x, b) in enumerate(bits): if b: self.img.set_at((x, y), (255, 255, 255)) @@ -576,7 +596,7 @@ def output_line(self, y, bits): self.img.set_at((x, y), (0, 0, 0)) return - def close(self): + def close(self) -> None: import pygame pygame.image.save(self.img, 'out.bmp') return @@ -591,4 +611,4 @@ def close(self): if __name__ == '__main__': - sys.exit(main(sys.argv)) + main(sys.argv)