Fix rate limit metrics registering twice and misreporting #13649
@@ -0,0 +1 @@
+Fix rate limit gauge metrics registering twice and misreporting (`synapse_rate_limit_sleep_affected_hosts`, `synapse_rate_limit_reject_affected_hosts`).
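The "registering twice" half of the changelog refers to the affected-hosts gauges previously being registered from `FederationRateLimiter.__init__`, so constructing more than one rate limiter registered the same metric names again. As a hedged illustration of why duplicate registration is a problem, here is what plain `prometheus_client` does in that situation (this uses a throwaway registry and a plain `Gauge`, not Synapse's `LaterGauge`, whose behaviour may differ):

```python
# Illustration only: plain prometheus_client, not Synapse's LaterGauge.
from prometheus_client import CollectorRegistry, Gauge

registry = CollectorRegistry()

Gauge("synapse_rate_limit_sleep_affected_hosts", "demo", registry=registry)
try:
    # A second collector with the same name is rejected by the registry.
    Gauge("synapse_rate_limit_sleep_affected_hosts", "demo", registry=registry)
except ValueError as e:
    print(e)  # "Duplicated timeseries in CollectorRegistry: ..."
```

Moving the gauges to module level (and aggregating over all live instances) avoids both the duplicate registration and the misreporting that came from each gauge only seeing its own instance.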
@@ -15,10 +15,23 @@
 import collections
 import contextlib
 import logging
+import threading
 import typing
-from typing import Any, DefaultDict, Iterator, List, Set
+from typing import (
+    Any,
+    Callable,
+    DefaultDict,
+    Dict,
+    Iterator,
+    List,
+    Mapping,
+    Optional,
+    Set,
+    Tuple,
+)

 from prometheus_client.core import Counter
+from typing_extensions import ContextManager

 from twisted.internet import defer

@@ -40,12 +53,20 @@


 # Track how much the ratelimiter is affecting requests
-rate_limit_sleep_counter = Counter("synapse_rate_limit_sleep", "")
-rate_limit_reject_counter = Counter("synapse_rate_limit_reject", "")
+rate_limit_sleep_counter = Counter(
+    "synapse_rate_limit_sleep",
+    "Number of requests slept by the rate limiter",
+    ["rate_limiter_name"],
+)
+rate_limit_reject_counter = Counter(
+    "synapse_rate_limit_reject",
+    "Number of requests rejected by the rate limiter",
+    ["rate_limiter_name"],
+)
 queue_wait_timer = Histogram(
     "synapse_rate_limit_queue_wait_time_seconds",
-    "sec",
-    [],
+    "Amount of time spent waiting for the rate limiter to let our request through.",
+    ["rate_limiter_name"],
     buckets=(
         0.005,
         0.01,
@@ -65,35 +86,92 @@
 )


+_rate_limiter_instances: Set["FederationRateLimiter"] = set()
+# Protects the _rate_limiter_instances set from concurrent access
+_rate_limiter_instances_lock = threading.Lock()
+
+
+def _get_counts_from_rate_limiter_instance(
+    count_func: Callable[["FederationRateLimiter"], int]
+) -> Mapping[Tuple[str, ...], int]:
+    """Returns a count of something (slept/rejected hosts) by (metrics_name)"""
+    # Cast to a list to prevent it changing while the Prometheus
+    # thread is collecting metrics
+    with _rate_limiter_instances_lock:
+        rate_limiter_instances = list(_rate_limiter_instances)
+
+    # Map from (metrics_name,) -> int, the number of something like slept hosts
+    # or rejected hosts. The key type is Tuple[str], but we leave the length
+    # unspecified for compatability with LaterGauge's annotations.
+    counts: Dict[Tuple[str, ...], int] = {}
+    for rate_limiter_instance in rate_limiter_instances:
+        # Only track metrics if they provided a `metrics_name` to
+        # differentiate this instance of the rate limiter.
+        if rate_limiter_instance.metrics_name:
+            key = (rate_limiter_instance.metrics_name,)
+            counts[key] = count_func(rate_limiter_instance)
+
+    return counts
+
+
+# We track the number of affected hosts per time-period so we can
+# differentiate one really noisy homeserver from a general
+# ratelimit tuning problem across the federation.
+LaterGauge(
+    "synapse_rate_limit_sleep_affected_hosts",
+    "Number of hosts that had requests put to sleep",
+    ["rate_limiter_name"],
+    lambda: _get_counts_from_rate_limiter_instance(
+        lambda rate_limiter_instance: sum(
+            ratelimiter.should_sleep()
+            for ratelimiter in rate_limiter_instance.ratelimiters.values()
+        )
+    ),
+)
+LaterGauge(
+    "synapse_rate_limit_reject_affected_hosts",
+    "Number of hosts that had requests rejected",
+    ["rate_limiter_name"],
+    lambda: _get_counts_from_rate_limiter_instance(
+        lambda rate_limiter_instance: sum(
+            ratelimiter.should_reject()
+            for ratelimiter in rate_limiter_instance.ratelimiters.values()
+        )
+    ),
+)
+
+
 class FederationRateLimiter:
-    def __init__(self, clock: Clock, config: FederationRatelimitSettings):
-        """Used to rate limit request per-host."""
+    def __init__(
+        self,
+        clock: Clock,
+        config: FederationRatelimitSettings,
+        metrics_name: Optional[str] = None,
+    ):
+        """
+        Args:
+            clock
+            config
+            metrics_name: The name of the rate limiter so we can differentiate it
+                from the rest in the metrics. If `None`, we don't track metrics
+                for this rate limiter.
+
+        """
+        self.metrics_name = metrics_name

         def new_limiter() -> "_PerHostRatelimiter":
-            return _PerHostRatelimiter(clock=clock, config=config)
+            return _PerHostRatelimiter(
+                clock=clock, config=config, metrics_name=metrics_name
+            )

         self.ratelimiters: DefaultDict[
             str, "_PerHostRatelimiter"
         ] = collections.defaultdict(new_limiter)

-        # We track the number of affected hosts per time-period so we can
-        # differentiate one really noisy homeserver from a general
-        # ratelimit tuning problem across the federation.
-        LaterGauge(
-            "synapse_rate_limit_sleep_affected_hosts",
-            "Number of hosts that had requests put to sleep",
-            [],
-            lambda: sum(
-                ratelimiter.should_sleep() for ratelimiter in self.ratelimiters.values()
-            ),
-        )
-        LaterGauge(
-            "synapse_rate_limit_reject_affected_hosts",
-            "Number of hosts that had requests rejected",
-            [],
-            lambda: sum(
-                ratelimiter.should_reject()
-                for ratelimiter in self.ratelimiters.values()
-            ),
-        )
+        with _rate_limiter_instances_lock:
+            _rate_limiter_instances.add(self)

     def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]]":
         """Used to ratelimit an incoming request from a given host

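With this change the two gauges are registered once at module load, and their callbacks aggregate over whichever `FederationRateLimiter` instances have added themselves to `_rate_limiter_instances` and carry a `metrics_name`. The sketch below is a stripped-down, hypothetical model of that aggregation, using a stand-in class rather than the real rate limiter, just to show the shape of the mapping the callback returns (label tuple -> count):

```python
import threading
from typing import Callable, Dict, List, Optional, Tuple


class _FakeLimiter:
    """Hypothetical stand-in for FederationRateLimiter, for illustration only."""

    def __init__(self, metrics_name: Optional[str], sleeping_hosts: int) -> None:
        self.metrics_name = metrics_name
        self.sleeping_hosts = sleeping_hosts


_instances: List[_FakeLimiter] = [_FakeLimiter("federation", 3), _FakeLimiter(None, 7)]
_instances_lock = threading.Lock()


def _get_counts(count_func: Callable[[_FakeLimiter], int]) -> Dict[Tuple[str, ...], int]:
    # Snapshot the instances under the lock so collection sees a stable list.
    with _instances_lock:
        instances = list(_instances)
    # Unnamed instances are skipped, mirroring the real helper.
    return {(i.metrics_name,): count_func(i) for i in instances if i.metrics_name}


print(_get_counts(lambda i: i.sleeping_hosts))  # {('federation',): 3}
```

The real callbacks hand a mapping like this to `LaterGauge`, which (per the comment in the diff about its annotations) expects label tuples as keys, so each named limiter shows up as one labelled sample.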
@@ -114,13 +192,23 @@ def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]


 class _PerHostRatelimiter:
-    def __init__(self, clock: Clock, config: FederationRatelimitSettings):
+    def __init__(
+        self,
+        clock: Clock,
+        config: FederationRatelimitSettings,
+        metrics_name: Optional[str] = None,
+    ):
         """
         Args:
             clock
             config
+            metrics_name: The name of the rate limiter so we can differentiate it
+                from the rest in the metrics. If `None`, we don't track metrics
+                for this rate limiter.
         """
         self.clock = clock
+        self.metrics_name = metrics_name

         self.window_size = config.window_size
         self.sleep_limit = config.sleep_limit

@@ -178,7 +266,10 @@ def should_sleep(self) -> bool:
         return len(self.request_times) > self.sleep_limit

     async def _on_enter_with_tracing(self, request_id: object) -> None:
-        with start_active_span("ratelimit wait"), queue_wait_timer.time():
+        maybe_metrics_cm: ContextManager = contextlib.nullcontext()
+        if self.metrics_name:
+            maybe_metrics_cm = queue_wait_timer.labels(self.metrics_name).time()
+        with start_active_span("ratelimit wait"), maybe_metrics_cm:
             await self._on_enter(request_id)

     def _on_enter(self, request_id: object) -> "defer.Deferred[None]":
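The `contextlib.nullcontext()` fallback keeps the `with` statement unconditional: when there is no `metrics_name`, the block still runs, just without the histogram timer. A hedged, self-contained sketch of the same pattern with plain `prometheus_client` (the metric and function names here are illustrative, not Synapse's):

```python
import contextlib
from typing import ContextManager, Optional

from prometheus_client import CollectorRegistry, Histogram

registry = CollectorRegistry()
wait_timer = Histogram(
    "example_wait_seconds", "Illustration only", ["name"], registry=registry
)


def do_rate_limited_work(metrics_name: Optional[str] = None) -> None:
    # No metrics_name -> no-op context manager, so the body is written once.
    maybe_timer: ContextManager = contextlib.nullcontext()
    if metrics_name:
        maybe_timer = wait_timer.labels(metrics_name).time()
    with maybe_timer:
        pass  # ... the work being timed ...


do_rate_limited_work()              # runs untimed
do_rate_limited_work("federation")  # records one observation
```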
@@ -193,7 +284,8 @@ def _on_enter(self, request_id: object) -> "defer.Deferred[None]":
         # sleeping or in the ready queue).
         if self.should_reject():
             logger.debug("Ratelimiter(%s): rejecting request", self.host)
-            rate_limit_reject_counter.inc()
+            if self.metrics_name:
+                rate_limit_reject_counter.labels(self.metrics_name).inc()
             raise LimitExceededError(
                 retry_after_ms=int(self.window_size / self.sleep_limit)
             )

@@ -228,7 +320,8 @@ def queue_request() -> "defer.Deferred[None]":
                 id(request_id),
                 self.sleep_sec,
             )
-            rate_limit_sleep_counter.inc()
+            if self.metrics_name:
+                rate_limit_sleep_counter.labels(self.metrics_name).inc()
             ret_defer = run_in_background(self.clock.sleep, self.sleep_sec)

             self.sleeping_requests.add(request_id)
Review comment: This is the same pattern as `synapse/http/request_metrics.py`, lines 110 to 142 in c4e29b6.