Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move DNS lookups into separate thread pool #11177

Merged
merged 7 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11177.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a performance regression when doing a large number of hostname lookups at once. Introduced in v1.44.0.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
13 changes: 12 additions & 1 deletion synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from twisted.internet import defer, error, reactor
from twisted.logger import LoggingFile, LogLevel
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.python.threadpool import ThreadPool

import synapse
from synapse.api.constants import MAX_PDU_SIZE
Expand All @@ -48,6 +49,7 @@
from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.util.caches.lrucache import setup_expire_lru_cache_entries
from synapse.util.daemonize import daemonize_process
from synapse.util.gai_resolver import GAIResolver
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string

Expand Down Expand Up @@ -338,9 +340,10 @@ async def start(hs: "HomeServer"):
Args:
hs: homeserver instance
"""
reactor = hs.get_reactor()

# Set up the SIGHUP machinery.
if hasattr(signal, "SIGHUP"):
reactor = hs.get_reactor()

@wrap_as_background_process("sighup")
def handle_sighup(*args, **kwargs):
Expand Down Expand Up @@ -371,6 +374,14 @@ def run_sighup(*args, **kwargs):
# Start the tracer
synapse.logging.opentracing.init_tracer(hs) # type: ignore[attr-defined] # noqa

# We want to use a separate thread pool for the resolver so that large
# numbers of DNS requests don't starve out other users of the threadpool.
resolver_threadpool = ThreadPool(name="gai_resolver")
resolver_threadpool.start()
reactor.installNameResolver(
GAIResolver(reactor, getThreadPool=lambda: resolver_threadpool)
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this feels a bit late? should we do it sooner?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, hmm, you mean in case something decides to get the resolver and hold onto it during setup? I can move it to before we instantiate the homeserver class? (In practice I think it's fine here; empirically at least, all federation traffic went via the new resolver when I tried it on jki.re, but I'm happy to move it to somewhere that makes a bit more sense.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just think it's weird that we start doing traffic with one resolver, then switch it out. Just move it to the start of the method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see! I thought you meant move it out of that function entirely. I've moved it to the top.


# Instantiate the modules so they can register their web resources to the module API
# before we start the listeners.
module_api = hs.get_module_api()
Expand Down
136 changes: 136 additions & 0 deletions synapse/util/gai_resolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# This is a direct lift from
# https://github.com/twisted/twisted/blob/release-21.2.0-10091/src/twisted/internet/_resolver.py.
squahtx marked this conversation as resolved.
Show resolved Hide resolved
# We copy it here as we need to instantiate `GAIResolver` manually, but it is a
# private class.


from socket import (
AF_INET,
AF_INET6,
AF_UNSPEC,
SOCK_DGRAM,
SOCK_STREAM,
gaierror,
getaddrinfo,
)

from zope.interface import implementer

from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.interfaces import IHostnameResolver, IHostResolution
from twisted.internet.threads import deferToThreadPool


@implementer(IHostResolution)
class HostResolution:
    """
    Handle representing a hostname lookup that is currently under way.

    @ivar name: the hostname this resolution was created for.
    """

    def __init__(self, name):
        """
        Build a L{HostResolution} for the given hostname.

        @param name: the hostname being resolved; stored as C{self.name}.
        """
        self.name = name

    def cancel(self):
        """
        See L{IHostResolution.cancel}.

        Cancellation is not implemented for this resolution type.

        @raise NotImplementedError: always.
        """
        raise NotImplementedError()


# The full set of address types this module knows about; used as the lookup key
# when the caller does not restrict the address types.
_any = frozenset((IPv4Address, IPv6Address))

# Requested set of address types -> address family passed to getaddrinfo.
_typesToAF = {
    frozenset((IPv4Address,)): AF_INET,
    frozenset((IPv6Address,)): AF_INET6,
    _any: AF_UNSPEC,
}

# Address family reported by getaddrinfo -> address class used to wrap a result.
_afToType = {
    AF_INET: IPv4Address,
    AF_INET6: IPv6Address,
}

# Transport semantics name -> socket type passed to getaddrinfo.
_transportToSocket = {
    "TCP": SOCK_STREAM,
    "UDP": SOCK_DGRAM,
}

# Reverse of _transportToSocket: socket type reported by getaddrinfo ->
# transport semantics name handed to the address class.
_socktypeToType = {
    socktype: transport for transport, socktype in _transportToSocket.items()
}


@implementer(IHostnameResolver)
class GAIResolver:
    """
    An L{IHostnameResolver} that performs lookups by running L{getaddrinfo}
    on a thread pool and reporting the results to a resolution receiver.
    """

    def __init__(self, reactor, getThreadPool=None, getaddrinfo=getaddrinfo):
        """
        Create a L{GAIResolver}.

        @param reactor: the reactor used to schedule delivery of results.
        @type reactor: L{IReactorThreads}

        @param getThreadPool: a function returning the thread pool on which
            name resolutions are run. If not supplied, then the given
            C{reactor}'s own C{getThreadPool} is used.
        @type getThreadPool: 0-argument callable returning a
            L{twisted.python.threadpool.ThreadPool}

        @param getaddrinfo: the L{getaddrinfo} implementation to call; mainly
            parameterized for testing.
        @type getaddrinfo: callable with the same signature as L{getaddrinfo}
        """
        self._reactor = reactor
        if getThreadPool is None:
            # Fall back to the reactor's default pool accessor.
            getThreadPool = reactor.getThreadPool
        self._getThreadPool = getThreadPool
        self._getaddrinfo = getaddrinfo

    def resolveHostName(
        self,
        resolutionReceiver,
        hostName,
        portNumber=0,
        addressTypes=None,
        transportSemantics="TCP",
    ):
        """
        See L{IHostnameResolver.resolveHostName}.

        The lookup is run on the thread pool returned by C{getThreadPool}.
        A L{gaierror} raised by the lookup is reported to the receiver as a
        resolution that completed with no addresses.

        @param resolutionReceiver: see interface
        @param hostName: see interface
        @param portNumber: see interface
        @param addressTypes: see interface
        @param transportSemantics: see interface
        @return: see interface
        """
        threadPool = self._getThreadPool()

        # No restriction from the caller means "any supported address type".
        if addressTypes is None:
            wantedTypes = _any
        else:
            wantedTypes = frozenset(addressTypes)
        addressFamily = _typesToAF[wantedTypes]
        socketType = _transportToSocket[transportSemantics]

        def lookup():
            # Runs in the thread pool: a failed lookup yields no addresses
            # rather than an error.
            try:
                found = self._getaddrinfo(
                    hostName, portNumber, addressFamily, socketType
                )
            except gaierror:
                found = []
            return found

        # Schedule the lookup before telling the receiver it has begun.
        pending = deferToThreadPool(self._reactor, threadPool, lookup)
        resolution = HostResolution(hostName)
        resolutionReceiver.resolutionBegan(resolution)

        def reportAddresses(entries):
            # Wrap each getaddrinfo entry in the matching address class and
            # hand it to the receiver, then signal completion.
            for family, socktype, _proto, _canonname, sockaddr in entries:
                addressClass = _afToType[family]
                # Unrecognised socket types are reported as "TCP".
                transport = _socktypeToType.get(socktype, "TCP")
                resolutionReceiver.addressResolved(
                    addressClass(transport, *sockaddr)
                )
            resolutionReceiver.resolutionComplete()

        pending.addCallback(reportAddresses)

        return resolution