Skip to content

Commit

Permalink
Fix spurious LocalProtocolError errors when processing pipelined requ…
Browse files Browse the repository at this point in the history
…ests (#2243)
  • Loading branch information
marcinsulikowski authored Feb 10, 2024
1 parent 4f74ed1 commit 2ff704b
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 11 deletions.
52 changes: 47 additions & 5 deletions tests/protocols/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,20 @@ def set_protocol(self, protocol):
pass


class MockTimerHandle:
def __init__(self, loop_later_list, delay, callback, args):
self.loop_later_list = loop_later_list
self.delay = delay
self.callback = callback
self.args = args
self.cancelled = False

def cancel(self):
if not self.cancelled:
self.cancelled = True
self.loop_later_list.remove(self)


class MockLoop:
def __init__(self):
self._tasks = []
Expand All @@ -186,18 +200,20 @@ def create_task(self, coroutine):
return MockTask()

def call_later(self, delay, callback, *args):
self._later.insert(0, (delay, callback, args))
handle = MockTimerHandle(self._later, delay, callback, args)
self._later.insert(0, handle)
return handle

async def run_one(self):
return await self._tasks.pop()

def run_later(self, with_delay):
later = []
for delay, callback, args in self._later:
if with_delay >= delay:
callback(*args)
for timer_handle in self._later:
if with_delay >= timer_handle.delay:
timer_handle.callback(*timer_handle.args)
else:
later.append((delay, callback, args))
later.append(timer_handle)
self._later = later


Expand Down Expand Up @@ -315,6 +331,32 @@ async def test_keepalive_timeout(http_protocol_cls: HTTPProtocol):
assert protocol.transport.is_closing()


@pytest.mark.anyio
async def test_keepalive_timeout_with_pipelined_requests(
http_protocol_cls: HTTPProtocol,
):
app = Response("Hello, world", media_type="text/plain")

protocol = get_connected_protocol(app, http_protocol_cls)
protocol.data_received(SIMPLE_GET_REQUEST)
protocol.data_received(SIMPLE_GET_REQUEST)

# After processing the first request, the keep-alive task should be
# disabled because the second request is not responded yet.
await protocol.loop.run_one()
assert b"HTTP/1.1 200 OK" in protocol.transport.buffer
assert b"Hello, world" in protocol.transport.buffer
assert protocol.timeout_keep_alive_task is None

# Process the second request and ensure that the keep-alive task
# has been enabled again as the connection is now idle.
protocol.transport.clear_buffer()
await protocol.loop.run_one()
assert b"HTTP/1.1 200 OK" in protocol.transport.buffer
assert b"Hello, world" in protocol.transport.buffer
assert protocol.timeout_keep_alive_task is not None


@pytest.mark.anyio
async def test_close(http_protocol_cls: HTTPProtocol):
app = Response(b"", status_code=204, headers={"connection": "close"})
Expand Down
8 changes: 8 additions & 0 deletions uvicorn/protocols/http/h11_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,14 @@ def handle_events(self) -> None:
else:
app = self.app

# When starting to process a request, disable the keep-alive
# timeout. Normally we disable this when receiving data from
# client and set back when finishing processing its request.
# However, for pipelined requests processing finishes after
# already receiving the next request and thus the timer may
# be set here, which we don't want.
self._unset_keepalive_if_required()

self.cycle = RequestResponseCycle(
scope=self.scope,
conn=self.conn,
Expand Down
12 changes: 6 additions & 6 deletions uvicorn/protocols/http/httptools_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,22 +326,22 @@ def on_response_complete(self) -> None:
if self.transport.is_closing():
return

# Set a short Keep-Alive timeout.
self._unset_keepalive_if_required()

self.timeout_keep_alive_task = self.loop.call_later(
self.timeout_keep_alive, self.timeout_keep_alive_handler
)

# Unpause data reads if needed.
self.flow.resume_reading()

# Unblock any pipelined events.
# Unblock any pipelined events. If there are none, arm the
# Keep-Alive timeout instead.
if self.pipeline:
cycle, app = self.pipeline.pop()
task = self.loop.create_task(cycle.run_asgi(app))
task.add_done_callback(self.tasks.discard)
self.tasks.add(task)
else:
self.timeout_keep_alive_task = self.loop.call_later(
self.timeout_keep_alive, self.timeout_keep_alive_handler
)

def shutdown(self) -> None:
"""
Expand Down

0 comments on commit 2ff704b

Please sign in to comment.