From 32fa92fbdf539f02a26cdc34d155e4095020f309 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Tue, 26 Dec 2023 12:19:54 +0100 Subject: [PATCH] Use __future__.annotations (#2199) --- pyproject.toml | 2 + uvicorn/_subprocess.py | 10 +- uvicorn/_types.py | 42 +++--- uvicorn/config.py | 127 ++++++++---------- uvicorn/logging.py | 10 +- uvicorn/main.py | 70 +++++----- uvicorn/protocols/http/auto.py | 5 +- uvicorn/protocols/http/h11_impl.py | 37 +++-- uvicorn/protocols/http/httptools_impl.py | 43 +++--- uvicorn/protocols/utils.py | 11 +- .../protocols/websockets/websockets_impl.py | 17 +-- uvicorn/protocols/websockets/wsproto_impl.py | 26 ++-- uvicorn/server.py | 25 ++-- uvicorn/supervisors/basereload.py | 18 +-- uvicorn/supervisors/multiprocess.py | 12 +- uvicorn/supervisors/statreload.py | 12 +- uvicorn/supervisors/watchfilesreload.py | 10 +- uvicorn/supervisors/watchgodreload.py | 14 +- 18 files changed, 244 insertions(+), 247 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 94cef1453..fc8f0af50 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -124,5 +124,7 @@ py-gte-38 = "sys_version_info >= (3, 8)" py-lt-38 = "sys_version_info < (3, 8)" py-gte-39 = "sys_version_info >= (3, 9)" py-lt-39 = "sys_version_info < (3, 9)" +py-gte-310 = "sys_version_info >= (3, 10)" +py-lt-310 = "sys_version_info < (3, 10)" py-gte-311 = "sys_version_info >= (3, 11)" py-lt-311 = "sys_version_info < (3, 11)" diff --git a/uvicorn/_subprocess.py b/uvicorn/_subprocess.py index e05473c72..21fd33c21 100644 --- a/uvicorn/_subprocess.py +++ b/uvicorn/_subprocess.py @@ -2,12 +2,14 @@ Some light wrappers around Python's multiprocessing, to deal with cleanly starting child processes. """ +from __future__ import annotations + import multiprocessing import os import sys from multiprocessing.context import SpawnProcess from socket import socket -from typing import Callable, List, Optional +from typing import Callable, Optional from uvicorn.config import Config @@ -18,7 +20,7 @@ def get_subprocess( config: Config, target: Callable[..., None], - sockets: List[socket], + sockets: list[socket], ) -> SpawnProcess: """ Called in the parent process, to instantiate a new child process instance. @@ -51,8 +53,8 @@ def get_subprocess( def subprocess_started( config: Config, target: Callable[..., None], - sockets: List[socket], - stdin_fileno: Optional[int], + sockets: list[socket], + stdin_fileno: int | None, ) -> None: """ Called when the child process starts. diff --git a/uvicorn/_types.py b/uvicorn/_types.py index 2f689d960..3a510d0ac 100644 --- a/uvicorn/_types.py +++ b/uvicorn/_types.py @@ -27,6 +27,7 @@ (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ +from __future__ import annotations import sys import types @@ -34,7 +35,6 @@ Any, Awaitable, Callable, - Dict, Iterable, MutableMapping, Optional, @@ -63,7 +63,7 @@ # ASGI class ASGIVersions(TypedDict): spec_version: str - version: Union[Literal["2.0"], Literal["3.0"]] + version: Literal["2.0"] | Literal["3.0"] class HTTPScope(TypedDict): @@ -76,11 +76,11 @@ class HTTPScope(TypedDict): raw_path: bytes query_string: bytes root_path: str - headers: Iterable[Tuple[bytes, bytes]] - client: Optional[Tuple[str, int]] - server: Optional[Tuple[str, Optional[int]]] - state: NotRequired[Dict[str, Any]] - extensions: NotRequired[Dict[str, Dict[object, object]]] + headers: Iterable[tuple[bytes, bytes]] + client: tuple[str, int] | None + server: tuple[str, int | None] | None + state: NotRequired[dict[str, Any]] + extensions: NotRequired[dict[str, dict[object, object]]] class WebSocketScope(TypedDict): @@ -92,18 +92,18 @@ class WebSocketScope(TypedDict): raw_path: bytes query_string: bytes root_path: str - headers: Iterable[Tuple[bytes, bytes]] - client: Optional[Tuple[str, int]] - server: Optional[Tuple[str, Optional[int]]] + headers: Iterable[tuple[bytes, bytes]] + client: tuple[str, int] | None + server: tuple[str, int | None] | None subprotocols: Iterable[str] - state: NotRequired[Dict[str, Any]] - extensions: NotRequired[Dict[str, Dict[object, object]]] + state: NotRequired[dict[str, Any]] + extensions: NotRequired[dict[str, dict[object, object]]] class LifespanScope(TypedDict): type: Literal["lifespan"] asgi: ASGIVersions - state: NotRequired[Dict[str, Any]] + state: NotRequired[dict[str, Any]] WWWScope = Union[HTTPScope, WebSocketScope] @@ -118,13 +118,13 @@ class HTTPRequestEvent(TypedDict): class HTTPResponseDebugEvent(TypedDict): type: Literal["http.response.debug"] - info: Dict[str, object] + info: dict[str, object] class HTTPResponseStartEvent(TypedDict): type: Literal["http.response.start"] status: int - headers: NotRequired[Iterable[Tuple[bytes, bytes]]] + headers: NotRequired[Iterable[tuple[bytes, bytes]]] trailers: NotRequired[bool] @@ -136,14 +136,14 @@ class HTTPResponseBodyEvent(TypedDict): class HTTPResponseTrailersEvent(TypedDict): type: Literal["http.response.trailers"] - headers: Iterable[Tuple[bytes, bytes]] + headers: Iterable[tuple[bytes, bytes]] more_trailers: bool class HTTPServerPushEvent(TypedDict): type: Literal["http.response.push"] path: str - headers: Iterable[Tuple[bytes, bytes]] + headers: Iterable[tuple[bytes, bytes]] class HTTPDisconnectEvent(TypedDict): @@ -156,8 +156,8 @@ class WebSocketConnectEvent(TypedDict): class WebSocketAcceptEvent(TypedDict): type: Literal["websocket.accept"] - subprotocol: NotRequired[Optional[str]] - headers: NotRequired[Iterable[Tuple[bytes, bytes]]] + subprotocol: NotRequired[str | None] + headers: NotRequired[Iterable[tuple[bytes, bytes]]] class _WebSocketReceiveEventBytes(TypedDict): @@ -193,7 +193,7 @@ class _WebSocketSendEventText(TypedDict): class WebSocketResponseStartEvent(TypedDict): type: Literal["websocket.http.response.start"] status: int - headers: Iterable[Tuple[bytes, bytes]] + headers: Iterable[tuple[bytes, bytes]] class WebSocketResponseBodyEvent(TypedDict): @@ -210,7 +210,7 @@ class WebSocketDisconnectEvent(TypedDict): class WebSocketCloseEvent(TypedDict): type: Literal["websocket.close"] code: NotRequired[int] - reason: NotRequired[Optional[str]] + reason: NotRequired[str | None] class LifespanStartupEvent(TypedDict): diff --git a/uvicorn/config.py b/uvicorn/config.py index 45b68e35f..b0dff4604 100644 --- a/uvicorn/config.py +++ b/uvicorn/config.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import asyncio import inspect import json @@ -8,18 +10,7 @@ import ssl import sys from pathlib import Path -from typing import ( - Any, - Awaitable, - Callable, - Dict, - List, - Literal, - Optional, - Tuple, - Type, - Union, -) +from typing import Any, Awaitable, Callable, Literal import click @@ -37,7 +28,7 @@ LoopSetupType = Literal["none", "auto", "asyncio", "uvloop"] InterfaceType = Literal["auto", "asgi3", "asgi2", "wsgi"] -LOG_LEVELS: Dict[str, int] = { +LOG_LEVELS: dict[str, int] = { "critical": logging.CRITICAL, "error": logging.ERROR, "warning": logging.WARNING, @@ -45,33 +36,33 @@ "debug": logging.DEBUG, "trace": TRACE_LOG_LEVEL, } -HTTP_PROTOCOLS: Dict[HTTPProtocolType, str] = { +HTTP_PROTOCOLS: dict[HTTPProtocolType, str] = { "auto": "uvicorn.protocols.http.auto:AutoHTTPProtocol", "h11": "uvicorn.protocols.http.h11_impl:H11Protocol", "httptools": "uvicorn.protocols.http.httptools_impl:HttpToolsProtocol", } -WS_PROTOCOLS: Dict[WSProtocolType, Optional[str]] = { +WS_PROTOCOLS: dict[WSProtocolType, str | None] = { "auto": "uvicorn.protocols.websockets.auto:AutoWebSocketsProtocol", "none": None, "websockets": "uvicorn.protocols.websockets.websockets_impl:WebSocketProtocol", "wsproto": "uvicorn.protocols.websockets.wsproto_impl:WSProtocol", } -LIFESPAN: Dict[LifespanType, str] = { +LIFESPAN: dict[LifespanType, str] = { "auto": "uvicorn.lifespan.on:LifespanOn", "on": "uvicorn.lifespan.on:LifespanOn", "off": "uvicorn.lifespan.off:LifespanOff", } -LOOP_SETUPS: Dict[LoopSetupType, Optional[str]] = { +LOOP_SETUPS: dict[LoopSetupType, str | None] = { "none": None, "auto": "uvicorn.loops.auto:auto_loop_setup", "asyncio": "uvicorn.loops.asyncio:asyncio_setup", "uvloop": "uvicorn.loops.uvloop:uvloop_setup", } -INTERFACES: List[InterfaceType] = ["auto", "asgi3", "asgi2", "wsgi"] +INTERFACES: list[InterfaceType] = ["auto", "asgi3", "asgi2", "wsgi"] SSL_PROTOCOL_VERSION: int = ssl.PROTOCOL_TLS_SERVER -LOGGING_CONFIG: Dict[str, Any] = { +LOGGING_CONFIG: dict[str, Any] = { "version": 1, "disable_existing_loggers": False, "formatters": { @@ -108,13 +99,13 @@ def create_ssl_context( - certfile: Union[str, os.PathLike], - keyfile: Optional[Union[str, os.PathLike]], - password: Optional[str], + certfile: str | os.PathLike[str], + keyfile: str | os.PathLike[str] | None, + password: str | None, ssl_version: int, cert_reqs: int, - ca_certs: Optional[Union[str, os.PathLike]], - ciphers: Optional[str], + ca_certs: str | os.PathLike[str] | None, + ciphers: str | None, ) -> ssl.SSLContext: ctx = ssl.SSLContext(ssl_version) get_password = (lambda: password) if password else None @@ -137,10 +128,10 @@ def is_dir(path: Path) -> bool: def resolve_reload_patterns( - patterns_list: List[str], directories_list: List[str] -) -> Tuple[List[str], List[Path]]: - directories: List[Path] = list(set(map(Path, directories_list.copy()))) - patterns: List[str] = patterns_list.copy() + patterns_list: list[str], directories_list: list[str] +) -> tuple[list[str], list[Path]]: + directories: list[Path] = list(set(map(Path, directories_list.copy()))) + patterns: list[str] = patterns_list.copy() current_working_directory = Path.cwd() for pattern in patterns_list: @@ -176,7 +167,7 @@ def resolve_reload_patterns( return list(set(patterns)), directories -def _normalize_dirs(dirs: Union[List[str], str, None]) -> List[str]: +def _normalize_dirs(dirs: list[str] | str | None) -> list[str]: if dirs is None: return [] if isinstance(dirs, str): @@ -187,54 +178,54 @@ def _normalize_dirs(dirs: Union[List[str], str, None]) -> List[str]: class Config: def __init__( self, - app: Union["ASGIApplication", Callable[..., Any], str], + app: ASGIApplication | Callable[..., Any] | str, host: str = "127.0.0.1", port: int = 8000, - uds: Optional[str] = None, - fd: Optional[int] = None, + uds: str | None = None, + fd: int | None = None, loop: LoopSetupType = "auto", - http: Union[Type[asyncio.Protocol], HTTPProtocolType] = "auto", - ws: Union[Type[asyncio.Protocol], WSProtocolType] = "auto", + http: type[asyncio.Protocol] | HTTPProtocolType = "auto", + ws: type[asyncio.Protocol] | WSProtocolType = "auto", ws_max_size: int = 16 * 1024 * 1024, ws_max_queue: int = 32, - ws_ping_interval: Optional[float] = 20.0, - ws_ping_timeout: Optional[float] = 20.0, + ws_ping_interval: float | None = 20.0, + ws_ping_timeout: float | None = 20.0, ws_per_message_deflate: bool = True, lifespan: LifespanType = "auto", - env_file: "str | os.PathLike[str] | None" = None, - log_config: Optional[Union[Dict[str, Any], str]] = LOGGING_CONFIG, - log_level: Optional[Union[str, int]] = None, + env_file: str | os.PathLike[str] | None = None, + log_config: dict[str, Any] | str | None = LOGGING_CONFIG, + log_level: str | int | None = None, access_log: bool = True, - use_colors: Optional[bool] = None, + use_colors: bool | None = None, interface: InterfaceType = "auto", reload: bool = False, - reload_dirs: Optional[Union[List[str], str]] = None, + reload_dirs: list[str] | str | None = None, reload_delay: float = 0.25, - reload_includes: Optional[Union[List[str], str]] = None, - reload_excludes: Optional[Union[List[str], str]] = None, - workers: Optional[int] = None, + reload_includes: list[str] | str | None = None, + reload_excludes: list[str] | str | None = None, + workers: int | None = None, proxy_headers: bool = True, server_header: bool = True, date_header: bool = True, - forwarded_allow_ips: Optional[Union[List[str], str]] = None, + forwarded_allow_ips: list[str] | str | None = None, root_path: str = "", - limit_concurrency: Optional[int] = None, - limit_max_requests: Optional[int] = None, + limit_concurrency: int | None = None, + limit_max_requests: int | None = None, backlog: int = 2048, timeout_keep_alive: int = 5, timeout_notify: int = 30, - timeout_graceful_shutdown: Optional[int] = None, - callback_notify: Optional[Callable[..., Awaitable[None]]] = None, - ssl_keyfile: Optional[str] = None, - ssl_certfile: "str | os.PathLike[str] | None" = None, - ssl_keyfile_password: Optional[str] = None, + timeout_graceful_shutdown: int | None = None, + callback_notify: Callable[..., Awaitable[None]] | None = None, + ssl_keyfile: str | None = None, + ssl_certfile: str | os.PathLike[str] | None = None, + ssl_keyfile_password: str | None = None, ssl_version: int = SSL_PROTOCOL_VERSION, ssl_cert_reqs: int = ssl.CERT_NONE, - ssl_ca_certs: Optional[str] = None, + ssl_ca_certs: str | None = None, ssl_ciphers: str = "TLSv1", - headers: Optional[List[Tuple[str, str]]] = None, + headers: list[tuple[str, str]] | None = None, factory: bool = False, - h11_max_incomplete_event_size: Optional[int] = None, + h11_max_incomplete_event_size: int | None = None, ): self.app = app self.host = host @@ -276,18 +267,18 @@ def __init__( self.ssl_cert_reqs = ssl_cert_reqs self.ssl_ca_certs = ssl_ca_certs self.ssl_ciphers = ssl_ciphers - self.headers: List[Tuple[str, str]] = headers or [] - self.encoded_headers: List[Tuple[bytes, bytes]] = [] + self.headers: list[tuple[str, str]] = headers or [] + self.encoded_headers: list[tuple[bytes, bytes]] = [] self.factory = factory self.h11_max_incomplete_event_size = h11_max_incomplete_event_size self.loaded = False self.configure_logging() - self.reload_dirs: List[Path] = [] - self.reload_dirs_excludes: List[Path] = [] - self.reload_includes: List[str] = [] - self.reload_excludes: List[str] = [] + self.reload_dirs: list[Path] = [] + self.reload_dirs_excludes: list[Path] = [] + self.reload_includes: list[str] = [] + self.reload_excludes: list[str] = [] if ( reload_dirs or reload_includes or reload_excludes @@ -350,7 +341,7 @@ def __init__( if workers is None and "WEB_CONCURRENCY" in os.environ: self.workers = int(os.environ["WEB_CONCURRENCY"]) - self.forwarded_allow_ips: Union[List[str], str] + self.forwarded_allow_ips: list[str] | str if forwarded_allow_ips is None: self.forwarded_allow_ips = os.environ.get( "FORWARDED_ALLOW_IPS", "127.0.0.1" @@ -363,7 +354,7 @@ def __init__( @property def asgi_version(self) -> Literal["2.0", "3.0"]: - mapping: Dict[str, Literal["2.0", "3.0"]] = { + mapping: dict[str, Literal["2.0", "3.0"]] = { "asgi2": "2.0", "asgi3": "3.0", "wsgi": "3.0", @@ -427,7 +418,7 @@ def load(self) -> None: if self.is_ssl: assert self.ssl_certfile - self.ssl: Optional[ssl.SSLContext] = create_ssl_context( + self.ssl: ssl.SSLContext | None = create_ssl_context( keyfile=self.ssl_keyfile, certfile=self.ssl_certfile, password=self.ssl_keyfile_password, @@ -451,13 +442,13 @@ def load(self) -> None: if isinstance(self.http, str): http_protocol_class = import_from_string(HTTP_PROTOCOLS[self.http]) - self.http_protocol_class: Type[asyncio.Protocol] = http_protocol_class + self.http_protocol_class: type[asyncio.Protocol] = http_protocol_class else: self.http_protocol_class = self.http if isinstance(self.ws, str): ws_protocol_class = import_from_string(WS_PROTOCOLS[self.ws]) - self.ws_protocol_class: Optional[Type[asyncio.Protocol]] = ws_protocol_class + self.ws_protocol_class: type[asyncio.Protocol] | None = ws_protocol_class else: self.ws_protocol_class = self.ws @@ -508,12 +499,12 @@ def load(self) -> None: self.loaded = True def setup_event_loop(self) -> None: - loop_setup: Optional[Callable] = import_from_string(LOOP_SETUPS[self.loop]) + loop_setup: Callable | None = import_from_string(LOOP_SETUPS[self.loop]) if loop_setup is not None: loop_setup(use_subprocess=self.use_subprocess) def bind_socket(self) -> socket.socket: - logger_args: List[Union[str, int]] + logger_args: list[str | int] if self.uds: # pragma: py-win32 path = self.uds sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) diff --git a/uvicorn/logging.py b/uvicorn/logging.py index c88d5fe55..74e864ed3 100644 --- a/uvicorn/logging.py +++ b/uvicorn/logging.py @@ -1,8 +1,10 @@ +from __future__ import annotations + import http import logging import sys from copy import copy -from typing import Literal, Optional +from typing import Literal import click @@ -31,10 +33,10 @@ class ColourizedFormatter(logging.Formatter): def __init__( self, - fmt: Optional[str] = None, - datefmt: Optional[str] = None, + fmt: str | None = None, + datefmt: str | None = None, style: Literal["%", "{", "$"] = "%", - use_colors: Optional[bool] = None, + use_colors: bool | None = None, ): if use_colors in (True, False): self.use_colors = use_colors diff --git a/uvicorn/main.py b/uvicorn/main.py index 3288fbd9b..fee6c5b4c 100644 --- a/uvicorn/main.py +++ b/uvicorn/main.py @@ -1,10 +1,12 @@ +from __future__ import annotations + import asyncio import logging import os import platform import ssl import sys -import typing +from typing import Any, Callable import click @@ -381,9 +383,9 @@ def main( lifespan: LifespanType, interface: InterfaceType, reload: bool, - reload_dirs: typing.List[str], - reload_includes: typing.List[str], - reload_excludes: typing.List[str], + reload_dirs: list[str], + reload_includes: list[str], + reload_excludes: list[str], reload_delay: float, workers: int, env_file: str, @@ -399,7 +401,7 @@ def main( backlog: int, limit_max_requests: int, timeout_keep_alive: int, - timeout_graceful_shutdown: typing.Optional[int], + timeout_graceful_shutdown: int | None, ssl_keyfile: str, ssl_certfile: str, ssl_keyfile_password: str, @@ -407,10 +409,10 @@ def main( ssl_cert_reqs: int, ssl_ca_certs: str, ssl_ciphers: str, - headers: typing.List[str], + headers: list[str], use_colors: bool, app_dir: str, - h11_max_incomplete_event_size: typing.Optional[int], + h11_max_incomplete_event_size: int | None, factory: bool, ) -> None: run( @@ -465,56 +467,54 @@ def main( def run( - app: typing.Union["ASGIApplication", typing.Callable[..., typing.Any], str], + app: ASGIApplication | Callable[..., Any] | str, *, host: str = "127.0.0.1", port: int = 8000, - uds: typing.Optional[str] = None, - fd: typing.Optional[int] = None, + uds: str | None = None, + fd: int | None = None, loop: LoopSetupType = "auto", - http: typing.Union[typing.Type[asyncio.Protocol], HTTPProtocolType] = "auto", - ws: typing.Union[typing.Type[asyncio.Protocol], WSProtocolType] = "auto", + http: type[asyncio.Protocol] | HTTPProtocolType = "auto", + ws: type[asyncio.Protocol] | WSProtocolType = "auto", ws_max_size: int = 16777216, ws_max_queue: int = 32, - ws_ping_interval: typing.Optional[float] = 20.0, - ws_ping_timeout: typing.Optional[float] = 20.0, + ws_ping_interval: float | None = 20.0, + ws_ping_timeout: float | None = 20.0, ws_per_message_deflate: bool = True, lifespan: LifespanType = "auto", interface: InterfaceType = "auto", reload: bool = False, - reload_dirs: typing.Optional[typing.Union[typing.List[str], str]] = None, - reload_includes: typing.Optional[typing.Union[typing.List[str], str]] = None, - reload_excludes: typing.Optional[typing.Union[typing.List[str], str]] = None, + reload_dirs: list[str] | str | None = None, + reload_includes: list[str] | str | None = None, + reload_excludes: list[str] | str | None = None, reload_delay: float = 0.25, - workers: typing.Optional[int] = None, - env_file: "str | os.PathLike[str] | None" = None, - log_config: typing.Optional[ - typing.Union[typing.Dict[str, typing.Any], str] - ] = LOGGING_CONFIG, - log_level: typing.Optional[typing.Union[str, int]] = None, + workers: int | None = None, + env_file: str | os.PathLike[str] | None = None, + log_config: dict[str, Any] | str | None = LOGGING_CONFIG, + log_level: str | int | None = None, access_log: bool = True, proxy_headers: bool = True, server_header: bool = True, date_header: bool = True, - forwarded_allow_ips: typing.Optional[typing.Union[typing.List[str], str]] = None, + forwarded_allow_ips: list[str] | str | None = None, root_path: str = "", - limit_concurrency: typing.Optional[int] = None, + limit_concurrency: int | None = None, backlog: int = 2048, - limit_max_requests: typing.Optional[int] = None, + limit_max_requests: int | None = None, timeout_keep_alive: int = 5, - timeout_graceful_shutdown: typing.Optional[int] = None, - ssl_keyfile: typing.Optional[str] = None, - ssl_certfile: "str | os.PathLike[str] | None" = None, - ssl_keyfile_password: typing.Optional[str] = None, + timeout_graceful_shutdown: int | None = None, + ssl_keyfile: str | None = None, + ssl_certfile: str | os.PathLike[str] | None = None, + ssl_keyfile_password: str | None = None, ssl_version: int = SSL_PROTOCOL_VERSION, ssl_cert_reqs: int = ssl.CERT_NONE, - ssl_ca_certs: typing.Optional[str] = None, + ssl_ca_certs: str | None = None, ssl_ciphers: str = "TLSv1", - headers: typing.Optional[typing.List[typing.Tuple[str, str]]] = None, - use_colors: typing.Optional[bool] = None, - app_dir: typing.Optional[str] = None, + headers: list[tuple[str, str]] | None = None, + use_colors: bool | None = None, + app_dir: str | None = None, factory: bool = False, - h11_max_incomplete_event_size: typing.Optional[int] = None, + h11_max_incomplete_event_size: int | None = None, ) -> None: if app_dir is not None: sys.path.insert(0, app_dir) diff --git a/uvicorn/protocols/http/auto.py b/uvicorn/protocols/http/auto.py index 1aa996744..a14bec144 100644 --- a/uvicorn/protocols/http/auto.py +++ b/uvicorn/protocols/http/auto.py @@ -1,7 +1,8 @@ +from __future__ import annotations + import asyncio -from typing import Type -AutoHTTPProtocol: Type[asyncio.Protocol] +AutoHTTPProtocol: type[asyncio.Protocol] try: import httptools # noqa except ImportError: # pragma: no cover diff --git a/uvicorn/protocols/http/h11_impl.py b/uvicorn/protocols/http/h11_impl.py index 0c0efea0a..c6b2f2781 100644 --- a/uvicorn/protocols/http/h11_impl.py +++ b/uvicorn/protocols/http/h11_impl.py @@ -1,16 +1,9 @@ +from __future__ import annotations + import asyncio import http import logging -from typing import ( - Any, - Callable, - Dict, - List, - Literal, - Optional, - Tuple, - cast, -) +from typing import Any, Callable, Literal, cast from urllib.parse import unquote import h11 @@ -60,8 +53,8 @@ def __init__( self, config: Config, server_state: ServerState, - app_state: Dict[str, Any], - _loop: Optional[asyncio.AbstractEventLoop] = None, + app_state: dict[str, Any], + _loop: asyncio.AbstractEventLoop | None = None, ) -> None: if not config.loaded: config.load() @@ -84,7 +77,7 @@ def __init__( self.app_state = app_state # Timeouts - self.timeout_keep_alive_task: Optional[asyncio.TimerHandle] = None + self.timeout_keep_alive_task: asyncio.TimerHandle | None = None self.timeout_keep_alive = config.timeout_keep_alive # Shared server state @@ -95,13 +88,13 @@ def __init__( # Per-connection state self.transport: asyncio.Transport = None # type: ignore[assignment] self.flow: FlowControl = None # type: ignore[assignment] - self.server: Optional[Tuple[str, int]] = None - self.client: Optional[Tuple[str, int]] = None - self.scheme: Optional[Literal["http", "https"]] = None + self.server: tuple[str, int] | None = None + self.client: tuple[str, int] | None = None + self.scheme: Literal["http", "https"] | None = None # Per-request state self.scope: HTTPScope = None # type: ignore[assignment] - self.headers: List[Tuple[bytes, bytes]] = None # type: ignore[assignment] + self.headers: list[tuple[bytes, bytes]] = None # type: ignore[assignment] self.cycle: RequestResponseCycle = None # type: ignore[assignment] # Protocol interface @@ -120,7 +113,7 @@ def connection_made( # type: ignore[override] prefix = "%s:%d - " % self.client if self.client else "" self.logger.log(TRACE_LOG_LEVEL, "%sHTTP connection made", prefix) - def connection_lost(self, exc: Optional[Exception]) -> None: + def connection_lost(self, exc: Exception | None) -> None: self.connections.discard(self) if self.logger.level <= TRACE_LOG_LEVEL: @@ -153,7 +146,7 @@ def _unset_keepalive_if_required(self) -> None: self.timeout_keep_alive_task.cancel() self.timeout_keep_alive_task = None - def _get_upgrade(self) -> Optional[bytes]: + def _get_upgrade(self) -> bytes | None: connection = [] upgrade = None for name, value in self.headers: @@ -293,7 +286,7 @@ def handle_websocket_upgrade(self, event: h11.Request) -> None: def send_400_response(self, msg: str) -> None: reason = STATUS_PHRASES[400] - headers: List[Tuple[bytes, bytes]] = [ + headers: list[tuple[bytes, bytes]] = [ (b"content-type", b"text/plain; charset=utf-8"), (b"connection", b"close"), ] @@ -374,7 +367,7 @@ def __init__( logger: logging.Logger, access_logger: logging.Logger, access_log: bool, - default_headers: List[Tuple[bytes, bytes]], + default_headers: list[tuple[bytes, bytes]], message_event: asyncio.Event, on_response: Callable[..., None], ) -> None: @@ -525,7 +518,7 @@ async def send(self, message: "ASGISendEvent") -> None: async def receive(self) -> "ASGIReceiveEvent": if self.waiting_for_100_continue and not self.transport.is_closing(): - headers: List[Tuple[str, str]] = [] + headers: list[tuple[str, str]] = [] event = h11.InformationalResponse( status_code=100, headers=headers, reason="Continue" ) diff --git a/uvicorn/protocols/http/httptools_impl.py b/uvicorn/protocols/http/httptools_impl.py index 8d1a0543c..99c868c31 100644 --- a/uvicorn/protocols/http/httptools_impl.py +++ b/uvicorn/protocols/http/httptools_impl.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import asyncio import http import logging @@ -5,18 +7,7 @@ import urllib from asyncio.events import TimerHandle from collections import deque -from typing import ( - Any, - Callable, - Deque, - Dict, - List, - Literal, - Optional, - Tuple, - Union, - cast, -) +from typing import Any, Callable, Deque, Literal, cast import httptools @@ -69,8 +60,8 @@ def __init__( self, config: Config, server_state: ServerState, - app_state: Dict[str, Any], - _loop: Optional[asyncio.AbstractEventLoop] = None, + app_state: dict[str, Any], + _loop: asyncio.AbstractEventLoop | None = None, ) -> None: if not config.loaded: config.load() @@ -88,7 +79,7 @@ def __init__( self.app_state = app_state # Timeouts - self.timeout_keep_alive_task: Optional[TimerHandle] = None + self.timeout_keep_alive_task: TimerHandle | None = None self.timeout_keep_alive = config.timeout_keep_alive # Global state @@ -99,14 +90,14 @@ def __init__( # Per-connection state self.transport: asyncio.Transport = None # type: ignore[assignment] self.flow: FlowControl = None # type: ignore[assignment] - self.server: Optional[Tuple[str, int]] = None - self.client: Optional[Tuple[str, int]] = None - self.scheme: Optional[Literal["http", "https"]] = None - self.pipeline: Deque[Tuple[RequestResponseCycle, ASGI3Application]] = deque() + self.server: tuple[str, int] | None = None + self.client: tuple[str, int] | None = None + self.scheme: Literal["http", "https"] | None = None + self.pipeline: Deque[tuple[RequestResponseCycle, ASGI3Application]] = deque() # Per-request state self.scope: HTTPScope = None # type: ignore[assignment] - self.headers: List[Tuple[bytes, bytes]] = None # type: ignore[assignment] + self.headers: list[tuple[bytes, bytes]] = None # type: ignore[assignment] self.expect_100_continue = False self.cycle: RequestResponseCycle = None # type: ignore[assignment] @@ -126,7 +117,7 @@ def connection_made( # type: ignore[override] prefix = "%s:%d - " % self.client if self.client else "" self.logger.log(TRACE_LOG_LEVEL, "%sHTTP connection made", prefix) - def connection_lost(self, exc: Optional[Exception]) -> None: + def connection_lost(self, exc: Exception | None) -> None: self.connections.discard(self) if self.logger.level <= TRACE_LOG_LEVEL: @@ -153,7 +144,7 @@ def _unset_keepalive_if_required(self) -> None: self.timeout_keep_alive_task.cancel() self.timeout_keep_alive_task = None - def _get_upgrade(self) -> Optional[bytes]: + def _get_upgrade(self) -> bytes | None: connection = [] upgrade = None for name, value in self.headers: @@ -165,7 +156,7 @@ def _get_upgrade(self) -> Optional[bytes]: return upgrade return None - def _should_upgrade_to_ws(self, upgrade: Optional[bytes]) -> bool: + def _should_upgrade_to_ws(self, upgrade: bytes | None) -> bool: if upgrade == b"websocket" and self.ws_protocol_class is not None: return True if self.config.ws == "auto": @@ -389,7 +380,7 @@ def __init__( logger: logging.Logger, access_logger: logging.Logger, access_log: bool, - default_headers: List[Tuple[bytes, bytes]], + default_headers: list[tuple[bytes, bytes]], message_event: asyncio.Event, expect_100_continue: bool, keep_alive: bool, @@ -417,7 +408,7 @@ def __init__( # Response state self.response_started = False self.response_complete = False - self.chunked_encoding: Optional[bool] = None + self.chunked_encoding: bool | None = None self.expected_content_length = 0 # ASGI exception wrapper @@ -587,7 +578,7 @@ async def receive(self) -> "ASGIReceiveEvent": await self.message_event.wait() self.message_event.clear() - message: "Union[HTTPDisconnectEvent, HTTPRequestEvent]" + message: HTTPDisconnectEvent | HTTPRequestEvent if self.disconnected or self.response_complete: message = {"type": "http.disconnect"} else: diff --git a/uvicorn/protocols/utils.py b/uvicorn/protocols/utils.py index d0697fe73..7d6cbb864 100644 --- a/uvicorn/protocols/utils.py +++ b/uvicorn/protocols/utils.py @@ -1,11 +1,12 @@ +from __future__ import annotations + import asyncio import urllib.parse -from typing import Optional, Tuple from uvicorn._types import WWWScope -def get_remote_addr(transport: asyncio.Transport) -> Optional[Tuple[str, int]]: +def get_remote_addr(transport: asyncio.Transport) -> tuple[str, int] | None: socket_info = transport.get_extra_info("socket") if socket_info is not None: try: @@ -22,7 +23,7 @@ def get_remote_addr(transport: asyncio.Transport) -> Optional[Tuple[str, int]]: return None -def get_local_addr(transport: asyncio.Transport) -> Optional[Tuple[str, int]]: +def get_local_addr(transport: asyncio.Transport) -> tuple[str, int] | None: socket_info = transport.get_extra_info("socket") if socket_info is not None: info = socket_info.getsockname() @@ -38,14 +39,14 @@ def is_ssl(transport: asyncio.Transport) -> bool: return bool(transport.get_extra_info("sslcontext")) -def get_client_addr(scope: "WWWScope") -> str: +def get_client_addr(scope: WWWScope) -> str: client = scope.get("client") if not client: return "" return "%s:%d" % client -def get_path_with_query_string(scope: "WWWScope") -> str: +def get_path_with_query_string(scope: WWWScope) -> str: path_with_query_string = urllib.parse.quote(scope["path"]) if scope["query_string"]: path_with_query_string = "{}?{}".format( diff --git a/uvicorn/protocols/websockets/websockets_impl.py b/uvicorn/protocols/websockets/websockets_impl.py index d22f3c539..3f04c1dd5 100644 --- a/uvicorn/protocols/websockets/websockets_impl.py +++ b/uvicorn/protocols/websockets/websockets_impl.py @@ -1,9 +1,10 @@ +from __future__ import annotations + import asyncio import http import logging from typing import ( Any, - Dict, List, Literal, Optional, @@ -65,8 +66,8 @@ def __init__( self, config: Config, server_state: ServerState, - app_state: Dict[str, Any], - _loop: Optional[asyncio.AbstractEventLoop] = None, + app_state: dict[str, Any], + _loop: asyncio.AbstractEventLoop | None = None, ): if not config.loaded: config.load() @@ -83,19 +84,19 @@ def __init__( # Connection state self.transport: asyncio.Transport = None # type: ignore[assignment] - self.server: Optional[Tuple[str, int]] = None - self.client: Optional[Tuple[str, int]] = None + self.server: tuple[str, int] | None = None + self.client: tuple[str, int] | None = None self.scheme: Literal["wss", "ws"] = None # type: ignore[assignment] # Connection events - self.scope: WebSocketScope = None # type: ignore[assignment] + self.scope: WebSocketScope self.handshake_started_event = asyncio.Event() self.handshake_completed_event = asyncio.Event() self.closed_event = asyncio.Event() - self.initial_response: Optional[HTTPResponse] = None + self.initial_response: HTTPResponse | None = None self.connect_sent = False self.lost_connection_before_handshake = False - self.accepted_subprotocol: Optional[Subprotocol] = None + self.accepted_subprotocol: Subprotocol | None = None self.ws_server: Server = Server() # type: ignore[assignment] diff --git a/uvicorn/protocols/websockets/wsproto_impl.py b/uvicorn/protocols/websockets/wsproto_impl.py index db95f57a3..7929f3a91 100644 --- a/uvicorn/protocols/websockets/wsproto_impl.py +++ b/uvicorn/protocols/websockets/wsproto_impl.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import asyncio import logging import typing @@ -36,8 +38,8 @@ def __init__( self, config: Config, server_state: ServerState, - app_state: typing.Dict[str, typing.Any], - _loop: typing.Optional[asyncio.AbstractEventLoop] = None, + app_state: dict[str, typing.Any], + _loop: asyncio.AbstractEventLoop | None = None, ) -> None: if not config.loaded: config.load() @@ -56,12 +58,12 @@ def __init__( # Connection state self.transport: asyncio.Transport = None # type: ignore[assignment] - self.server: typing.Optional[typing.Tuple[str, int]] = None - self.client: typing.Optional[typing.Tuple[str, int]] = None + self.server: tuple[str, int] | None = None + self.client: tuple[str, int] | None = None self.scheme: Literal["wss", "ws"] = None # type: ignore[assignment] # WebSocket state - self.queue: asyncio.Queue["WebSocketEvent"] = asyncio.Queue() + self.queue: asyncio.Queue[WebSocketEvent] = asyncio.Queue() self.handshake_complete = False self.close_sent = False @@ -93,7 +95,7 @@ def connection_made( # type: ignore[override] prefix = "%s:%d - " % self.client if self.client else "" self.logger.log(TRACE_LOG_LEVEL, "%sWebSocket connection made", prefix) - def connection_lost(self, exc: typing.Optional[Exception]) -> None: + def connection_lost(self, exc: Exception | None) -> None: code = 1005 if self.handshake_complete else 1006 self.queue.put_nowait({"type": "websocket.disconnect", "code": code}) self.connections.remove(self) @@ -246,14 +248,14 @@ async def run_asgi(self) -> None: self.logger.error(msg, result) self.transport.close() - async def send(self, message: "ASGISendEvent") -> None: + async def send(self, message: ASGISendEvent) -> None: await self.writable.wait() message_type = message["type"] if not self.handshake_complete: if message_type == "websocket.accept": - message = typing.cast("WebSocketAcceptEvent", message) + message = typing.cast(WebSocketAcceptEvent, message) self.logger.info( '%s - "WebSocket %s" [accepted]', self.scope["client"], @@ -290,7 +292,7 @@ async def send(self, message: "ASGISendEvent") -> None: self.transport.close() elif message_type == "websocket.http.response.start": - message = typing.cast("WebSocketResponseStartEvent", message) + message = typing.cast(WebSocketResponseStartEvent, message) # ensure status code is in the valid range if not (100 <= message["status"] < 600): msg = "Invalid HTTP status code '%d' in response." @@ -321,7 +323,7 @@ async def send(self, message: "ASGISendEvent") -> None: elif not self.close_sent and not self.response_started: if message_type == "websocket.send": - message = typing.cast("WebSocketSendEvent", message) + message = typing.cast(WebSocketSendEvent, message) bytes_data = message.get("bytes") text_data = message.get("text") data = text_data if bytes_data is None else bytes_data @@ -332,7 +334,7 @@ async def send(self, message: "ASGISendEvent") -> None: self.transport.write(output) elif message_type == "websocket.close": - message = typing.cast("WebSocketCloseEvent", message) + message = typing.cast(WebSocketCloseEvent, message) self.close_sent = True code = message.get("code", 1000) reason = message.get("reason", "") or "" @@ -378,7 +380,7 @@ async def send(self, message: "ASGISendEvent") -> None: msg = "Unexpected ASGI message '%s', after sending 'websocket.close'." raise RuntimeError(msg % message_type) - async def receive(self) -> "WebSocketEvent": + async def receive(self) -> WebSocketEvent: message = await self.queue.get() if self.read_paused and self.queue.empty(): self.read_paused = False diff --git a/uvicorn/server.py b/uvicorn/server.py index 9e904b269..1f0b726f8 100644 --- a/uvicorn/server.py +++ b/uvicorn/server.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import asyncio import logging import os @@ -9,7 +11,7 @@ import time from email.utils import formatdate from types import FrameType -from typing import TYPE_CHECKING, List, Optional, Sequence, Set, Tuple, Union +from typing import TYPE_CHECKING, Sequence, Union import click @@ -23,7 +25,6 @@ Protocols = Union[H11Protocol, HttpToolsProtocol, WSProtocol, WebSocketProtocol] - HANDLED_SIGNALS = ( signal.SIGINT, # Unix signal 2. Sent by Ctrl+C. signal.SIGTERM, # Unix signal 15. Sent by `kill `. @@ -41,9 +42,9 @@ class ServerState: def __init__(self) -> None: self.total_requests = 0 - self.connections: Set["Protocols"] = set() - self.tasks: Set[asyncio.Task] = set() - self.default_headers: List[Tuple[bytes, bytes]] = [] + self.connections: set[Protocols] = set() + self.tasks: set[asyncio.Task[None]] = set() + self.default_headers: list[tuple[bytes, bytes]] = [] class Server: @@ -56,11 +57,11 @@ def __init__(self, config: Config) -> None: self.force_exit = False self.last_notified = 0.0 - def run(self, sockets: Optional[List[socket.socket]] = None) -> None: + def run(self, sockets: list[socket.socket] | None = None) -> None: self.config.setup_event_loop() return asyncio.run(self.serve(sockets=sockets)) - async def serve(self, sockets: Optional[List[socket.socket]] = None) -> None: + async def serve(self, sockets: list[socket.socket] | None = None) -> None: process_id = os.getpid() config = self.config @@ -85,7 +86,7 @@ async def serve(self, sockets: Optional[List[socket.socket]] = None) -> None: color_message = "Finished server process [" + click.style("%d", fg="cyan") + "]" logger.info(message, process_id, extra={"color_message": color_message}) - async def startup(self, sockets: Optional[List[socket.socket]] = None) -> None: + async def startup(self, sockets: list[socket.socket] | None = None) -> None: await self.lifespan.startup() if self.lifespan.should_exit: self.should_exit = True @@ -94,7 +95,7 @@ async def startup(self, sockets: Optional[List[socket.socket]] = None) -> None: config = self.config def create_protocol( - _loop: Optional[asyncio.AbstractEventLoop] = None, + _loop: asyncio.AbstractEventLoop | None = None, ) -> asyncio.Protocol: return config.http_protocol_class( # type: ignore[call-arg] config=config, @@ -120,7 +121,7 @@ def _share_socket( sock_data = sock.share(os.getpid()) # type: ignore[attr-defined] return fromshare(sock_data) - self.servers: List[asyncio.base_events.Server] = [] + self.servers: list[asyncio.base_events.Server] = [] for sock in sockets: is_windows = platform.system() == "Windows" if config.workers > 1 and is_windows: # pragma: py-not-win32 @@ -260,7 +261,7 @@ async def on_tick(self, counter: int) -> bool: return self.server_state.total_requests >= self.config.limit_max_requests return False - async def shutdown(self, sockets: Optional[List[socket.socket]] = None) -> None: + async def shutdown(self, sockets: list[socket.socket] | None = None) -> None: logger.info("Shutting down") # Stop accepting new connections. @@ -328,7 +329,7 @@ def install_signal_handlers(self) -> None: for sig in HANDLED_SIGNALS: signal.signal(sig, self.handle_exit) - def handle_exit(self, sig: int, frame: Optional[FrameType]) -> None: + def handle_exit(self, sig: int, frame: FrameType | None) -> None: if self.should_exit and sig == signal.SIGINT: self.force_exit = True else: diff --git a/uvicorn/supervisors/basereload.py b/uvicorn/supervisors/basereload.py index 0c1dc25ad..6e2e0c359 100644 --- a/uvicorn/supervisors/basereload.py +++ b/uvicorn/supervisors/basereload.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging import os import signal @@ -6,7 +8,7 @@ from pathlib import Path from socket import socket from types import FrameType -from typing import Callable, Iterator, List, Optional +from typing import Callable, Iterator import click @@ -25,8 +27,8 @@ class BaseReload: def __init__( self, config: Config, - target: Callable[[Optional[List[socket]]], None], - sockets: List[socket], + target: Callable[[list[socket] | None], None], + sockets: list[socket], ) -> None: self.config = config self.target = target @@ -34,9 +36,9 @@ def __init__( self.should_exit = threading.Event() self.pid = os.getpid() self.is_restarting = False - self.reloader_name: Optional[str] = None + self.reloader_name: str | None = None - def signal_handler(self, sig: int, frame: Optional[FrameType]) -> None: + def signal_handler(self, sig: int, frame: FrameType | None) -> None: """ A signal handler that is registered with the parent process. """ @@ -62,10 +64,10 @@ def pause(self) -> None: if self.should_exit.wait(self.config.reload_delay): raise StopIteration() - def __iter__(self) -> Iterator[Optional[List[Path]]]: + def __iter__(self) -> Iterator[list[Path] | None]: return self - def __next__(self) -> Optional[List[Path]]: + def __next__(self) -> list[Path] | None: return self.should_restart() def startup(self) -> None: @@ -114,7 +116,7 @@ def shutdown(self) -> None: ) logger.info(message, extra={"color_message": color_message}) - def should_restart(self) -> Optional[List[Path]]: + def should_restart(self) -> list[Path] | None: raise NotImplementedError("Reload strategies should override should_restart()") diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 87ce91f15..153b3d658 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging import os import signal @@ -5,7 +7,7 @@ from multiprocessing.context import SpawnProcess from socket import socket from types import FrameType -from typing import Callable, List, Optional +from typing import Callable import click @@ -24,17 +26,17 @@ class Multiprocess: def __init__( self, config: Config, - target: Callable[[Optional[List[socket]]], None], - sockets: List[socket], + target: Callable[[list[socket] | None], None], + sockets: list[socket], ) -> None: self.config = config self.target = target self.sockets = sockets - self.processes: List[SpawnProcess] = [] + self.processes: list[SpawnProcess] = [] self.should_exit = threading.Event() self.pid = os.getpid() - def signal_handler(self, sig: int, frame: Optional[FrameType]) -> None: + def signal_handler(self, sig: int, frame: FrameType | None) -> None: """ A signal handler that is registered with the parent process. """ diff --git a/uvicorn/supervisors/statreload.py b/uvicorn/supervisors/statreload.py index 7cbcce310..2e25dd4a9 100644 --- a/uvicorn/supervisors/statreload.py +++ b/uvicorn/supervisors/statreload.py @@ -1,7 +1,9 @@ +from __future__ import annotations + import logging from pathlib import Path from socket import socket -from typing import Callable, Dict, Iterator, List, Optional +from typing import Callable, Iterator from uvicorn.config import Config from uvicorn.supervisors.basereload import BaseReload @@ -13,12 +15,12 @@ class StatReload(BaseReload): def __init__( self, config: Config, - target: Callable[[Optional[List[socket]]], None], - sockets: List[socket], + target: Callable[[list[socket] | None], None], + sockets: list[socket], ) -> None: super().__init__(config, target, sockets) self.reloader_name = "StatReload" - self.mtimes: Dict[Path, float] = {} + self.mtimes: dict[Path, float] = {} if config.reload_excludes or config.reload_includes: logger.warning( @@ -26,7 +28,7 @@ def __init__( "watchfiles is installed." ) - def should_restart(self) -> Optional[List[Path]]: + def should_restart(self) -> list[Path] | None: self.pause() for file in self.iter_py_files(): diff --git a/uvicorn/supervisors/watchfilesreload.py b/uvicorn/supervisors/watchfilesreload.py index 0d5f807ce..e1cb311f2 100644 --- a/uvicorn/supervisors/watchfilesreload.py +++ b/uvicorn/supervisors/watchfilesreload.py @@ -1,6 +1,8 @@ +from __future__ import annotations + from pathlib import Path from socket import socket -from typing import Callable, List, Optional +from typing import Callable from watchfiles import watch @@ -62,8 +64,8 @@ class WatchFilesReload(BaseReload): def __init__( self, config: Config, - target: Callable[[Optional[List[socket]]], None], - sockets: List[socket], + target: Callable[[list[socket] | None], None], + sockets: list[socket], ) -> None: super().__init__(config, target, sockets) self.reloader_name = "WatchFiles" @@ -84,7 +86,7 @@ def __init__( yield_on_timeout=True, ) - def should_restart(self) -> Optional[List[Path]]: + def should_restart(self) -> list[Path] | None: self.pause() changes = next(self.watcher) diff --git a/uvicorn/supervisors/watchgodreload.py b/uvicorn/supervisors/watchgodreload.py index 363d9c4db..987909fd6 100644 --- a/uvicorn/supervisors/watchgodreload.py +++ b/uvicorn/supervisors/watchgodreload.py @@ -1,8 +1,10 @@ +from __future__ import annotations + import logging import warnings from pathlib import Path from socket import socket -from typing import TYPE_CHECKING, Callable, Dict, List, Optional +from typing import TYPE_CHECKING, Callable from watchgod import DefaultWatcher @@ -37,8 +39,8 @@ def __init__(self, root_path: Path, config: Config): self.excludes.extend(config.reload_excludes) self.excludes = list(set(self.excludes)) - self.watched_dirs: Dict[str, bool] = {} - self.watched_files: Dict[str, bool] = {} + self.watched_dirs: dict[str, bool] = {} + self.watched_files: dict[str, bool] = {} self.dirs_includes = set(config.reload_dirs) self.dirs_excludes = set(config.reload_dirs_excludes) self.resolved_root = root_path @@ -130,8 +132,8 @@ class WatchGodReload(BaseReload): def __init__( self, config: Config, - target: Callable[[Optional[List[socket]]], None], - sockets: List[socket], + target: Callable[[list[socket] | None], None], + sockets: list[socket], ) -> None: warnings.warn( '"watchgod" is deprecated, you should switch ' @@ -150,7 +152,7 @@ def __init__( for w in reload_dirs: self.watchers.append(CustomWatcher(w.resolve(), self.config)) - def should_restart(self) -> Optional[List[Path]]: + def should_restart(self) -> list[Path] | None: self.pause() for watcher in self.watchers: