Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for more flexible control of connection backlog and number of connections accepted "at once" #3062

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions tornado/netutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,23 @@
# For undiagnosed reasons, 'latin1' codec may also need to be preloaded.
u"foo".encode("latin1")

# Default backlog used when calling sock.listen()
_DEFAULT_BACKLOG = 128
# Default backlog used when calling sock.listen(); if `None`, no value is
# passed to `socket.listen` implying normally that Python will decide on
# the backlog size for the socket. For Python < 3.6, an error will be
# raised by `socket.listen` if no argument is provided.
DEFAULT_BACKLOG = None

# Number of connections to try and accept in one sweep of an I/O loop;
# if falsy (e.g. `None` or zero) the number will be decided on
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These comments imply that monkey-patching these constants is intended to be a public interface for configuration. I don't think that's a good idea (or at least I'd want to see some more justification for why someone might want to change this so we could think about the most appropriate interface).

I'd rather just hard-code the number of iterations in accept_handler, and get rid of the tornado-side _DEFAULT_BACKLOG (so we call listen() with an argument if a backlog is given, and otherwise use no arguments so that the default is determined by lower levels).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am currently debugging a case with our usage of tornado where we do want to change this to a much smaller value.

What we have seen is that in multiprocessing pre-fork setups, where we have a large number of workers sharing 1 socket, we see very poor tail latencies under high load. We have narrowed down due to huge amounts of queuing within processing that accidentally accept far too high of a share of the incoming connections. IE while we might have on average 3 async requests in flight per worker, when we see the outliers with high tail latencies hitting timeouts and inspect their io_loops we'll see that 30 requests connections got all accepted at once on that worker, and that 30th request queue up at the end in the worker will have massively increased latency.

We are currently looking at ways to prevent this poor load balancing, and at least one element of it, might be to limit the number of connection accepted at a time, so that if a small queue under high load of pending connections has built up, it's not entirely taken by a single worker, and is better distributed in smaller chunks over multiple workers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. That does sound like a good reason to make the accept-calls-per-loop value configurable (or maybe just to hard-code it to a small value, but I suspect we'd have a hard time agreeing on what that value should be), so the accept-calls-per-loop variable is valuable even aside from its use here for the case when the accept backlog is unspecified/None.

I still don't like monkey-patching module-level constants as the configuration interface, but it seems like whatever interface we come up with should be used for both variables.

In the meantime, have you tried SO_REUSEPORT (and binding the socket after the fork)? My understanding is that the kernel treats this differently from the case of a socket opened before a fork, in a way that generally does a better job at balancing the load across the processes. I don't have any experience with it at scale, though.

# automatically.
ACCEPT_CALLS_PER_EVENT_LOOP = None


def bind_sockets(
port: int,
address: Optional[str] = None,
family: socket.AddressFamily = socket.AF_UNSPEC,
backlog: int = _DEFAULT_BACKLOG,
backlog: Optional[int] = None,
flags: Optional[int] = None,
reuse_port: bool = False,
) -> List[socket.socket]:
Expand All @@ -74,7 +82,8 @@ def bind_sockets(
both will be used if available.

The ``backlog`` argument has the same meaning as for
`socket.listen() <socket.socket.listen>`.
`socket.listen() <socket.socket.listen>` if it carries an integer
value; if `None`, the backlog size is chosen automatically.

``flags`` is a bitmask of AI_* flags to `~socket.getaddrinfo`, like
``socket.AI_PASSIVE | socket.AI_NUMERICHOST``.
Expand Down Expand Up @@ -181,15 +190,15 @@ def bind_sockets(
else:
raise
bound_port = sock.getsockname()[1]
sock.listen(backlog)
listen(sock, backlog)
sockets.append(sock)
return sockets


if hasattr(socket, "AF_UNIX"):

def bind_unix_socket(
file: str, mode: int = 0o600, backlog: int = _DEFAULT_BACKLOG
file: str, mode: int = 0o600, backlog: Optional[int] = None
) -> socket.socket:
"""Creates a listening unix socket.

Expand Down Expand Up @@ -219,7 +228,7 @@ def bind_unix_socket(
raise ValueError("File %s exists and is not a socket", file)
sock.bind(file)
os.chmod(file, mode)
sock.listen(backlog)
listen(sock, backlog)
return sock


Expand Down Expand Up @@ -258,7 +267,7 @@ def accept_handler(fd: socket.socket, events: int) -> None:
# Instead, we use the (default) listen backlog as a rough
# heuristic for the number of connections we can reasonably
# accept at once.
for i in range(_DEFAULT_BACKLOG):
for i in range(ACCEPT_CALLS_PER_EVENT_LOOP or DEFAULT_BACKLOG or 128):
if removed[0]:
# The socket was probably closed
return
Expand Down Expand Up @@ -621,3 +630,12 @@ def ssl_wrap_socket(
return context.wrap_socket(socket, server_hostname=server_hostname, **kwargs)
else:
return context.wrap_socket(socket, **kwargs)


def listen(socket: socket.socket, backlog: int = None):
"""A helper procedure to better delegate `socket.listen` calls where
backlog may or may not be specified.
"""
if backlog is None:
backlog = DEFAULT_BACKLOG
return socket.listen(backlog) if backlog is not None else socket.listen()