Skip to content

Commit

Permalink
annotate ccitt.py, and fix one definite bug (array.tostring was renam…
Browse files Browse the repository at this point in the history
…ed tobytes)
  • Loading branch information
0xabu committed Sep 5, 2021
1 parent 6052906 commit e58fd48
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 41 deletions.
6 changes: 1 addition & 5 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,12 @@ warn_return_any = True
no_implicit_reexport = True
strict_equality = True

# This seems impossible to leave on in a version-independent manner
# This seems impossible to turn on in a version-independent manner
warn_unused_ignores = False

[mypy-pdfminer.*]
disallow_untyped_defs = True

[mypy-pdfminer.ccitt]
disallow_untyped_calls = False
disallow_untyped_defs = False

[mypy-cryptography.hazmat.*]
ignore_missing_imports = True

Expand Down
92 changes: 56 additions & 36 deletions pdfminer/ccitt.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,56 @@

import sys
import array
from typing import Any, Dict
from typing import (Any, Callable, Dict, Iterator, List, MutableSequence,
Optional, Sequence, Union, cast)


def get_bytes(data):
def get_bytes(data: bytes) -> Iterator[int]:
yield from data


# Workaround https://github.com/python/mypy/issues/731
BitParserState = MutableSequence[Any]
# A better definition (not supported by mypy) would be:
# BitParserState = MutableSequence[Union["BitParserState", int, str, None]]


class BitParser:
def __init__(self):
_state: BitParserState

# _accept is declared Optional solely as a workaround for
# https://github.com/python/mypy/issues/708
_accept: Optional[Callable[[Any], BitParserState]]

def __init__(self) -> None:
self._pos = 0
return

@classmethod
def add(cls, root, v, bits):
p = root
def add(cls, root: BitParserState, v: Union[int, str], bits: str) -> None:
p: BitParserState = root
b = None
for i in range(len(bits)):
if 0 < i:
assert b is not None
if p[b] is None:
p[b] = [None, None]
p = p[b]
if bits[i] == '1':
b = 1
else:
b = 0
assert b is not None
p[b] = v
return

def feedbytes(self, data):
def feedbytes(self, data: bytes) -> None:
for byte in get_bytes(data):
for m in (128, 64, 32, 16, 8, 4, 2, 1):
self._parse_bit(byte & m)
return

def _parse_bit(self, x):
def _parse_bit(self, x: Any) -> None:
if x:
v = self._state[1]
else:
Expand All @@ -56,6 +71,7 @@ def _parse_bit(self, x):
if isinstance(v, list):
self._state = v
else:
assert self._accept is not None
self._state = self._accept(v)
return

Expand Down Expand Up @@ -319,14 +335,16 @@ class InvalidData(Exception):
class ByteSkip(Exception):
pass

def __init__(self, width, bytealign=False):
_color: int

def __init__(self, width: int, bytealign: bool = False):
BitParser.__init__(self)
self.width = width
self.bytealign = bytealign
self.reset()
return

def feedbytes(self, data):
def feedbytes(self, data: bytes) -> None:
for byte in get_bytes(data):
try:
for m in (128, 64, 32, 16, 8, 4, 2, 1):
Expand All @@ -338,7 +356,7 @@ def feedbytes(self, data):
break
return

def _parse_mode(self, mode):
def _parse_mode(self, mode: Any) -> BitParserState:
if mode == 'p':
self._do_pass()
self._flush_line()
Expand All @@ -362,7 +380,7 @@ def _parse_mode(self, mode):
else:
raise self.InvalidData(mode)

def _parse_horiz1(self, n):
def _parse_horiz1(self, n: Any) -> BitParserState:
if n is None:
raise self.InvalidData
self._n1 += n
Expand All @@ -375,7 +393,7 @@ def _parse_horiz1(self, n):
else:
return self.BLACK

def _parse_horiz2(self, n):
def _parse_horiz2(self, n: Any) -> BitParserState:
if n is None:
raise self.InvalidData
self._n2 += n
Expand All @@ -390,7 +408,7 @@ def _parse_horiz2(self, n):
else:
return self.BLACK

def _parse_uncompressed(self, bits):
def _parse_uncompressed(self, bits: Optional[str]) -> BitParserState:
if not bits:
raise self.InvalidData
if bits.startswith('T'):
Expand All @@ -402,10 +420,10 @@ def _parse_uncompressed(self, bits):
self._do_uncompressed(bits)
return self.UNCOMPRESSED

def _get_bits(self):
def _get_bits(self) -> str:
return ''.join(str(b) for b in self._curline[:self._curpos])

