-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #3 from HorsemanWSGI/1.0
1.0
- Loading branch information
Showing
27 changed files
with
680 additions
and
817 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
import typing as t | ||
import urllib.parse | ||
from biscuits import Cookie, parse | ||
from frozendict import frozendict | ||
from horseman.types import MIMEType | ||
from horseman.utils import parse_header | ||
|
||
|
||
class Data(t.NamedTuple): | ||
form: t.Optional[t.Iterable[t.Tuple[str, t.Any]]] = None | ||
json: t.Optional[t.Union[t.Dict, t.List]] = None # not too specific | ||
|
||
|
||
class ContentType(str): | ||
__slots__ = ('mimetype', 'options') | ||
|
||
mimetype: MIMEType | ||
options: t.Mapping[str, str] | ||
|
||
def __new__(cls, value: str): | ||
if isinstance(value, cls): | ||
return value | ||
|
||
mimetype, params = parse_header(value) | ||
instance = str.__new__( | ||
cls, mimetype + "".join( | ||
f"; {k}={v}" for k, v in sorted(params.items()))) | ||
|
||
instance.mimetype = mimetype | ||
instance.options = frozendict(params) | ||
return instance | ||
|
||
|
||
class MediaType(ContentType): | ||
__slots__ = ('options', 'mimetype', 'maintype', 'subtype') | ||
|
||
mimetype: MIMEType | ||
maintype: str | ||
subtype: t.Optional[str] | ||
options: t.Mapping[str, str] | ||
|
||
def __new__(cls, value: str): | ||
if isinstance(value, cls): | ||
return value | ||
|
||
mimetype, params = parse_header(value) | ||
|
||
if mimetype == '*': | ||
maintype = "*" | ||
subtype = "*" | ||
elif '/' in mimetype: | ||
type_parts = mimetype.split('/') | ||
if not type_parts or len(type_parts) > 2: | ||
raise ValueError(f"Can't parse mimetype {mimetype!r}") | ||
maintype, subtype = type_parts | ||
else: | ||
maintype = mimetype | ||
subtype = None | ||
|
||
instance = str.__new__( | ||
cls, mimetype + "".join( | ||
f"; {k}={v}" for k, v in sorted(params.items()))) | ||
|
||
instance.mimetype = mimetype | ||
instance.maintype = maintype | ||
instance.subtype = subtype | ||
instance.options = frozendict(params) | ||
return instance | ||
|
||
def match(self, other: str) -> bool: | ||
other_media_type = MediaType(other) | ||
return self.maintype in {'*', other_media_type.maintype} and \ | ||
self.subtype in {"*", other_media_type.subtype} | ||
|
||
|
||
class Cookies(t.Dict[str, Cookie]): | ||
"""A Cookies management class, built on top of biscuits.""" | ||
|
||
def set(self, name: str, *args, **kwargs): | ||
self[name] = Cookie(name, *args, **kwargs) | ||
|
||
@staticmethod | ||
def from_string(value: str): | ||
return parse(value) | ||
|
||
|
||
class Query(frozendict[str, t.Sequence[str]]): | ||
|
||
TRUE_STRINGS = {'t', 'true', 'yes', '1', 'on'} | ||
FALSE_STRINGS = {'f', 'false', 'no', '0', 'off'} | ||
NONE_STRINGS = {'n', 'none', 'null'} | ||
|
||
def get(self, name: str, default=None): | ||
"""Return the first value of the found list. | ||
""" | ||
return super().get(name, [None])[0] | ||
|
||
def getlist(self, name: str) -> t.Sequence[str]: | ||
"""Return the value list | ||
""" | ||
return super().get(name, []) | ||
|
||
def as_bool(self, key: str) -> t.Optional[bool]: | ||
value = self[key][0] | ||
if value in (True, False, None): | ||
return value | ||
value = value.lower() | ||
if value in self.TRUE_STRINGS: | ||
return True | ||
elif value in self.FALSE_STRINGS: | ||
return False | ||
elif value in self.NONE_STRINGS: | ||
return None | ||
raise ValueError(f"Can't cast {value!r} to boolean.") | ||
|
||
def as_int(self, key: str) -> int: | ||
return int(self[key][0]) | ||
|
||
def as_float(self, key: str) -> float: | ||
return float(self[key][0]) | ||
|
||
@classmethod | ||
def from_string( | ||
cls, | ||
value: str, | ||
keep_blank_values: bool = True, | ||
strict_parsing: bool = True, | ||
encoding: str = 'utf-8', | ||
errors: t.Literal['strict', 'replace', 'ignore'] = 'replace', | ||
max_num_fields: int = None, | ||
separator: str = '&' | ||
): | ||
if not value: | ||
return cls() | ||
return cls( | ||
(key, tuple(l)) for key, l in urllib.parse.parse_qs( | ||
value, | ||
keep_blank_values=keep_blank_values, | ||
strict_parsing=strict_parsing, | ||
encoding=encoding, | ||
errors=errors, | ||
max_num_fields=max_num_fields, | ||
separator=separator, | ||
).items()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
import typing as t | ||
import urllib.parse | ||
from collections.abc import Mapping | ||
from functools import cached_property | ||
from horseman.types import Environ | ||
from horseman.parsers import Data, parser | ||
from horseman.datastructures import Cookies, ContentType, Query | ||
|
||
|
||
class immutable_cached_property(cached_property): | ||
|
||
def __set__(self, instance, value): | ||
raise AttributeError("can't set attribute") | ||
|
||
def __delete__(self, instance): | ||
del instance.__dict__[self.attrname] | ||
|
||
|
||
class WSGIEnvironWrapper(Environ): | ||
|
||
def __init__(self, environ: Environ): | ||
if isinstance(environ, WSGIEnvironWrapper): | ||
raise TypeError( | ||
f'{self.__class__!r} cannot wrap a subclass of itself.') | ||
self._environ: Environ = environ | ||
|
||
def __setitem__(self, key: str, value: t.Any): | ||
raise NotImplementedError(f'{self!r} is immutable') | ||
|
||
def __delitem__(self, key: str): | ||
raise NotImplementedError(f'{self!r} is immutable') | ||
|
||
def __getitem__(self, key: str) -> t.Any: | ||
return self._environ[key] | ||
|
||
def __iter__(self) -> t.Iterator[str]: | ||
return iter(self._environ) | ||
|
||
def __len__(self) -> int: | ||
return len(self._environ) | ||
|
||
def __eq__(self, other: t.Any) -> bool: | ||
if isinstance(other, self.__class__): | ||
return self._environ == other._environ | ||
if isinstance(other, Mapping): | ||
return self._environ == other | ||
raise NotImplementedError( | ||
f'{other!r} cannot be compared to {self!r}') | ||
|
||
@immutable_cached_property | ||
def method(self) -> str: | ||
return self._environ.get('REQUEST_METHOD', 'GET').upper() | ||
|
||
@immutable_cached_property | ||
def params(self) -> t.Dict[str, t.Any]: | ||
"""Path params collected by the traversing or routing. | ||
""" | ||
return self.get("PATH_PARAMS", {}) | ||
|
||
@immutable_cached_property | ||
def body(self) -> t.BinaryIO: | ||
return self._environ['wsgi.input'] | ||
|
||
@immutable_cached_property | ||
def data(self) -> Data: | ||
if self.content_type: | ||
return parser.parse( | ||
self._environ['wsgi.input'], self.content_type) | ||
return Data() | ||
|
||
@immutable_cached_property | ||
def script_name(self) -> str: | ||
return urllib.parse.quote(self._environ.get('SCRIPT_NAME', '')) | ||
|
||
@immutable_cached_property | ||
def path(self) -> str: | ||
if path := self._environ.get('PATH_INFO'): | ||
return path.encode('latin-1').decode('utf-8') | ||
return '/' | ||
|
||
@immutable_cached_property | ||
def query(self) -> Query: | ||
return Query.from_string(self._environ.get('QUERY_STRING', '')) | ||
|
||
@immutable_cached_property | ||
def cookies(self) -> t.Optional[Cookies]: | ||
if cookie_header := self._environ.get('HTTP_COOKIE'): | ||
return Cookies.from_string(cookie_header) | ||
return None | ||
|
||
@immutable_cached_property | ||
def content_type(self) -> t.Optional[ContentType]: | ||
if content_type := self._environ.get('CONTENT_TYPE'): | ||
return ContentType(content_type) | ||
return None | ||
|
||
@immutable_cached_property | ||
def application_uri(self) -> str: | ||
scheme = self._environ.get('wsgi.url_scheme', 'http') | ||
http_host = self._environ.get('HTTP_HOST') | ||
if not http_host: | ||
server = self._environ['SERVER_NAME'] | ||
port = self._environ.get('SERVER_PORT', '80') | ||
elif ':' in http_host: | ||
server, port = http_host.split(':', 1) | ||
else: | ||
server = http_host | ||
port = '80' | ||
|
||
if (scheme == 'http' and port == '80') or \ | ||
(scheme == 'https' and port == '443'): | ||
return f'{scheme}://{server}{self.script_name}' | ||
return f'{scheme}://{server}:{port}{self.script_name}' | ||
|
||
def uri(self, include_query: bool = True) -> str: | ||
path_info = urllib.parse.quote(self._environ.get('PATH_INFO', '')) | ||
if include_query: | ||
qs = urllib.parse.quote(self._environ.get('QUERY_STRING', '')) | ||
if qs: | ||
return f"{self.application_uri}{path_info}?{qs}" | ||
return f"{self.application_uri}{path_info}" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import typing as t | ||
from http import HTTPStatus | ||
from horseman.types import HTTPCode | ||
|
||
|
||
class ParsingException(ValueError): | ||
pass | ||
|
||
|
||
class HTTPError(Exception): | ||
|
||
def __init__(self, | ||
status: HTTPCode, | ||
body: t.Optional[t.Union[str, bytes]] = None): | ||
self.status = HTTPStatus(status) | ||
body = self.status.description if body is None else body | ||
if isinstance(body, bytes): | ||
body = body.decode('utf-8') | ||
elif not isinstance(body, str): | ||
raise ValueError('Body must be string or bytes.') | ||
self.body: str = body | ||
|
||
def __bytes__(self) -> bytes: | ||
return ('HTTP/1.1 {status} {phrase}\r\n' | ||
'Content-Length: {length}\r\n\r\n{body}').format( | ||
status=self.status.value, | ||
phrase=self.status.phrase, | ||
length=len(self.body), | ||
body=self.body | ||
).encode() |
Oops, something went wrong.