Skip to content

Commit

Permalink
Merge pull request #530 from JohnVillalovos/jlvillal/mypy_1
Browse files Browse the repository at this point in the history
chore: add type-hints to `response_*.py`
  • Loading branch information
mjs authored Aug 27, 2023
2 parents 7465000 + e8bcae8 commit a34f173
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 42 deletions.
103 changes: 72 additions & 31 deletions imapclient/response_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,35 @@

# TODO more exact error reporting

import datetime
import re
import sys
from collections import defaultdict
from typing import cast, Dict, Iterator, List, Optional, Tuple, TYPE_CHECKING, Union

from .datetime_util import parse_to_datetime
from .exceptions import ProtocolError
from .response_lexer import TokenSource
from .response_types import Address, BodyData, Envelope, SearchIds
from .typing_imapclient import _Atom

__all__ = ["parse_response", "parse_message_list"]


def parse_response(data):
def parse_response(data: List[bytes]) -> Tuple[_Atom, ...]:
"""Pull apart IMAP command responses.
Returns nested tuples of appropriately typed objects.
"""
if data == [None]:
return []
return tuple()
return tuple(gen_parsed_response(data))


_msg_id_pattern = re.compile(r"(\d+(?: +\d+)*)")


def parse_message_list(data):
def parse_message_list(data: List[Union[bytes, str]]) -> SearchIds:
"""Parse a list of message ids and return them as a list.
parse_response is also capable of doing this but this is
Expand All @@ -50,36 +53,38 @@ def parse_message_list(data):
if len(data) != 1:
raise ValueError("unexpected message list data")

data = data[0]
if not data:
message_data = data[0]
if not message_data:
return SearchIds()

if isinstance(data, bytes):
data = data.decode("ascii")
if isinstance(message_data, bytes):
message_data = message_data.decode("ascii")

m = _msg_id_pattern.match(data)
m = _msg_id_pattern.match(message_data)
if not m:
raise ValueError("unexpected message list format")

ids = SearchIds(int(n) for n in m.group(1).split())

# Parse any non-numeric part on the end using parse_response (this
# is likely to be the MODSEQ section).
extra = data[m.end(1) :]
extra = message_data[m.end(1) :]
if extra:
for item in parse_response([extra.encode("ascii")]):
if (
isinstance(item, tuple)
and len(item) == 2
and item[0].lower() == b"modseq"
and cast(bytes, item[0]).lower() == b"modseq"
):
if TYPE_CHECKING:
assert isinstance(item[1], int)
ids.modseq = item[1]
elif isinstance(item, int):
ids.append(item)
return ids


def gen_parsed_response(text):
def gen_parsed_response(text: List[bytes]) -> Iterator[_Atom]:
if not text:
return
src = TokenSource(text)
Expand All @@ -92,20 +97,29 @@ def gen_parsed_response(text):
raise
except ValueError:
_, err, _ = sys.exc_info()
raise ProtocolError("%s: %s" % (str(err), token))
raise ProtocolError("%s: %r" % (str(err), token))


def parse_fetch_response(text, normalise_times=True, uid_is_key=True):
_ParseFetchResponseInnerDict = Dict[
bytes, Optional[Union[datetime.datetime, int, BodyData, Envelope, _Atom]]
]


def parse_fetch_response(
text: List[bytes], normalise_times: bool = True, uid_is_key: bool = True
) -> "defaultdict[int, _ParseFetchResponseInnerDict]":
"""Pull apart IMAP FETCH responses as returned by imaplib.
Returns a dictionary, keyed by message ID. Each value a dictionary
keyed by FETCH field type (eg."RFC822").
"""
if text == [None]:
return {}
return defaultdict()
response = gen_parsed_response(text)

parsed_response = defaultdict(dict)
parsed_response: "defaultdict[int, _ParseFetchResponseInnerDict]" = defaultdict(
dict
)
while True:
try:
msg_id = seq = _int_or_error(next(response), "invalid message ID")
Expand All @@ -126,9 +140,12 @@ def parse_fetch_response(text, normalise_times=True, uid_is_key=True):

# always return the sequence of the message, so it is available
# even if we return keyed by UID.
msg_data = {b"SEQ": seq}
msg_data: _ParseFetchResponseInnerDict = {b"SEQ": seq}
for i in range(0, len(msg_response), 2):
word = msg_response[i].upper()
msg_attribute = msg_response[i]
if TYPE_CHECKING:
assert isinstance(msg_attribute, bytes)
word = msg_attribute.upper()
value = msg_response[i + 1]

if word == b"UID":
Expand All @@ -142,6 +159,8 @@ def parse_fetch_response(text, normalise_times=True, uid_is_key=True):
elif word == b"ENVELOPE":
msg_data[word] = _convert_ENVELOPE(value, normalise_times)
elif word in (b"BODY", b"BODYSTRUCTURE"):
if TYPE_CHECKING:
assert isinstance(value, tuple)
msg_data[word] = BodyData.create(value)
else:
msg_data[word] = value
Expand All @@ -151,56 +170,78 @@ def parse_fetch_response(text, normalise_times=True, uid_is_key=True):
return parsed_response


def _int_or_error(value, error_text):
def _int_or_error(value: _Atom, error_text: str) -> int:
try:
return int(value)
return int(value) # type: ignore[arg-type]
except (TypeError, ValueError):
raise ProtocolError("%s: %s" % (error_text, repr(value)))


def _convert_INTERNALDATE(date_string, normalise_times=True):
def _convert_INTERNALDATE(
date_string: _Atom, normalise_times: bool = True
) -> Optional[datetime.datetime]:
if date_string is None:
return None

try:
if TYPE_CHECKING:
assert isinstance(date_string, bytes)
return parse_to_datetime(date_string, normalise=normalise_times)
except ValueError:
return None


def _convert_ENVELOPE(envelope_response, normalise_times=True):
def _convert_ENVELOPE(
envelope_response: _Atom, normalise_times: bool = True
) -> Envelope:
if TYPE_CHECKING:
assert isinstance(envelope_response, tuple)
dt = None
if envelope_response[0]:
try:
dt = parse_to_datetime(envelope_response[0], normalise=normalise_times)
if TYPE_CHECKING:
assert isinstance(envelope_response[0], bytes)
dt = parse_to_datetime(
envelope_response[0],
normalise=normalise_times,
)
except ValueError:
pass

subject = envelope_response[1]

# addresses contains a tuple of addresses
# from, sender, reply_to, to, cc, bcc headers
addresses = []
addresses: List[Optional[Tuple[Address, ...]]] = []
for addr_list in envelope_response[2:8]:
addrs = []
if addr_list:
if TYPE_CHECKING:
assert isinstance(addr_list, tuple)
for addr_tuple in addr_list:
if TYPE_CHECKING:
assert isinstance(addr_tuple, tuple)
if addr_tuple:
addrs.append(Address(*addr_tuple))
addresses.append(tuple(addrs))
else:
addresses.append(None)

return Envelope(
dt,
subject,
*addresses,
date=dt,
subject=subject,
from_=addresses[0],
sender=addresses[1],
reply_to=addresses[2],
to=addresses[3],
cc=addresses[4],
bcc=addresses[5],
in_reply_to=envelope_response[8],
message_id=envelope_response[9]
message_id=envelope_response[9],
)


def atom(src, token):
def atom(src: TokenSource, token: bytes) -> _Atom:
if token == b"(":
return parse_tuple(src)
if token == b"NIL":
Expand All @@ -224,8 +265,8 @@ def atom(src, token):
return token


def parse_tuple(src):
out = []
def parse_tuple(src: TokenSource) -> _Atom:
out: List[_Atom] = []
for token in src:
if token == b")":
return tuple(out)
Expand All @@ -234,5 +275,5 @@ def parse_tuple(src):
raise ProtocolError('Tuple incomplete before "(%s"' % _fmt_tuple(out))


def _fmt_tuple(t):
def _fmt_tuple(t: List[_Atom]) -> str:
return " ".join(str(item) for item in t)
27 changes: 18 additions & 9 deletions imapclient/response_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

from collections import namedtuple
from email.utils import formataddr
from typing import Any, List, Optional, Tuple, TYPE_CHECKING, Union

from .typing_imapclient import _Atom
from .util import to_unicode


Expand Down Expand Up @@ -79,7 +81,7 @@ class Address(namedtuple("Address", "name route mailbox host")):
"group syntax".
"""

