Skip to content

Commit

Permalink
Move remaining utility functions from _utils.py to _models.py (#3387)
Browse files Browse the repository at this point in the history
  • Loading branch information
RafaelWO authored Nov 1, 2024
1 parent 6212e8f commit 41597ad
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 124 deletions.
84 changes: 74 additions & 10 deletions httpx/_models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

import codecs
import datetime
import email.message
import json as jsonlib
import re
import typing
import urllib.request
from collections.abc import Mapping
Expand Down Expand Up @@ -44,15 +46,23 @@
SyncByteStream,
)
from ._urls import URL
from ._utils import (
is_known_encoding,
obfuscate_sensitive_headers,
parse_content_type_charset,
parse_header_links,
)
from ._utils import to_bytes_or_str, to_str

__all__ = ["Cookies", "Headers", "Request", "Response"]

# Lower-cased header names whose values `_obfuscate_sensitive_headers`
# replaces with "[secure]" (used when building `Headers.__repr__`).
SENSITIVE_HEADERS = {"authorization", "proxy-authorization"}


def _is_known_encoding(encoding: str) -> bool:
"""
Return `True` if `encoding` is a known codec.
"""
try:
codecs.lookup(encoding)
except LookupError:
return False
return True


def _normalize_header_key(key: str | bytes, encoding: str | None = None) -> bytes:
"""
Expand All @@ -72,6 +82,60 @@ def _normalize_header_value(value: str | bytes, encoding: str | None = None) ->
return value.encode(encoding or "ascii")


def _parse_content_type_charset(content_type: str) -> str | None:
# We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
# See: https://peps.python.org/pep-0594/#cgi
msg = email.message.Message()
msg["content-type"] = content_type
return msg.get_content_charset(failobj=None)


def _parse_header_links(value: str) -> list[dict[str, str]]:
"""
Returns a list of parsed link headers, for more info see:
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
The generic syntax of those is:
Link: < uri-reference >; param1=value1; param2="value2"
So for instance:
Link; '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;'
would return
[
{"url": "http:/.../front.jpeg", "type": "image/jpeg"},
{"url": "http://.../back.jpeg"},
]
:param value: HTTP Link entity-header field
:return: list of parsed link headers
"""
links: list[dict[str, str]] = []
replace_chars = " '\""
value = value.strip(replace_chars)
if not value:
return links
for val in re.split(", *<", value):
try:
url, params = val.split(";", 1)
except ValueError:
url, params = val, ""
link = {"url": url.strip("<> '\"")}
for param in params.split(";"):
try:
key, value = param.split("=")
except ValueError:
break
link[key.strip(replace_chars)] = value.strip(replace_chars)
links.append(link)
return links


def _obfuscate_sensitive_headers(
    items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]],
) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]:
    """
    Yield each `(key, value)` pair, masking the values of sensitive headers.

    A header is sensitive when its lower-cased name is in `SENSITIVE_HEADERS`.
    Its value is replaced by "[secure]", converted with `to_bytes_or_str` to
    match the type of the original value. All other pairs pass through as-is.
    """
    for k, v in items:
        if to_str(k.lower()) in SENSITIVE_HEADERS:
            v = to_bytes_or_str("[secure]", match_type_of=v)
        yield k, v


class Headers(typing.MutableMapping[str, str]):
"""
HTTP headers, as a case-insensitive multi-dict.
Expand Down Expand Up @@ -306,7 +370,7 @@ def __repr__(self) -> str:
if self.encoding != "ascii":
encoding_str = f", encoding={self.encoding!r}"

as_list = list(obfuscate_sensitive_headers(self.multi_items()))
as_list = list(_obfuscate_sensitive_headers(self.multi_items()))
as_dict = dict(as_list)

no_duplicate_keys = len(as_dict) == len(as_list)
Expand Down Expand Up @@ -599,7 +663,7 @@ def encoding(self) -> str | None:
"""
if not hasattr(self, "_encoding"):
encoding = self.charset_encoding
if encoding is None or not is_known_encoding(encoding):
if encoding is None or not _is_known_encoding(encoding):
if isinstance(self.default_encoding, str):
encoding = self.default_encoding
elif hasattr(self, "_content"):
Expand Down Expand Up @@ -630,7 +694,7 @@ def charset_encoding(self) -> str | None:
if content_type is None:
return None

return parse_content_type_charset(content_type)
return _parse_content_type_charset(content_type)

def _get_content_decoder(self) -> ContentDecoder:
"""
Expand Down Expand Up @@ -785,7 +849,7 @@ def links(self) -> dict[str | None, dict[str, str]]:

