Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Track number of hosts affected by the rate limiter
Browse files Browse the repository at this point in the history
Follow-up to #13534

Part of #13356
  • Loading branch information
MadLittleMods committed Aug 16, 2022
1 parent 149ac1d commit 8be321f
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 15 deletions.
12 changes: 12 additions & 0 deletions synapse/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ def collect() -> Iterable[Metric]:
# TODO Do something nicer about this.
RegistryProxy = cast(CollectorRegistry, _RegistryProxy)

T = TypeVar("T")


def count(func: Callable[[T], bool], it: Iterable[T]) -> int:
    """Count how many items of *it* satisfy the predicate *func*."""
    # Generator feeds sum() one 1 per matching item, so nothing is
    # materialised and the whole iterable is consumed exactly once.
    return sum(1 for item in it if func(item))


@attr.s(slots=True, hash=True, auto_attribs=True)
class LaterGauge(Collector):
Expand Down Expand Up @@ -475,6 +486,7 @@ def register_threadpool(name: str, threadpool: ThreadPool) -> None:
"MetricsResource",
"generate_latest",
"start_http_server",
"count",
"LaterGauge",
"InFlightGauge",
"GaugeBucketCollector",
Expand Down
12 changes: 1 addition & 11 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from synapse.logging import issue9533_logger
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import log_kv, start_active_span
from synapse.metrics import LaterGauge
from synapse.metrics import LaterGauge, count
from synapse.streams.config import PaginationConfig
from synapse.types import (
JsonDict,
Expand Down Expand Up @@ -68,16 +68,6 @@
T = TypeVar("T")


# TODO(paul): Should be shared somewhere
def count(func: Callable[[T], bool], it: Iterable[T]) -> int:
"""Return the number of items in it for which func returns true."""
n = 0
for x in it:
if func(x):
n += 1
return n


class _NotificationListener:
"""This represents a single client connection to the events stream.
The events stream handler will have yielded to the deferred, so to
Expand Down
47 changes: 43 additions & 4 deletions synapse/util/ratelimitutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import typing
from typing import Any, DefaultDict, Iterator, List, Set

from prometheus_client.core import Counter
from prometheus_client.core import Counter, Gauge

from twisted.internet import defer

Expand All @@ -29,6 +29,7 @@
make_deferred_yieldable,
run_in_background,
)
from synapse.metrics import LaterGauge, count
from synapse.util import Clock

if typing.TYPE_CHECKING:
Expand All @@ -51,6 +52,34 @@ def new_limiter() -> "_PerHostRatelimiter":
str, "_PerHostRatelimiter"
] = collections.defaultdict(new_limiter)

# We track the number of affected hosts per time-period so we can
# differentiate one really noisy homeserver from a general
# ratelimit tuning problem across the federation.
# NOTE(review): these lambdas close over `self`, so this appears to run
# inside an instance initialiser. Constructing a second instance would
# register gauges with the same metric names again, and the lambdas keep
# `self` alive for the lifetime of the metrics registry — confirm the
# enclosing type is a process-wide singleton.
LaterGauge(
    "synapse_rate_limit_sleep_affected_hosts",
    "Number of hosts that had requests put to sleep",
    [],
    # Number of hosts whose per-host limiter currently reports it would
    # put new requests to sleep.
    lambda: count(
        bool,
        [
            ratelimiter.should_sleep()
            for ratelimiter in self.ratelimiters.values()
        ],
    ),
)
LaterGauge(
    "synapse_rate_limit_reject_affected_hosts",
    "Number of hosts that had requests rejected",
    [],
    # Number of hosts whose per-host limiter currently reports it would
    # reject new requests outright.
    lambda: count(
        bool,
        [
            ratelimiter.should_reject()
            for ratelimiter in self.ratelimiters.values()
        ],
    ),
)

def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]]":
"""Used to ratelimit an incoming request from a given host
Expand Down Expand Up @@ -116,6 +145,17 @@ def ratelimit(self, host: str) -> "Iterator[defer.Deferred[None]]":
finally:
self._on_exit(request_id)

def should_reject(self) -> bool:
    """
    Whether an incoming request should be rejected: True if we already
    have too many requests queued up for this host (either sleeping or
    in the ready queue).
    """
    # Both queues count towards the limit: requests currently sleeping
    # plus those already released and awaiting processing.
    queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
    return queue_size > self.reject_limit

def should_sleep(self) -> bool:
    """
    Whether new requests from this host should be put to sleep: True once
    the number of tracked request times exceeds the sleep limit.
    """
    # NOTE(review): assumes self.request_times only holds in-window
    # timestamps (pruned elsewhere) — confirm against the caller.
    return len(self.request_times) > self.sleep_limit

def _on_enter(self, request_id: object) -> "defer.Deferred[None]":
time_now = self.clock.time_msec()

Expand All @@ -126,8 +166,7 @@ def _on_enter(self, request_id: object) -> "defer.Deferred[None]":

# reject the request if we already have too many queued up (either
# sleeping or in the ready queue).
queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
if queue_size > self.reject_limit:
if self.should_reject():
logger.debug("Ratelimiter(%s): rejecting request", self.host)
rate_limit_reject_counter.inc()
raise LimitExceededError(
Expand Down Expand Up @@ -157,7 +196,7 @@ def queue_request() -> "defer.Deferred[None]":
len(self.request_times),
)

if len(self.request_times) > self.sleep_limit:
if self.should_sleep():
logger.debug(
"Ratelimiter(%s) [%s]: sleeping request for %f sec",
self.host,
Expand Down

0 comments on commit 8be321f

Please sign in to comment.