def _get_refline(self, i):
def _get_refline(self, i: int) -> str:
if i < 0:
return '[]'+''.join(str(b) for b in self._refline)
elif len(self._refline) <= i:
Expand All @@ -415,26 +433,26 @@ def _get_refline(self, i):
'['+str(self._refline[i])+']' +
''.join(str(b) for b in self._refline[i+1:]))

def reset(self):
def reset(self) -> None:
self._y = 0
self._curline = array.array('b', [1]*self.width)
self._reset_line()
self._accept = self._parse_mode
self._state = self.MODE
return

def output_line(self, y, bits):
def output_line(self, y: int, bits: Sequence[int]) -> None:
print(y, ''.join(str(b) for b in bits))
return

def _reset_line(self):
def _reset_line(self) -> None:
self._refline = self._curline
self._curline = array.array('b', [1]*self.width)
self._curpos = -1
self._color = 1
return

def _flush_line(self):
def _flush_line(self) -> None:
if self.width <= self._curpos:
self.output_line(self._y, self._curline)
self._y += 1
Expand All @@ -443,7 +461,7 @@ def _flush_line(self):
raise self.ByteSkip
return

def _do_vertical(self, dx):
def _do_vertical(self, dx: int) -> None:
x1 = self._curpos+1
while 1:
if x1 == 0:
Expand All @@ -468,7 +486,7 @@ def _do_vertical(self, dx):
self._color = 1-self._color
return

def _do_pass(self):
def _do_pass(self) -> None:
x1 = self._curpos+1
while 1:
if x1 == 0:
Expand All @@ -495,7 +513,7 @@ def _do_pass(self):
self._curpos = x1
return

def _do_horizontal(self, n1, n2):
def _do_horizontal(self, n1: int, n2: int) -> None:
if self._curpos < 0:
self._curpos = 0
x = self._curpos
Expand All @@ -512,7 +530,7 @@ def _do_horizontal(self, n1, n2):
self._curpos = x
return

def _do_uncompressed(self, bits):
def _do_uncompressed(self, bits: str) -> None:
for c in bits:
self._curline[self._curpos] = int(c)
self._curpos += 1
Expand All @@ -522,31 +540,32 @@ def _do_uncompressed(self, bits):

class CCITTFaxDecoder(CCITTG4Parser):

def __init__(self, width, bytealign=False, reversed=False):
def __init__(self, width: int, bytealign: bool = False,
reversed: bool = False):
CCITTG4Parser.__init__(self, width, bytealign=bytealign)
self.reversed = reversed
self._buf = b''
return

def close(self):
def close(self) -> bytes:
return self._buf

def output_line(self, y, bits):
def output_line(self, y: int, bits: Sequence[int]) -> None:
bytes = array.array('B', [0]*((len(bits)+7)//8))
if self.reversed:
bits = [1-b for b in bits]
for (i, b) in enumerate(bits):
if b:
bytes[i//8] += (128, 64, 32, 16, 8, 4, 2, 1)[i % 8]
self._buf += bytes.tostring()
self._buf += bytes.tobytes()
return


def ccittfaxdecode(data: bytes, params: Dict[str, Any]) -> bytes:
K = params.get('K')
cols = params.get('Columns')
bytealign = params.get('EncodedByteAlign')
reversed = params.get('BlackIs1')
cols = cast(int, params.get('Columns'))
bytealign = cast(bool, params.get('EncodedByteAlign'))
reversed = cast(bool, params.get('BlackIs1'))
if K == -1:
parser = CCITTFaxDecoder(cols, bytealign=bytealign, reversed=reversed)
else:
Expand All @@ -556,27 +575,28 @@ def ccittfaxdecode(data: bytes, params: Dict[str, Any]) -> bytes:


# test
def main(argv):
def main(argv: List[str]) -> None:
if not argv[1:]:
import unittest
return unittest.main()
unittest.main()
return

class Parser(CCITTG4Parser):
def __init__(self, width, bytealign=False):
def __init__(self, width: int, bytealign: bool = False):
import pygame # type: ignore[import]
CCITTG4Parser.__init__(self, width, bytealign=bytealign)
self.img = pygame.Surface((self.width, 1000))
return

def output_line(self, y, bits):
def output_line(self, y: int, bits: Sequence[int]) -> None:
for (x, b) in enumerate(bits):
if b:
self.img.set_at((x, y), (255, 255, 255))
else:
self.img.set_at((x, y), (0, 0, 0))
return

def close(self):
def close(self) -> None:
import pygame
pygame.image.save(self.img, 'out.bmp')
return
Expand All @@ -591,4 +611,4 @@ def close(self):


if __name__ == '__main__':
sys.exit(main(sys.argv))
main(sys.argv)

0 comments on commit e58fd48

Please sign in to comment.