return {
(link.get("rel") or link.get("url")): link
for link in parse_header_links(header)
for link in _parse_header_links(header)
}

@property
Expand Down
70 changes: 0 additions & 70 deletions httpx/_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from __future__ import annotations

import codecs
import email.message
import ipaddress
import os
import re
Expand Down Expand Up @@ -29,74 +27,6 @@ def primitive_value_to_str(value: PrimitiveData) -> str:
return str(value)


def is_known_encoding(encoding: str) -> bool:
    """
    Return `True` if `encoding` is a known codec.

    Any name that `codecs.lookup()` cannot resolve yields `False`. That
    includes names it rejects outright with a `ValueError` (for example a
    name containing an embedded NUL character) rather than a `LookupError`.
    """
    try:
        codecs.lookup(encoding)
    except (LookupError, ValueError):
        # LookupError: no such codec is registered.
        # ValueError: the name itself is malformed and cannot be looked up.
        return False
    return True


def parse_header_links(value: str) -> list[dict[str, str]]:
    """
    Returns a list of parsed link headers, for more info see:
    https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link

    The generic syntax of those is:

        Link: < uri-reference >; param1=value1; param2="value2"

    So for instance:

        Link: '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;'

    would return

        [
            {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
            {"url": "http://.../back.jpeg"},
        ]

    Parameter values may themselves contain "=" (only the first "=" separates
    the key from the value). Parameters without any "=" are skipped.

    :param value: HTTP Link entity-header field
    :return: list of parsed link headers
    """
    links: list[dict[str, str]] = []
    replace_chars = " '\""
    value = value.strip(replace_chars)
    if not value:
        return links
    for val in re.split(", *<", value):
        # Everything before the first ";" is the URL, the rest are parameters.
        url, _, params = val.partition(";")
        link = {"url": url.strip("<> '\"")}
        for param in params.split(";"):
            # Split on the first "=" only, so values such as `title="a=b"`
            # are kept intact instead of aborting the parameter loop.
            key, sep, param_value = param.partition("=")
            if not sep:
                # Empty segment (trailing ";") or a value-less parameter.
                continue
            link[key.strip(replace_chars)] = param_value.strip(replace_chars)
        links.append(link)
    return links


def parse_content_type_charset(content_type: str) -> str | None:
    """
    Return the `charset` parameter of a Content-Type header value.

    Returns `None` when the header value carries no charset parameter.
    """
    # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
    # See: https://peps.python.org/pep-0594/#cgi
    msg = email.message.Message()
    msg["content-type"] = content_type
    return msg.get_content_charset(failobj=None)


# Lower-cased header names whose values `obfuscate_sensitive_headers`
# replaces with "[secure]".
SENSITIVE_HEADERS = {"authorization", "proxy-authorization"}


def obfuscate_sensitive_headers(
    items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]],
) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]:
    """
    Yield each `(key, value)` pair, masking the values of sensitive headers.

    A header is sensitive when its lower-cased name is in `SENSITIVE_HEADERS`.
    Its value is replaced by "[secure]", converted with `to_bytes_or_str` to
    match the type of the original value. All other pairs pass through as-is.
    """
    for k, v in items:
        if to_str(k.lower()) in SENSITIVE_HEADERS:
            v = to_bytes_or_str("[secure]", match_type_of=v)
        yield k, v


def port_or_default(url: URL) -> int | None:
if url.port is not None:
return url.port
Expand Down
2 changes: 1 addition & 1 deletion scripts/lint
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ export SOURCE_FILES="httpx tests"

set -x

${PREFIX}ruff --fix $SOURCE_FILES
${PREFIX}ruff check --fix $SOURCE_FILES
${PREFIX}ruff format $SOURCE_FILES
43 changes: 43 additions & 0 deletions tests/models/test_headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,46 @@ def test_sensitive_headers(header):
value = "s3kr3t"
h = httpx.Headers({header: value})
assert repr(h) == "Headers({'%s': '[secure]'})" % header


