diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index a4175f799..9aad5387f 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -8,6 +8,11 @@ dev
-
+**API Changes (Backward Compatible)**
+
+- h2 events now have tighter type bounds, e.g. `stream_id` is guaranteed to not be `None` for most events now.
+ This simplifies downstream type checking.
+
**Bugfixes**
-
diff --git a/src/h2/connection.py b/src/h2/connection.py
index 28be9fca2..aa773e891 100644
--- a/src/h2/connection.py
+++ b/src/h2/connection.py
@@ -1806,9 +1806,7 @@ def _receive_window_update_frame(self, frame: WindowUpdateFrame) -> tuple[list[F
)
# FIXME: Should we split this into one event per active stream?
- window_updated_event = WindowUpdated()
- window_updated_event.stream_id = 0
- window_updated_event.delta = frame.window_increment
+ window_updated_event = WindowUpdated(stream_id=0, delta=frame.window_increment)
stream_events = [window_updated_event]
frames = []
@@ -1825,9 +1823,9 @@ def _receive_ping_frame(self, frame: PingFrame) -> tuple[list[Frame], list[Event
evt: PingReceived | PingAckReceived
if "ACK" in frame.flags:
- evt = PingAckReceived()
+ evt = PingAckReceived(ping_data=frame.opaque_data)
else:
- evt = PingReceived()
+ evt = PingReceived(ping_data=frame.opaque_data)
# automatically ACK the PING with the same 'opaque data'
f = PingFrame(0)
@@ -1835,7 +1833,6 @@ def _receive_ping_frame(self, frame: PingFrame) -> tuple[list[Frame], list[Event
f.opaque_data = frame.opaque_data
frames.append(f)
- evt.ping_data = frame.opaque_data
events.append(evt)
return frames, events
@@ -1974,8 +1971,7 @@ def _receive_unknown_frame(self, frame: ExtensionFrame) -> tuple[list[Frame], li
self.config.logger.debug(
"Received unknown extension frame (ID %d)", frame.stream_id,
)
- event = UnknownFrameReceived()
- event.frame = frame
+ event = UnknownFrameReceived(frame=frame)
return [], [event]
def _local_settings_acked(self) -> dict[SettingCodes | int, ChangedSetting]:
diff --git a/src/h2/events.py b/src/h2/events.py
index b81fd1a63..6aab0713d 100644
--- a/src/h2/events.py
+++ b/src/h2/events.py
@@ -11,24 +11,41 @@
from __future__ import annotations
import binascii
-from typing import TYPE_CHECKING
+import sys
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any
from .settings import ChangedSetting, SettingCodes, Settings, _setting_code_from_int
if TYPE_CHECKING: # pragma: no cover
- from hpack import HeaderTuple
+ from hpack.struct import Header
from hyperframe.frame import Frame
from .errors import ErrorCodes
+if sys.version_info < (3, 10): # pragma: no cover
+ kw_only: dict[str, bool] = {}
+else: # pragma: no cover
+ kw_only = {"kw_only": True}
+
+
+_LAZY_INIT: Any = object()
+"""
+Some h2 events are instantiated by the state machine, but its attributes are
+subsequently populated by H2Stream. To make this work with strict type annotations
+on the events, they are temporarily set to this placeholder value.
+This value should never be exposed to users.
+"""
+
+
class Event:
"""
Base class for h2 events.
"""
-
+@dataclass(**kw_only)
class RequestReceived(Event):
"""
The RequestReceived event is fired whenever all of a request's headers
@@ -47,31 +64,35 @@ class RequestReceived(Event):
Added ``stream_ended`` and ``priority_updated`` properties.
"""
- def __init__(self) -> None:
- #: The Stream ID for the stream this request was made on.
- self.stream_id: int | None = None
+ stream_id: int
+ """The Stream ID for the stream this request was made on."""
+
+ headers: list[Header] = _LAZY_INIT
+ """The request headers."""
+
+ stream_ended: StreamEnded | None = None
+ """
+ If this request also ended the stream, the associated
+ :class:`StreamEnded
` event will be available
+ here.
- #: The request headers.
- self.headers: list[HeaderTuple] | None = None
+ .. versionadded:: 2.4.0
+ """
- #: If this request also ended the stream, the associated
- #: :class:`StreamEnded ` event will be available
- #: here.
- #:
- #: .. versionadded:: 2.4.0
- self.stream_ended: StreamEnded | None = None
+ priority_updated: PriorityUpdated | None = None
+ """
+ If this request also had associated priority information, the
+ associated :class:`PriorityUpdated `
+ event will be available here.
- #: If this request also had associated priority information, the
- #: associated :class:`PriorityUpdated `
- #: event will be available here.
- #:
- #: .. versionadded:: 2.4.0
- self.priority_updated: PriorityUpdated | None = None
+ .. versionadded:: 2.4.0
+ """
def __repr__(self) -> str:
return f""
+@dataclass(**kw_only)
class ResponseReceived(Event):
"""
The ResponseReceived event is fired whenever response headers are received.
@@ -86,31 +107,35 @@ class ResponseReceived(Event):
Added ``stream_ended`` and ``priority_updated`` properties.
"""
- def __init__(self) -> None:
- #: The Stream ID for the stream this response was made on.
- self.stream_id: int | None = None
+ stream_id: int
+ """The Stream ID for the stream this response was made on."""
+
+ headers: list[Header] = _LAZY_INIT
+ """The response headers."""
+
+ stream_ended: StreamEnded | None = None
+ """
+ If this response also ended the stream, the associated
+ :class:`StreamEnded ` event will be available
+ here.
- #: The response headers.
- self.headers: list[HeaderTuple] | None = None
+ .. versionadded:: 2.4.0
+ """
- #: If this response also ended the stream, the associated
- #: :class:`StreamEnded ` event will be available
- #: here.
- #:
- #: .. versionadded:: 2.4.0
- self.stream_ended: StreamEnded | None = None
+ priority_updated: PriorityUpdated | None = None
+ """
+ If this response also had associated priority information, the
+ associated :class:`PriorityUpdated `
+ event will be available here.
- #: If this response also had associated priority information, the
- #: associated :class:`PriorityUpdated `
- #: event will be available here.
- #:
- #: .. versionadded:: 2.4.0
- self.priority_updated: PriorityUpdated | None = None
+ .. versionadded:: 2.4.0
+ """
def __repr__(self) -> str:
return f""
+@dataclass(**kw_only)
class TrailersReceived(Event):
"""
The TrailersReceived event is fired whenever trailers are received on a
@@ -128,25 +153,28 @@ class TrailersReceived(Event):
Added ``stream_ended`` and ``priority_updated`` properties.
"""
- def __init__(self) -> None:
- #: The Stream ID for the stream on which these trailers were received.
- self.stream_id: int | None = None
+ stream_id: int
+ """The Stream ID for the stream on which these trailers were received."""
+
+ headers: list[Header] = _LAZY_INIT
+ """The trailers themselves."""
+
+ stream_ended: StreamEnded | None = None
+ """
+ Trailers always end streams. This property has the associated
+ :class:`StreamEnded ` in it.
- #: The trailers themselves.
- self.headers: list[HeaderTuple] | None = None
+ .. versionadded:: 2.4.0
+ """
- #: Trailers always end streams. This property has the associated
- #: :class:`StreamEnded ` in it.
- #:
- #: .. versionadded:: 2.4.0
- self.stream_ended: StreamEnded | None = None
+ priority_updated: PriorityUpdated | None = None
+ """
+ If the trailers also set associated priority information, the
+ associated :class:`PriorityUpdated `
+ event will be available here.
- #: If the trailers also set associated priority information, the
- #: associated :class:`PriorityUpdated `
- #: event will be available here.
- #:
- #: .. versionadded:: 2.4.0
- self.priority_updated: PriorityUpdated | None = None
+ .. versionadded:: 2.4.0
+ """
def __repr__(self) -> str:
return f""
@@ -207,7 +235,7 @@ class _PushedRequestSent(_HeadersSent):
"""
-
+@dataclass(**kw_only)
class InformationalResponseReceived(Event):
"""
The InformationalResponseReceived event is fired when an informational
@@ -231,25 +259,26 @@ class InformationalResponseReceived(Event):
Added ``priority_updated`` property.
"""
- def __init__(self) -> None:
- #: The Stream ID for the stream this informational response was made
- #: on.
- self.stream_id: int | None = None
+ stream_id: int
+ """The Stream ID for the stream this informational response was made on."""
- #: The headers for this informational response.
- self.headers: list[HeaderTuple] | None = None
+ headers: list[Header] = _LAZY_INIT
+ """The headers for this informational response."""
+
+ priority_updated: PriorityUpdated | None = None
+ """
+ If this response also had associated priority information, the
+ associated :class:`PriorityUpdated `
+ event will be available here.
- #: If this response also had associated priority information, the
- #: associated :class:`PriorityUpdated `
- #: event will be available here.
- #:
- #: .. versionadded:: 2.4.0
- self.priority_updated: PriorityUpdated | None = None
+ .. versionadded:: 2.4.0
+ """
def __repr__(self) -> str:
return f""
+@dataclass(**kw_only)
class DataReceived(Event):
"""
The DataReceived event is fired whenever data is received on a stream from
@@ -260,25 +289,28 @@ class DataReceived(Event):
Added ``stream_ended`` property.
"""
- def __init__(self) -> None:
- #: The Stream ID for the stream this data was received on.
- self.stream_id: int | None = None
+ stream_id: int
+ """The Stream ID for the stream this data was received on."""
+
+ data: bytes = _LAZY_INIT
+ """The data itself."""
- #: The data itself.
- self.data: bytes | None = None
+ flow_controlled_length: int = _LAZY_INIT
+ """
+ The amount of data received that counts against the flow control
+ window. Note that padding counts against the flow control window, so
+ when adjusting flow control you should always use this field rather
+ than ``len(data)``.
+ """
- #: The amount of data received that counts against the flow control
- #: window. Note that padding counts against the flow control window, so
- #: when adjusting flow control you should always use this field rather
- #: than ``len(data)``.
- self.flow_controlled_length: int | None = None
+ stream_ended: StreamEnded | None = None
+ """
+ If this data chunk also completed the stream, the associated
+ :class:`StreamEnded ` event will be available
+ here.
- #: If this data chunk also completed the stream, the associated
- #: :class:`StreamEnded ` event will be available
- #: here.
- #:
- #: .. versionadded:: 2.4.0
- self.stream_ended: StreamEnded | None = None
+ .. versionadded:: 2.4.0
+ """
def __repr__(self) -> str:
return (
@@ -292,6 +324,7 @@ def __repr__(self) -> str:
)
+@dataclass(**kw_only)
class WindowUpdated(Event):
"""
The WindowUpdated event is fired whenever a flow control window changes
@@ -301,13 +334,16 @@ class WindowUpdated(Event):
the connection), and the delta in the window size.
"""
- def __init__(self) -> None:
- #: The Stream ID of the stream whose flow control window was changed.
- #: May be ``0`` if the connection window was changed.
- self.stream_id: int | None = None
+ stream_id: int
+ """
+ The Stream ID of the stream whose flow control window was changed.
+ May be ``0`` if the connection window was changed.
+ """
- #: The window delta.
- self.delta: int | None = None
+ delta: int = _LAZY_INIT
+ """
+ The window delta.
+ """
def __repr__(self) -> str:
return f""
@@ -367,6 +403,7 @@ def __repr__(self) -> str:
)
+@dataclass(**kw_only)
class PingReceived(Event):
"""
The PingReceived event is fired whenever a PING is received. It contains
@@ -376,14 +413,14 @@ class PingReceived(Event):
.. versionadded:: 3.1.0
"""
- def __init__(self) -> None:
- #: The data included on the ping.
- self.ping_data: bytes | None = None
+ ping_data: bytes
+ """The data included on the ping."""
def __repr__(self) -> str:
return f""
+@dataclass(**kw_only)
class PingAckReceived(Event):
"""
The PingAckReceived event is fired whenever a PING acknowledgment is
@@ -396,14 +433,14 @@ class PingAckReceived(Event):
Removed deprecated but equivalent ``PingAcknowledged``.
"""
- def __init__(self) -> None:
- #: The data included on the ping.
- self.ping_data: bytes | None = None
+ ping_data: bytes
+ """The data included on the ping."""
def __repr__(self) -> str:
return f""
+@dataclass(**kw_only)
class StreamEnded(Event):
"""
The StreamEnded event is fired whenever a stream is ended by a remote
@@ -411,14 +448,14 @@ class StreamEnded(Event):
locally, but no further data or headers should be expected on that stream.
"""
- def __init__(self) -> None:
- #: The Stream ID of the stream that was closed.
- self.stream_id: int | None = None
+ stream_id: int
+ """The Stream ID of the stream that was closed."""
def __repr__(self) -> str:
return f""
+@dataclass(**kw_only)
class StreamReset(Event):
"""
The StreamReset event is fired in two situations. The first is when the
@@ -430,16 +467,20 @@ class StreamReset(Event):
This event is now fired when h2 automatically resets a stream.
"""
- def __init__(self) -> None:
- #: The Stream ID of the stream that was reset.
- self.stream_id: int | None = None
+ stream_id: int
+ """
+ The Stream ID of the stream that was reset.
+ """
- #: The error code given. Either one of :class:`ErrorCodes
- #: ` or ``int``
- self.error_code: ErrorCodes | None = None
+ error_code: ErrorCodes | int = _LAZY_INIT
+ """
+ The error code given.
+ """
- #: Whether the remote peer sent a RST_STREAM or we did.
- self.remote_reset = True
+ remote_reset: bool = True
+ """
+ Whether the remote peer sent a RST_STREAM or we did.
+ """
def __repr__(self) -> str:
return f""
@@ -460,7 +501,7 @@ def __init__(self) -> None:
self.parent_stream_id: int | None = None
#: The request headers, sent by the remote party in the push.
- self.headers: list[HeaderTuple] | None = None
+ self.headers: list[Header] | None = None
def __repr__(self) -> str:
return (
@@ -601,6 +642,7 @@ def __repr__(self) -> str:
)
+@dataclass(**kw_only)
class UnknownFrameReceived(Event):
"""
The UnknownFrameReceived event is fired when the remote peer sends a frame
@@ -616,9 +658,7 @@ class UnknownFrameReceived(Event):
.. versionadded:: 2.7.0
"""
- def __init__(self) -> None:
- #: The hyperframe Frame object that encapsulates the received frame.
- self.frame: Frame | None = None
+ frame: Frame
def __repr__(self) -> str:
return ""
diff --git a/src/h2/stream.py b/src/h2/stream.py
index 7d4a12e35..d102b056c 100644
--- a/src/h2/stream.py
+++ b/src/h2/stream.py
@@ -7,7 +7,7 @@
from __future__ import annotations
from enum import Enum, IntEnum
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, Union, cast
from hpack import HeaderTuple
from hyperframe.frame import AltSvcFrame, ContinuationFrame, DataFrame, Frame, HeadersFrame, PushPromiseFrame, RstStreamFrame, WindowUpdateFrame
@@ -46,7 +46,7 @@
from .windows import WindowManager
if TYPE_CHECKING: # pragma: no cover
- from collections.abc import Generator, Iterable
+ from collections.abc import Callable, Generator, Iterable
from hpack.hpack import Encoder
from hpack.struct import Header, HeaderWeaklyTyped
@@ -131,7 +131,7 @@ def __init__(self, stream_id: int) -> None:
# How the stream was closed. One of StreamClosedBy.
self.stream_closed_by: StreamClosedBy | None = None
- def process_input(self, input_: StreamInputs) -> Any:
+ def process_input(self, input_: StreamInputs) -> list[Event]:
"""
Process a specific input in the state machine.
"""
@@ -195,8 +195,7 @@ def request_received(self, previous_state: StreamState) -> list[Event]:
self.client = False
self.headers_received = True
- event = RequestReceived()
- event.stream_id = self.stream_id
+ event = RequestReceived(stream_id=self.stream_id)
return [event]
def response_received(self, previous_state: StreamState) -> list[Event]:
@@ -208,11 +207,11 @@ def response_received(self, previous_state: StreamState) -> list[Event]:
if not self.headers_received:
assert self.client is True
self.headers_received = True
- event = ResponseReceived()
+ event = ResponseReceived(stream_id=self.stream_id)
else:
assert not self.trailers_received
self.trailers_received = True
- event = TrailersReceived()
+ event = TrailersReceived(stream_id=self.stream_id)
event.stream_id = self.stream_id
return [event]
@@ -224,25 +223,21 @@ def data_received(self, previous_state: StreamState) -> list[Event]:
if not self.headers_received:
msg = "cannot receive data before headers"
raise ProtocolError(msg)
- event = DataReceived()
- event.stream_id = self.stream_id
+ event = DataReceived(stream_id=self.stream_id)
return [event]
def window_updated(self, previous_state: StreamState) -> list[Event]:
"""
Fires when a window update frame is received.
"""
- event = WindowUpdated()
- event.stream_id = self.stream_id
- return [event]
+ return [WindowUpdated(stream_id=self.stream_id)]
def stream_half_closed(self, previous_state: StreamState) -> list[Event]:
"""
Fires when an END_STREAM flag is received in the OPEN state,
transitioning this stream to a HALF_CLOSED_REMOTE state.
"""
- event = StreamEnded()
- event.stream_id = self.stream_id
+ event = StreamEnded(stream_id=self.stream_id)
return [event]
def stream_ended(self, previous_state: StreamState) -> list[Event]:
@@ -250,8 +245,7 @@ def stream_ended(self, previous_state: StreamState) -> list[Event]:
Fires when a stream is cleanly ended.
"""
self.stream_closed_by = StreamClosedBy.RECV_END_STREAM
- event = StreamEnded()
- event.stream_id = self.stream_id
+ event = StreamEnded(stream_id=self.stream_id)
return [event]
def stream_reset(self, previous_state: StreamState) -> list[Event]:
@@ -259,9 +253,7 @@ def stream_reset(self, previous_state: StreamState) -> list[Event]:
Fired when a stream is forcefully reset.
"""
self.stream_closed_by = StreamClosedBy.RECV_RST_STREAM
- event = StreamReset()
- event.stream_id = self.stream_id
- return [event]
+ return [StreamReset(stream_id=self.stream_id)]
def send_new_pushed_stream(self, previous_state: StreamState) -> list[Event]:
"""
@@ -315,21 +307,23 @@ def recv_push_promise(self, previous_state: StreamState) -> list[Event]:
event.parent_stream_id = self.stream_id
return [event]
- def send_end_stream(self, previous_state: StreamState) -> None:
+ def send_end_stream(self, previous_state: StreamState) -> list[Event]:
"""
Called when an attempt is made to send END_STREAM in the
HALF_CLOSED_REMOTE state.
"""
self.stream_closed_by = StreamClosedBy.SEND_END_STREAM
+ return []
- def send_reset_stream(self, previous_state: StreamState) -> None:
+ def send_reset_stream(self, previous_state: StreamState) -> list[Event]:
"""
Called when an attempt is made to send RST_STREAM in a non-closed
stream state.
"""
self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
+ return []
- def reset_stream_on_error(self, previous_state: StreamState) -> None:
+ def reset_stream_on_error(self, previous_state: StreamState) -> list[Event]:
"""
Called when we need to forcefully emit another RST_STREAM frame on
behalf of the state machine.
@@ -342,15 +336,16 @@ def reset_stream_on_error(self, previous_state: StreamState) -> None:
self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
error = StreamClosedError(self.stream_id)
-
- event = StreamReset()
- event.stream_id = self.stream_id
- event.error_code = ErrorCodes.STREAM_CLOSED
- event.remote_reset = False
- error._events = [event]
+ error._events = [
+ StreamReset(
+ stream_id=self.stream_id,
+ error_code=ErrorCodes.STREAM_CLOSED,
+ remote_reset=False,
+ ),
+ ]
raise error
- def recv_on_closed_stream(self, previous_state: StreamState) -> None:
+ def recv_on_closed_stream(self, previous_state: StreamState) -> list[Event]:
"""
Called when an unexpected frame is received on an already-closed
stream.
@@ -362,7 +357,7 @@ def recv_on_closed_stream(self, previous_state: StreamState) -> None:
"""
raise StreamClosedError(self.stream_id)
- def send_on_closed_stream(self, previous_state: StreamState) -> None:
+ def send_on_closed_stream(self, previous_state: StreamState) -> list[Event]:
"""
Called when an attempt is made to send data on an already-closed
stream.
@@ -374,7 +369,7 @@ def send_on_closed_stream(self, previous_state: StreamState) -> None:
"""
raise StreamClosedError(self.stream_id)
- def recv_push_on_closed_stream(self, previous_state: StreamState) -> None:
+ def recv_push_on_closed_stream(self, previous_state: StreamState) -> list[Event]:
"""
Called when a PUSH_PROMISE frame is received on a full stop
stream.
@@ -393,7 +388,7 @@ def recv_push_on_closed_stream(self, previous_state: StreamState) -> None:
msg = "Attempted to push on closed stream."
raise ProtocolError(msg)
- def send_push_on_closed_stream(self, previous_state: StreamState) -> None:
+ def send_push_on_closed_stream(self, previous_state: StreamState) -> list[Event]:
"""
Called when an attempt is made to push on an already-closed stream.
@@ -429,9 +424,7 @@ def recv_informational_response(self, previous_state: StreamState) -> list[Event
msg = "Informational response after final response"
raise ProtocolError(msg)
- event = InformationalResponseReceived()
- event.stream_id = self.stream_id
- return [event]
+ return [InformationalResponseReceived(stream_id=self.stream_id)]
def recv_alt_svc(self, previous_state: StreamState) -> list[Event]:
"""
@@ -473,7 +466,7 @@ def recv_alt_svc(self, previous_state: StreamState) -> list[Event]:
# the event and let it get populated.
return [AlternativeServiceAvailable()]
- def send_alt_svc(self, previous_state: StreamState) -> None:
+ def send_alt_svc(self, previous_state: StreamState) -> list[Event]:
"""
Called when sending an ALTSVC frame on this stream.
@@ -489,6 +482,7 @@ def send_alt_svc(self, previous_state: StreamState) -> None:
if self.headers_sent:
msg = "Cannot send ALTSVC after sending response headers."
raise ProtocolError(msg)
+ return []
@@ -561,7 +555,10 @@ def send_alt_svc(self, previous_state: StreamState) -> None:
# (state, input) to tuples of (side_effect_function, end_state). This
# map contains all allowed transitions: anything not in this map is
# invalid and immediately causes a transition to ``closed``.
-_transitions = {
+_transitions: dict[
+ tuple[StreamState, StreamInputs],
+ tuple[Callable[[H2StreamStateMachine, StreamState], list[Event]] | None, StreamState],
+] = {
# State: idle
(StreamState.IDLE, StreamInputs.SEND_HEADERS):
(H2StreamStateMachine.request_sent, StreamState.OPEN),
@@ -1040,10 +1037,11 @@ def receive_push_promise_in_band(self,
events = self.state_machine.process_input(
StreamInputs.RECV_PUSH_PROMISE,
)
- events[0].pushed_stream_id = promised_stream_id
+ push_event = cast(PushedStreamReceived, events[0])
+ push_event.pushed_stream_id = promised_stream_id
hdr_validation_flags = self._build_hdr_validation_flags(events)
- events[0].headers = self._process_received_headers(
+ push_event.headers = self._process_received_headers(
headers, hdr_validation_flags, header_encoding,
)
return [], events
@@ -1077,22 +1075,30 @@ def receive_headers(self,
input_ = StreamInputs.RECV_HEADERS
events = self.state_machine.process_input(input_)
+ headers_event = cast(
+ Union[RequestReceived, ResponseReceived, TrailersReceived, InformationalResponseReceived],
+ events[0],
+ )
if end_stream:
es_events = self.state_machine.process_input(
StreamInputs.RECV_END_STREAM,
)
- events[0].stream_ended = es_events[0]
+ # We ensured it's not an information response at the beginning of the method.
+ cast(
+ Union[RequestReceived, ResponseReceived, TrailersReceived],
+ headers_event,
+ ).stream_ended = cast(StreamEnded, es_events[0])
events += es_events
self._initialize_content_length(headers)
- if isinstance(events[0], TrailersReceived) and not end_stream:
+ if isinstance(headers_event, TrailersReceived) and not end_stream:
msg = "Trailers must have END_STREAM set"
raise ProtocolError(msg)
hdr_validation_flags = self._build_hdr_validation_flags(events)
- events[0].headers = self._process_received_headers(
+ headers_event.headers = self._process_received_headers(
headers, hdr_validation_flags, header_encoding,
)
return [], events
@@ -1106,6 +1112,7 @@ def receive_data(self, data: bytes, end_stream: bool, flow_control_len: int) ->
"set to %d", self, end_stream, flow_control_len,
)
events = self.state_machine.process_input(StreamInputs.RECV_DATA)
+ data_event = cast(DataReceived, events[0])
self._inbound_window_manager.window_consumed(flow_control_len)
self._track_content_length(len(data), end_stream)
@@ -1113,11 +1120,11 @@ def receive_data(self, data: bytes, end_stream: bool, flow_control_len: int) ->
es_events = self.state_machine.process_input(
StreamInputs.RECV_END_STREAM,
)
- events[0].stream_ended = es_events[0]
+ data_event.stream_ended = cast(StreamEnded, es_events[0])
events.extend(es_events)
- events[0].data = data
- events[0].flow_controlled_length = flow_control_len
+ data_event.data = data
+ data_event.flow_controlled_length = flow_control_len
return [], events
def receive_window_update(self, increment: int) -> tuple[list[Frame], list[Event]]:
@@ -1137,7 +1144,7 @@ def receive_window_update(self, increment: int) -> tuple[list[Frame], list[Event
# this should be treated as a *stream* error, not a *connection* error.
# That means we need to catch the error and forcibly close the stream.
if events:
- events[0].delta = increment
+ cast(WindowUpdated, events[0]).delta = increment
try:
self.outbound_flow_control_window = guard_increment_window(
self.outbound_flow_control_window,
@@ -1146,13 +1153,14 @@ def receive_window_update(self, increment: int) -> tuple[list[Frame], list[Event
except FlowControlError:
# Ok, this is bad. We're going to need to perform a local
# reset.
- event = StreamReset()
- event.stream_id = self.stream_id
- event.error_code = ErrorCodes.FLOW_CONTROL_ERROR
- event.remote_reset = False
-
- events = [event]
- frames = self.reset_stream(event.error_code)
+ events = [
+ StreamReset(
+ stream_id=self.stream_id,
+ error_code=ErrorCodes.FLOW_CONTROL_ERROR,
+ remote_reset=False,
+ ),
+ ]
+ frames = self.reset_stream(ErrorCodes.FLOW_CONTROL_ERROR)
return frames, events
@@ -1220,7 +1228,7 @@ def stream_reset(self, frame: RstStreamFrame) -> tuple[list[Frame], list[Event]]
if events:
# We don't fire an event if this stream is already closed.
- events[0].error_code = _error_code_from_int(frame.error_code)
+ cast(StreamReset, events[0]).error_code = _error_code_from_int(frame.error_code)
return [], events
@@ -1322,7 +1330,7 @@ def _build_headers_frames(self,
def _process_received_headers(self,
headers: Iterable[Header],
header_validation_flags: HeaderValidationFlags,
- header_encoding: bool | str | None) -> Iterable[Header]:
+ header_encoding: bool | str | None) -> list[Header]:
"""
When headers have been received from the remote peer, run a processing
pipeline on them to transform them into the appropriate form for
diff --git a/tests/test_events.py b/tests/test_events.py
index aac913586..a43543c86 100644
--- a/tests/test_events.py
+++ b/tests/test_events.py
@@ -7,6 +7,7 @@
import inspect
import sys
+import hyperframe.frame
import pytest
from hypothesis import given
from hypothesis.strategies import integers, lists, tuples
@@ -114,9 +115,10 @@ def test_requestreceived_repr(self) -> None:
"""
RequestReceived has a useful debug representation.
"""
- e = h2.events.RequestReceived()
- e.stream_id = 5
- e.headers = self.example_request_headers
+ e = h2.events.RequestReceived(
+ stream_id=5,
+ headers=self.example_request_headers
+ )
assert repr(e) == (
" None:
"""
ResponseReceived has a useful debug representation.
"""
- e = h2.events.ResponseReceived()
- e.stream_id = 500
- e.headers = self.example_response_headers
+ e = h2.events.ResponseReceived(
+ stream_id=500,
+ headers=self.example_response_headers,
+ )
assert repr(e) == (
" None:
"""
TrailersReceived has a useful debug representation.
"""
- e = h2.events.TrailersReceived()
- e.stream_id = 62
- e.headers = self.example_response_headers
+ e = h2.events.TrailersReceived(stream_id=62, headers=self.example_response_headers)
assert repr(e) == (
" None:
"""
InformationalResponseReceived has a useful debug representation.
"""
- e = h2.events.InformationalResponseReceived()
- e.stream_id = 62
- e.headers = self.example_informational_headers
+ e = h2.events.InformationalResponseReceived(
+ stream_id=62,
+ headers=self.example_informational_headers,
+ )
assert repr(e) == (
" None:
"""
DataReceived has a useful debug representation.
"""
- e = h2.events.DataReceived()
- e.stream_id = 888
- e.data = b"abcdefghijklmnopqrstuvwxyz"
- e.flow_controlled_length = 88
+ e = h2.events.DataReceived(
+ stream_id=888,
+ data=b"abcdefghijklmnopqrstuvwxyz",
+ flow_controlled_length=88,
+ )
assert repr(e) == (
" None:
"""
WindowUpdated has a useful debug representation.
"""
- e = h2.events.WindowUpdated()
- e.stream_id = 0
- e.delta = 2**16
+ e = h2.events.WindowUpdated(stream_id=0, delta=2**16)
assert repr(e) == ""
@@ -221,8 +222,7 @@ def test_pingreceived_repr(self) -> None:
"""
PingReceived has a useful debug representation.
"""
- e = h2.events.PingReceived()
- e.ping_data = b"abcdefgh"
+ e = h2.events.PingReceived(ping_data=b"abcdefgh")
assert repr(e) == ""
@@ -230,8 +230,7 @@ def test_pingackreceived_repr(self) -> None:
"""
PingAckReceived has a useful debug representation.
"""
- e = h2.events.PingAckReceived()
- e.ping_data = b"abcdefgh"
+ e = h2.events.PingAckReceived(ping_data=b"abcdefgh")
assert repr(e) == ""
@@ -239,8 +238,7 @@ def test_streamended_repr(self) -> None:
"""
StreamEnded has a useful debug representation.
"""
- e = h2.events.StreamEnded()
- e.stream_id = 99
+ e = h2.events.StreamEnded(stream_id=99)
assert repr(e) == ""
@@ -248,10 +246,11 @@ def test_streamreset_repr(self) -> None:
"""
StreamEnded has a useful debug representation.
"""
- e = h2.events.StreamReset()
- e.stream_id = 919
- e.error_code = h2.errors.ErrorCodes.ENHANCE_YOUR_CALM
- e.remote_reset = False
+ e = h2.events.StreamReset(
+ stream_id=919,
+ error_code=h2.errors.ErrorCodes.ENHANCE_YOUR_CALM,
+ remote_reset=False,
+ )
if sys.version_info >= (3, 11):
assert repr(e) == (
@@ -363,7 +362,7 @@ def test_unknownframereceived_repr(self) -> None:
"""
UnknownFrameReceived has a useful debug representation.
"""
- e = h2.events.UnknownFrameReceived()
+ e = h2.events.UnknownFrameReceived(frame=hyperframe.frame.Frame(1))
assert repr(e) == ""