def __str__(self):
def __str__(self) -> str:
if self.mailbox and self.host:
address = to_unicode(self.mailbox) + "@" + to_unicode(self.host)
else:
Expand All @@ -88,7 +90,7 @@ def __str__(self):
return formataddr((to_unicode(self.name), address))


class SearchIds(list):
class SearchIds(List[int]):
"""
Contains a list of message ids as returned by IMAPClient.search().
Expand All @@ -97,30 +99,37 @@ class SearchIds(list):
criteria). See :rfc:`4551` for more details.
"""

def __init__(self, *args):
list.__init__(self, *args)
self.modseq = None
def __init__(self, *args: Any):
super().__init__(*args)
self.modseq: Optional[int] = None


class BodyData(tuple):
_BodyDataType = Tuple[Union[bytes, int, "BodyData"], "_BodyDataType"]


class BodyData(_BodyDataType):
"""
Returned when parsing BODY and BODYSTRUCTURE responses.
"""

@classmethod
def create(cls, response):
def create(cls, response: Tuple[_Atom, ...]) -> "BodyData":
# In case of multipart messages we will see at least 2 tuples
# at the start. Nest these in to a list so that the returned
# response tuple always has a consistent number of elements
# regardless of whether the message is multipart or not.
if isinstance(response[0], tuple):
# Multipart, find where the message part tuples stop
parts = []
for i, part in enumerate(response):
if isinstance(part, bytes):
break
return cls(([cls.create(part) for part in response[:i]],) + response[i:])
if TYPE_CHECKING:
assert isinstance(part, tuple)
parts.append(part)
return cls(([cls.create(part) for part in parts],) + response[i:])
return cls(response)

@property
def is_multipart(self):
def is_multipart(self) -> bool:
return isinstance(self[0], list)
4 changes: 4 additions & 0 deletions imapclient/typing_imapclient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from typing import Tuple, Union

_AtomPart = Union[None, int, bytes]
_Atom = Union[_AtomPart, Tuple["_Atom", ...]]
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ module = [
"imapclient.config",
"imapclient.imapclient",
"imapclient.interact",
"imapclient.response_parser",
"imapclient.response_types",
"interact",
"livetest",
"setup",
Expand Down

0 comments on commit a34f173

Please sign in to comment.