Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BufferedProtocol support #2033

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions sanic/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import re

from asyncio import (
BufferedProtocol,
CancelledError,
Protocol,
ensure_future,
get_event_loop,
wait_for,
Expand Down Expand Up @@ -872,7 +872,7 @@ def run(
ssl: Union[dict, SSLContext, None] = None,
sock: Optional[socket] = None,
workers: int = 1,
protocol: Optional[Type[Protocol]] = None,
protocol: Optional[Type[BufferedProtocol]] = None,
backlog: int = 100,
register_sys_signals: bool = True,
access_log: Optional[bool] = None,
Expand Down Expand Up @@ -1001,7 +1001,7 @@ async def create_server(
debug: bool = False,
ssl: Union[dict, SSLContext, None] = None,
sock: Optional[socket] = None,
protocol: Type[Protocol] = None,
protocol: Optional[Type[BufferedProtocol]] = None,
backlog: int = 100,
access_log: Optional[bool] = None,
unix: Optional[str] = None,
Expand Down
2 changes: 1 addition & 1 deletion sanic/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ async def read(self) -> Optional[bytes]:
self.request_body = None
# Because we are leaving one CRLF in the buffer, we manually
# reset the buffer here
self.recv_buffer = bytearray()
self.recv_buffer = self.protocol.recv_buffer = bytearray()
Copy link
Member

@Tronic Tronic Jul 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still incorrect, and in general you should not recreate the buffer. Use del self.recv_buffer[:2] or so to keep the existing buffer but to consume the data that you want removed (note: as discussed earlier, removing all or the first two bytes here might be is invalid anyway).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still am not sure I see how. If we do not, and even if we account for the http loop, you end up with \r\n as the first two bytes on the next request.


if size < 0:
self.keep_alive = False
Expand Down
48 changes: 28 additions & 20 deletions sanic/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import socket
import stat

from asyncio import CancelledError
from asyncio import BufferedProtocol, CancelledError
from asyncio.transports import Transport
from functools import partial
from inspect import isawaitable
Expand Down Expand Up @@ -102,7 +102,7 @@ def __init__(self, transport: TransportProtocol, unix=None):
self.client_port = addr[1]


class HttpProtocol(asyncio.Protocol):
class HttpProtocol(BufferedProtocol):
"""
This class provides a basic HTTP implementation of the sanic framework.
"""
Expand Down Expand Up @@ -132,6 +132,7 @@ class HttpProtocol(asyncio.Protocol):
# connection management
"state",
"url",
"_buffer",
"_handler_task",
"_can_write",
"_data_received",
Expand All @@ -140,6 +141,7 @@ class HttpProtocol(asyncio.Protocol):
"_http",
"_exception",
"recv_buffer",
"_buffer",
"_unix",
)

Expand Down Expand Up @@ -301,6 +303,9 @@ def connection_made(self, transport):
self.transport = transport
self._task = self.loop.create_task(self.connection_task())
self.recv_buffer = bytearray()
self._buffer = memoryview(
bytearray(self.app.config.REQUEST_BUFFER_SIZE)
)
Comment on lines +306 to +308
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a waste of RAM. The buffer should be kept as small as possible, not always the maximum permitted size. This makes a difference when handling a lot of parallel requests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure where this value is coming from, but it is the size limit that get_buffer seems to want 🤔

>> get_buffer sizehint=65536

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sizehint is hardcoded in libuv (and probably in a similar manner in plain asyncio). We can safely ignore the hint because libuv will then simply call recv_into with a smaller buffer. For the sake of simplicity, it might be best to keep our buffer hardcoded to 64 KiB as well, at least until it can be evaluated whether BufferedProtocol offers those performance benefits that we want.

self.conn_info = ConnInfo(self.transport, unix=self._unix)
except Exception:
error_logger.exception("protocol.connect_made")
Expand All @@ -320,23 +325,26 @@ def pause_writing(self):
def resume_writing(self):
self._can_write.set()

def data_received(self, data: bytes):
try:
self._time = current_time()
if not data:
return self.close()
self.recv_buffer += data

if (
len(self.recv_buffer) > self.app.config.REQUEST_BUFFER_SIZE
and self.transport
):
self.transport.pause_reading()
def get_buffer(self, sizehint=-1):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is sizehint used for here ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sizehint is the recommended minimum size for the returned buffer. It is acceptable to return smaller or larger buffers than what sizehint suggests. When set to -1, the buffer size can be arbitrary. It is an error to return a buffer with a zero size.

Source

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are ignoring it here.

return self._buffer

if self._data_received:
self._data_received.set()
except Exception:
error_logger.exception("protocol.data_received")
def buffer_updated(self, nbytes: int) -> None:
data = self._buffer[:nbytes]

self._time = current_time()
self.recv_buffer += data
Comment on lines +331 to +335
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the get_buffer function be returning a memoryview to recv_buffer directly, rather than separate _buffer, to avoid this copying of data here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, since we are concurrently receiving more data (so cannot alter the buffer) and handling a request (which needs to consume bytes from buffer), this cannot easily work. Which begs the question: why use BufferedProtocol in the first place?

Copy link
Member Author

@ahopkins ahopkins Jul 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this was just my proof of concept to see if the BufferedProtocol would play more nicely with streaming requests. We probably need to consume the data somehow from the object returned by get_buffer.

I am thinking perhaps what we might want to do is somehow sync on the buffer_updated by grabbing the nbytes value, and then using that as recv_buffer instead of having that object. It is sort of duplicative at this point. This is a larger refactor though.


if (
len(self.recv_buffer) > self.app.config.REQUEST_BUFFER_SIZE
and self.transport
):
self.transport.pause_reading()

if self._data_received:
self._data_received.set()

def eof_received(self):
return self.close()


def trigger_events(events: Optional[Iterable[Callable[..., Any]]], loop):
Expand Down Expand Up @@ -466,7 +474,7 @@ def serve(
unix: Optional[str] = None,
reuse_port: bool = False,
loop=None,
protocol: Type[asyncio.Protocol] = HttpProtocol,
protocol: Type[HttpProtocol] = HttpProtocol,
backlog: int = 100,
register_sys_signals: bool = True,
run_multiple: bool = False,
Expand Down Expand Up @@ -618,7 +626,7 @@ def serve(


def _build_protocol_kwargs(
protocol: Type[asyncio.Protocol], config: Config
protocol: Type[HttpProtocol], config: Config
) -> Dict[str, Union[int, float]]:
if hasattr(protocol, "websocket_handshake"):
return {
Expand Down