@pytest.mark.parametrize(
    "headers, output",
    [
        ([("content-type", "text/html")], [("content-type", "text/html")]),
        ([("authorization", "s3kr3t")], [("authorization", "[secure]")]),
        ([("proxy-authorization", "s3kr3t")], [("proxy-authorization", "[secure]")]),
    ],
)
def test_obfuscate_sensitive_headers(headers, output):
    """`repr(Headers)` masks sensitive header values and leaves others alone."""
    # The masking helper is private, so it is exercised through `Headers.__repr__`.
    headers_class = httpx.Headers(dict(headers))
    assert repr(headers_class) == f"Headers({dict(output)!r})"


@pytest.mark.parametrize(
    "value, expected",
    (
        (
            '<http:/.../front.jpeg>; rel=front; type="image/jpeg"',
            [{"url": "http:/.../front.jpeg", "rel": "front", "type": "image/jpeg"}],
        ),
        ("<http:/.../front.jpeg>", [{"url": "http:/.../front.jpeg"}]),
        ("<http:/.../front.jpeg>;", [{"url": "http:/.../front.jpeg"}]),
        (
            '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;',
            [
                {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
                {"url": "http://.../back.jpeg"},
            ],
        ),
        ("", []),
    ),
)
def test_parse_header_links(value, expected):
    """`Response.links` exposes exactly the links parsed from the Link header."""
    all_links = httpx.Response(200, headers={"link": value}).links.values()
    # Compare for equality rather than membership: a membership check passes
    # vacuously when `expected` is empty and would not notice extra links.
    assert list(all_links) == expected


def test_parse_header_links_no_link():
    """A response without a Link header exposes an empty `links` mapping."""
    all_links = httpx.Response(200).links
    assert all_links == {}
43 changes: 0 additions & 43 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,35 +53,6 @@ def test_guess_by_bom(encoding, expected):
assert response.json() == {"abc": 123}


@pytest.mark.parametrize(
    "value, expected",
    (
        (
            '<http:/.../front.jpeg>; rel=front; type="image/jpeg"',
            [{"url": "http:/.../front.jpeg", "rel": "front", "type": "image/jpeg"}],
        ),
        ("<http:/.../front.jpeg>", [{"url": "http:/.../front.jpeg"}]),
        ("<http:/.../front.jpeg>;", [{"url": "http:/.../front.jpeg"}]),
        (
            '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;',
            [
                {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
                {"url": "http://.../back.jpeg"},
            ],
        ),
        ("", []),
    ),
)
def test_parse_header_links(value, expected):
    """`Response.links` exposes exactly the links parsed from the Link header."""
    all_links = httpx.Response(200, headers={"link": value}).links.values()
    # Compare for equality rather than membership: a membership check passes
    # vacuously when `expected` is empty and would not notice extra links.
    assert list(all_links) == expected


def test_parse_header_links_no_link():
    """A response without a Link header exposes an empty `links` mapping."""
    all_links = httpx.Response(200).links
    assert all_links == {}


def test_logging_request(server, caplog):
caplog.set_level(logging.INFO)
with httpx.Client() as client:
Expand Down Expand Up @@ -144,20 +115,6 @@ def test_get_environment_proxies(environment, proxies):
assert get_environment_proxies() == proxies


@pytest.mark.parametrize(
    "headers, output",
    [
        ([("content-type", "text/html")], [("content-type", "text/html")]),
        ([("authorization", "s3kr3t")], [("authorization", "[secure]")]),
        ([("proxy-authorization", "s3kr3t")], [("proxy-authorization", "[secure]")]),
    ],
)
def test_obfuscate_sensitive_headers(headers, output):
    """`repr(Headers)` masks sensitive header values and leaves others alone."""
    headers_class = httpx.Headers(dict(headers))
    assert repr(headers_class) == f"Headers({dict(output)!r})"


def test_same_origin():
origin = httpx.URL("https://example.com")
request = httpx.Request("GET", "HTTPS://EXAMPLE.COM:443")
Expand Down

0 comments on commit 41597ad

Please sign in to comment.