Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix WebSocket reader flow control calculations #9685

Merged
merged 41 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
a8fa035
cleanups
bdraco Nov 5, 2024
539ff38
Update aiohttp/_websocket/reader_py.py
bdraco Nov 5, 2024
96d4113
Update aiohttp/client.py
bdraco Nov 5, 2024
ba192fd
Update aiohttp/client.py
bdraco Nov 5, 2024
891036c
Update aiohttp/client_ws.py
bdraco Nov 5, 2024
7cbe329
Update aiohttp/web_ws.py
bdraco Nov 5, 2024
f6b8faa
payload may be None
bdraco Nov 5, 2024
704010b
payload may not be len able
bdraco Nov 5, 2024
a1b54ed
cleanup
bdraco Nov 5, 2024
9555016
split it
bdraco Nov 6, 2024
666809f
split it
bdraco Nov 6, 2024
2e5dc54
preen
bdraco Nov 6, 2024
abbd219
fix
bdraco Nov 6, 2024
8cd2001
fixes
bdraco Nov 6, 2024
bec470b
more fixes
bdraco Nov 6, 2024
faad625
fix more tests
bdraco Nov 6, 2024
2b661c2
fix benchmark
bdraco Nov 6, 2024
25fd3fd
move size into message
bdraco Nov 6, 2024
a463ee1
move size into message
bdraco Nov 6, 2024
151bb30
move size into message
bdraco Nov 6, 2024
9726051
more fixes
bdraco Nov 6, 2024
2bac4c8
Apply suggestions from code review
bdraco Nov 6, 2024
7d56e4e
Apply suggestions from code review
bdraco Nov 6, 2024
1b59b7d
fix typo
bdraco Nov 6, 2024
763ec7e
fixes
bdraco Nov 6, 2024
328738d
preen
bdraco Nov 6, 2024
effafee
lint
bdraco Nov 6, 2024
b271554
cache prop
bdraco Nov 6, 2024
15ef47d
try to avoid all the py math
bdraco Nov 6, 2024
0cd95b0
no return
bdraco Nov 6, 2024
4cf1b69
preen
bdraco Nov 6, 2024
4773d89
dry
bdraco Nov 6, 2024
080d39a
compare data.size to tuple access
bdraco Nov 6, 2024
8cc4b8e
Apply suggestions from code review
bdraco Nov 6, 2024
b4f60eb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 6, 2024
35a341c
Revert "[pre-commit.ci] auto fixes from pre-commit.com hooks"
bdraco Nov 6, 2024
af16201
Revert "Apply suggestions from code review"
bdraco Nov 6, 2024
7fb47a2
changelog
bdraco Nov 7, 2024
29dd891
Merge branch 'master' into websocket_flow_control
bdraco Nov 7, 2024
9e839d8
Update CHANGES/9685.breaking.rst
bdraco Nov 7, 2024
6c174e2
Merge branch 'master' into websocket_flow_control
bdraco Nov 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/9685.breaking.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The ``FlowControlDataQueue`` has been replaced with the ``WebSocketDataQueue`` -- by :user:`bdraco`.
bdraco marked this conversation as resolved.
Show resolved Hide resolved
bdraco marked this conversation as resolved.
Show resolved Hide resolved
9 changes: 1 addition & 8 deletions aiohttp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,7 @@
payload_type,
)
from .resolver import AsyncResolver, DefaultResolver, ThreadedResolver
from .streams import (
EMPTY_PAYLOAD,
DataQueue,
EofStream,
FlowControlDataQueue,
StreamReader,
)
from .streams import EMPTY_PAYLOAD, DataQueue, EofStream, StreamReader
from .tracing import (
TraceConfig,
TraceConnectionCreateEndParams,
Expand Down Expand Up @@ -206,7 +200,6 @@
"DataQueue",
"EMPTY_PAYLOAD",
"EofStream",
"FlowControlDataQueue",
"StreamReader",
# tracing
"TraceConfig",
Expand Down
9 changes: 9 additions & 0 deletions aiohttp/_websocket/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ class WSMsgType(IntEnum):

class WSMessageContinuation(NamedTuple):
data: bytes
size: int
extra: Optional[str] = None
type: Literal[WSMsgType.CONTINUATION] = WSMsgType.CONTINUATION


class WSMessageText(NamedTuple):
data: str
size: int
extra: Optional[str] = None
type: Literal[WSMsgType.TEXT] = WSMsgType.TEXT

Expand All @@ -58,6 +60,7 @@ def json(

class WSMessageBinary(NamedTuple):
data: bytes
size: int
extra: Optional[str] = None
type: Literal[WSMsgType.BINARY] = WSMsgType.BINARY

Expand All @@ -70,36 +73,42 @@ def json(

class WSMessagePing(NamedTuple):
data: bytes
size: int
extra: Optional[str] = None
type: Literal[WSMsgType.PING] = WSMsgType.PING


class WSMessagePong(NamedTuple):
data: bytes
size: int
extra: Optional[str] = None
type: Literal[WSMsgType.PONG] = WSMsgType.PONG


class WSMessageClose(NamedTuple):
data: int
size: int
extra: Optional[str] = None
type: Literal[WSMsgType.CLOSE] = WSMsgType.CLOSE


class WSMessageClosing(NamedTuple):
data: None = None
size: int = 0
bdraco marked this conversation as resolved.
Show resolved Hide resolved
extra: Optional[str] = None
type: Literal[WSMsgType.CLOSING] = WSMsgType.CLOSING


class WSMessageClosed(NamedTuple):
data: None = None
size: int = 0
extra: Optional[str] = None
type: Literal[WSMsgType.CLOSED] = WSMsgType.CLOSED


class WSMessageError(NamedTuple):
data: BaseException
size: int = 0
extra: Optional[str] = None
type: Literal[WSMsgType.ERROR] = WSMsgType.ERROR

Expand Down
14 changes: 12 additions & 2 deletions aiohttp/_websocket/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,27 @@
from ..helpers import NO_EXTENSIONS

if TYPE_CHECKING or NO_EXTENSIONS: # pragma: no cover
from .reader_py import WebSocketReader as WebSocketReaderPython
from .reader_py import (
WebSocketDataQueue as WebSocketDataQueuePython,
WebSocketReader as WebSocketReaderPython,
)

WebSocketReader = WebSocketReaderPython
WebSocketDataQueue = WebSocketDataQueuePython
else:
try:
from .reader_c import ( # type: ignore[import-not-found]
WebSocketDataQueue as WebSocketDataQueueCython,
WebSocketReader as WebSocketReaderCython,
)

WebSocketReader = WebSocketReaderCython
WebSocketDataQueue = WebSocketDataQueueCython
except ImportError: # pragma: no cover
from .reader_py import WebSocketReader as WebSocketReaderPython
from .reader_py import (
WebSocketDataQueue as WebSocketDataQueuePython,
WebSocketReader as WebSocketReaderPython,
)

WebSocketReader = WebSocketReaderPython
WebSocketDataQueue = WebSocketDataQueuePython
23 changes: 21 additions & 2 deletions aiohttp/_websocket/reader_c.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,30 @@ cdef set MESSAGE_TYPES_WITH_CONTENT
cdef tuple EMPTY_FRAME
cdef tuple EMPTY_FRAME_ERROR

cdef class WebSocketDataQueue:

cdef unsigned int _size
cdef public object _protocol
cdef unsigned int _limit
cdef object _loop
cdef bint _eof
cdef object _waiter
cdef object _exception
cdef public object _buffer
cdef object _get_buffer
cdef object _put_buffer

cdef void _release_waiter(self)

@cython.locals(size="unsigned int")
cpdef void feed_data(self, object data)

@cython.locals(size="unsigned int")
cdef _read_from_buffer(self)

cdef class WebSocketReader:

cdef object queue
cdef object _queue_feed_data
cdef WebSocketDataQueue queue
cdef unsigned int _max_msg_size

cdef Exception _exc
Expand Down
121 changes: 102 additions & 19 deletions aiohttp/_websocket/reader_py.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
"""Reader for WebSocket protocol versions 13 and 8."""

from typing import Final, List, Optional, Set, Tuple, Union
import asyncio
import builtins
from collections import deque
from typing import Deque, Final, List, Optional, Set, Tuple, Type, Union

from ..base_protocol import BaseProtocol
from ..compression_utils import ZLibDecompressor
from ..helpers import set_exception
from ..streams import DataQueue
from ..helpers import _EXC_SENTINEL, set_exception
from ..streams import EofStream
from .helpers import UNPACK_CLOSE_CODE, UNPACK_LEN3, websocket_mask
from .models import (
WS_DEFLATE_TRAILING,
Expand Down Expand Up @@ -45,12 +49,88 @@
TUPLE_NEW = tuple.__new__


class WebSocketDataQueue:
"""WebSocketDataQueue resumes and pauses an underlying stream.

It is a destination for WebSocket data.
"""

def __init__(
self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
) -> None:
self._size = 0
self._protocol = protocol
self._limit = limit * 2
self._loop = loop
self._eof = False
self._waiter: Optional[asyncio.Future[None]] = None
self._exception: Union[Type[BaseException], BaseException, None] = None
self._buffer: Deque[WSMessage] = deque()
self._get_buffer = self._buffer.popleft
self._put_buffer = self._buffer.append

def exception(self) -> Optional[Union[Type[BaseException], BaseException]]:
return self._exception

def set_exception(
self,
exc: Union[Type[BaseException], BaseException],
exc_cause: builtins.BaseException = _EXC_SENTINEL,
) -> None:
self._eof = True
self._exception = exc
if (waiter := self._waiter) is not None:
self._waiter = None
set_exception(waiter, exc, exc_cause)

def _release_waiter(self) -> None:
if (waiter := self._waiter) is None:
return
self._waiter = None
if not waiter.done():
waiter.set_result(None)

def feed_eof(self) -> None:
self._eof = True
self._release_waiter()

def feed_data(self, data: "WSMessage") -> None:
size = data.size
bdraco marked this conversation as resolved.
Show resolved Hide resolved
self._size += size
self._put_buffer(data)
self._release_waiter()
if self._size > self._limit and not self._protocol._reading_paused:
self._protocol.pause_reading()

async def read(self) -> WSMessage:
if not self._buffer and not self._eof:
assert not self._waiter
self._waiter = self._loop.create_future()
try:
await self._waiter
except (asyncio.CancelledError, asyncio.TimeoutError):
self._waiter = None
raise
return self._read_from_buffer()

def _read_from_buffer(self) -> WSMessage:
if self._buffer:
data = self._get_buffer()
size = data.size
bdraco marked this conversation as resolved.
Show resolved Hide resolved
self._size -= size
if self._size < self._limit and self._protocol._reading_paused:
self._protocol.resume_reading()
return data
if self._exception is not None:
raise self._exception
Fixed Show fixed Hide fixed
raise EofStream


class WebSocketReader:
def __init__(
self, queue: DataQueue[WSMessage], max_msg_size: int, compress: bool = True
self, queue: WebSocketDataQueue, max_msg_size: int, compress: bool = True
) -> None:
self.queue = queue
self._queue_feed_data = queue.feed_data
self._max_msg_size = max_msg_size

self._exc: Optional[Exception] = None
Expand Down Expand Up @@ -177,6 +257,7 @@ def _feed_data(self, data: bytes) -> None:
else:
payload_merged = bytes(assembled_payload)

size = len(payload_merged)
bdraco marked this conversation as resolved.
Show resolved Hide resolved
if opcode == OP_CODE_TEXT:
try:
text = payload_merged.decode("utf-8")
Expand All @@ -189,15 +270,16 @@ def _feed_data(self, data: bytes) -> None:
# bottleneck, so we use tuple.__new__ to improve performance.
# This is not type safe, but many tests should fail in
# test_client_ws_functional.py if this is wrong.
msg = TUPLE_NEW(WSMessageText, (text, "", WS_MSG_TYPE_TEXT))
msg = TUPLE_NEW(WSMessageText, (text, size, "", WS_MSG_TYPE_TEXT))
bdraco marked this conversation as resolved.
Show resolved Hide resolved
else:
msg = TUPLE_NEW(
WSMessageBinary, (payload_merged, "", WS_MSG_TYPE_BINARY)
WSMessageBinary, (payload_merged, size, "", WS_MSG_TYPE_BINARY)
bdraco marked this conversation as resolved.
Show resolved Hide resolved
)

self._queue_feed_data(msg)
self.queue.feed_data(msg)
elif opcode == OP_CODE_CLOSE:
if len(payload) >= 2:
payload_len = len(payload)
if payload_len >= 2:
close_code = UNPACK_CLOSE_CODE(payload[:2])[0]
if close_code < 3000 and close_code not in ALLOWED_CLOSE_CODES:
raise WebSocketError(
Expand All @@ -210,25 +292,26 @@ def _feed_data(self, data: bytes) -> None:
raise WebSocketError(
WSCloseCode.INVALID_TEXT, "Invalid UTF-8 text message"
) from exc
msg = WSMessageClose(data=close_code, extra=close_message)
msg = WSMessageClose(
data=close_code, size=payload_len, extra=close_message
)
elif payload:
raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR,
f"Invalid close frame: {fin} {opcode} {payload!r}",
)
else:
msg = WSMessageClose(data=0, extra="")

self._queue_feed_data(msg)
msg = WSMessageClose(data=0, size=payload_len, extra="")

self.queue.feed_data(msg)
elif opcode == OP_CODE_PING:
msg = WSMessagePing(data=payload, extra="")
self._queue_feed_data(msg)

self.queue.feed_data(
WSMessagePing(data=payload, size=len(payload), extra="")
)
elif opcode == OP_CODE_PONG:
msg = WSMessagePong(data=payload, extra="")
self._queue_feed_data(msg)

self.queue.feed_data(
WSMessagePong(data=payload, size=len(payload), extra="")
)
else:
raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR, f"Unexpected opcode={opcode!r}"
Expand Down
8 changes: 3 additions & 5 deletions aiohttp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from yarl import URL

from . import hdrs, http, payload
from ._websocket.reader import WebSocketDataQueue
from .abc import AbstractCookieJar
from .client_exceptions import (
ClientConnectionError,
Expand Down Expand Up @@ -101,8 +102,7 @@
strip_auth_from_url,
)
from .http import WS_KEY, HttpVersion, WebSocketReader, WebSocketWriter
from .http_websocket import WSHandshakeError, WSMessage, ws_ext_gen, ws_ext_parse
from .streams import FlowControlDataQueue
from .http_websocket import WSHandshakeError, ws_ext_gen, ws_ext_parse
from .tracing import Trace, TraceConfig
from .typedefs import JSONEncoder, LooseCookies, LooseHeaders, Query, StrOrURL

Expand Down Expand Up @@ -1035,9 +1035,7 @@ async def _ws_connect(

transport = conn.transport
assert transport is not None
reader: FlowControlDataQueue[WSMessage] = FlowControlDataQueue(
conn_proto, 2**16, loop=self._loop
)
reader = WebSocketDataQueue(conn_proto, 2**16, loop=self._loop)
conn_proto.set_parser(WebSocketReader(reader, max_msg_size), reader)
writer = WebSocketWriter(
conn_proto,
Expand Down
2 changes: 1 addition & 1 deletion aiohttp/client_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def set_exception(
def set_parser(self, parser: Any, payload: Any) -> None:
# TODO: actual types are:
# parser: WebSocketReader
# payload: FlowControlDataQueue
# payload: WebSocketDataQueue
# but they are not generi enough
# Need an ABC for both types
self._payload = payload
Expand Down
5 changes: 3 additions & 2 deletions aiohttp/client_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from types import TracebackType
from typing import Any, Final, Optional, Type

from ._websocket.reader import WebSocketDataQueue
from .client_exceptions import ClientError, ServerTimeoutError, WSMessageTypeError
from .client_reqrep import ClientResponse
from .helpers import calculate_timeout_when, set_result
Expand All @@ -18,7 +19,7 @@
WSMsgType,
)
from .http_websocket import _INTERNAL_RECEIVE_TYPES, WebSocketWriter, WSMessageError
from .streams import EofStream, FlowControlDataQueue
from .streams import EofStream
from .typedefs import (
DEFAULT_JSON_DECODER,
DEFAULT_JSON_ENCODER,
Expand Down Expand Up @@ -46,7 +47,7 @@ class ClientWSTimeout:
class ClientWebSocketResponse:
def __init__(
self,
reader: "FlowControlDataQueue[WSMessage]",
reader: WebSocketDataQueue,
writer: WebSocketWriter,
protocol: Optional[str],
response: ClientResponse,
Expand Down
Loading
Loading