Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add an improved "forward extremities" metric #8425

Merged
merged 4 commits into from
Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8425.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental prometheus metric to track numbers of "large" rooms for state resolutiom.
115 changes: 68 additions & 47 deletions synapse/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import functools
import gc
import itertools
import logging
import os
import platform
Expand All @@ -27,8 +28,8 @@
from prometheus_client.core import (
REGISTRY,
CounterMetricFamily,
GaugeHistogramMetricFamily,
GaugeMetricFamily,
HistogramMetricFamily,
)

from twisted.internet import reactor
Expand All @@ -46,7 +47,7 @@
METRICS_PREFIX = "/_synapse/metrics"

running_on_pypy = platform.python_implementation() == "PyPy"
all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge, BucketCollector]]
all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge]]

HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")

Expand Down Expand Up @@ -205,63 +206,83 @@ def _register_with_collector(self):
all_gauges[self.name] = self


@attr.s(slots=True, hash=True)
class BucketCollector:
"""
Like a Histogram, but allows buckets to be point-in-time instead of
incrementally added to.
class GaugeBucketCollector:
"""Like a Histogram, but the buckets are Gauges which are updated atomically.

Args:
name (str): Base name of metric to be exported to Prometheus.
data_collector (callable -> dict): A synchronous callable that
returns a dict mapping bucket to number of items in the
bucket. If these buckets are not the same as the buckets
given to this class, they will be remapped into them.
buckets (list[float]): List of floats/ints of the buckets to
give to Prometheus. +Inf is ignored, if given.
The data is updated by calling `update_data` with an iterable of measurements.

We assume that the data is updated less frequently than it is reported to
Prometheus, and optimise for that case.
"""

name = attr.ib()
data_collector = attr.ib()
buckets = attr.ib()
__slots__ = ("_name", "_documentation", "_bucket_bounds", "_metric")

def collect(self):
def __init__(
self,
name: str,
documentation: str,
buckets: Iterable[float],
registry=REGISTRY,
):
"""
Args:
name: base name of metric to be exported to Prometheus. (a _bucket suffix
will be added.)
documentation: help text for the metric
buckets: The top bounds of the buckets to report
registry: metric registry to register with
"""
self._name = name
self._documentation = documentation

# Fetch the data -- this must be synchronous!
data = self.data_collector()
# the tops of the buckets
self._bucket_bounds = [float(b) for b in buckets]
if self._bucket_bounds != sorted(self._bucket_bounds):
raise ValueError("Buckets not in sorted order")

buckets = {} # type: Dict[float, int]
if self._bucket_bounds[-1] != float("inf"):
self._bucket_bounds.append(float("inf"))

res = []
for x in data.keys():
for i, bound in enumerate(self.buckets):
if x <= bound:
buckets[bound] = buckets.get(bound, 0) + data[x]
self._metric = self._values_to_metric([])
registry.register(self)

for i in self.buckets:
res.append([str(i), buckets.get(i, 0)])
def collect(self):
yield self._metric

res.append(["+Inf", sum(data.values())])
def update_data(self, values: Iterable[float]):
"""Update the data to be reported by the metric

metric = HistogramMetricFamily(
self.name, "", buckets=res, sum_value=sum(x * y for x, y in data.items())
The existing data is cleared, and each measurement in the input is assigned
to the relevant bucket.
"""
self._metric = self._values_to_metric(values)

def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily:
total = 0.0
bucket_values = [0 for _ in self._bucket_bounds]

for v in values:
# assign each value to a bucket
for i, bound in enumerate(self._bucket_bounds):
if v <= bound:
bucket_values[i] += 1
break

# ... and increment the sum
total += v

# now, aggregate the bucket values so that they count the number of entries in
# that bucket or below.
accumulated_values = itertools.accumulate(bucket_values)

return GaugeHistogramMetricFamily(
self._name,
self._documentation,
buckets=list(
zip((str(b) for b in self._bucket_bounds), accumulated_values)
),
gsum_value=total,
)
yield metric

def __attrs_post_init__(self):
self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
if self.buckets != sorted(self.buckets):
raise ValueError("Buckets not sorted")

self.buckets = tuple(self.buckets)

if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))

REGISTRY.register(self)
all_gauges[self.name] = self


#
Expand Down
40 changes: 29 additions & 11 deletions synapse/metrics/_exposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from typing import Dict, List
from urllib.parse import parse_qs, urlparse

from prometheus_client import REGISTRY
Expand Down Expand Up @@ -124,16 +125,33 @@ def generate_latest(registry, emit_help=False):
)
)
output.append("# TYPE {0} {1}\n".format(mname, mtype))
for sample in metric.samples:
# Get rid of the OpenMetrics specific samples

om_samples = {} # type: Dict[str, List[str]]
for s in metric.samples:
for suffix in ["_created", "_gsum", "_gcount"]:
if sample.name.endswith(suffix):
if s.name == metric.name + suffix:
# OpenMetrics specific sample, put in a gauge at the end.
# (these come from gaugehistograms which don't get renamed,
# so no need to faff with mnewname)
om_samples.setdefault(suffix, []).append(sample_line(s, s.name))
break
else:
newname = sample.name.replace(mnewname, mname)
newname = s.name.replace(mnewname, mname)
if ":" in newname and newname.endswith("_total"):
newname = newname[: -len("_total")]
output.append(sample_line(sample, newname))
output.append(sample_line(s, newname))

for suffix, lines in sorted(om_samples.items()):
if emit_help:
output.append(
"# HELP {0}{1} {2}\n".format(
metric.name,
suffix,
metric.documentation.replace("\\", r"\\").replace("\n", r"\n"),
)
)
output.append("# TYPE {0}{1} gauge\n".format(metric.name, suffix))
output.extend(lines)

# Get rid of the weird colon things while we're at it
if mtype == "counter":
Expand All @@ -152,16 +170,16 @@ def generate_latest(registry, emit_help=False):
)
)
output.append("# TYPE {0} {1}\n".format(mnewname, mtype))
for sample in metric.samples:
# Get rid of the OpenMetrics specific samples

for s in metric.samples:
# Get rid of the OpenMetrics specific samples (we should already have
# dealt with them above anyway.)
for suffix in ["_created", "_gsum", "_gcount"]:
if sample.name.endswith(suffix):
if s.name == metric.name + suffix:
break
else:
output.append(
sample_line(
sample, sample.name.replace(":total", "").replace(":", "_")
)
sample_line(s, s.name.replace(":total", "").replace(":", "_"))
)

return "".join(output).encode("utf-8")
Expand Down
53 changes: 35 additions & 18 deletions synapse/storage/databases/main/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,35 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import typing
from collections import Counter

from synapse.metrics import BucketCollector
from synapse.metrics import GaugeBucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.event_push_actions import (
EventPushActionsWorkerStore,
)

# Collect metrics on the number of forward extremities that exist.
_extremities_collecter = GaugeBucketCollector(
"synapse_forward_extremities",
"Number of rooms on the server with the given number of forward extremities"
" or fewer",
buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why would we ever expect there to be significantly more than 10 forward extremities (or whatever the dummy threshold is)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, this is what we have today...

There are a few reasons we might end up with more:

)

# we also expose metrics on the "number of excess extremity events", which is
# (E-1)*N, where E is the number of extremities and N is the number of state
# events in the room. This is an approximation to the number of state events
# we could remove from state resolution by reducing the graph to a single
# forward extremity.
_excess_state_events_collecter = GaugeBucketCollector(
"synapse_excess_extremity_events",
"Number of rooms on the server with the given number of excess extremity "
"events, or fewer",
buckets=[0] + [1 << n for n in range(12)],
)


class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
"""Functions to pull various metrics from the DB, for e.g. phone home
Expand All @@ -32,18 +50,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

# Collect metrics on the number of forward extremities that exist.
# Counter of number of extremities to count
self._current_forward_extremities_amount = (
Counter()
) # type: typing.Counter[int]

BucketCollector(
"synapse_forward_extremities",
lambda: self._current_forward_extremities_amount,
buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"],
)

# Read the extrems every 60 minutes
def read_forward_extremities():
# run as a background process to make sure that the database transactions
Expand All @@ -58,14 +64,25 @@ async def _read_forward_extremities(self):
def fetch(txn):
txn.execute(
"""
select count(*) c from event_forward_extremities
group by room_id
SELECT t1.c, t2.c
FROM (
SELECT room_id, COUNT(*) c FROM event_forward_extremities
GROUP BY room_id
) t1 LEFT JOIN (
SELECT room_id, COUNT(*) c FROM current_state_events
GROUP BY room_id
) t2 ON t1.room_id = t2.room_id
"""
)
return txn.fetchall()

res = await self.db_pool.runInteraction("read_forward_extremities", fetch)
self._current_forward_extremities_amount = Counter([x[0] for x in res])

_extremities_collecter.update_data(x[0] for x in res)

_excess_state_events_collecter.update_data(
(x[0] - 1) * x[1] for x in res if x[1]
)

async def count_daily_messages(self):
"""
Expand Down
19 changes: 11 additions & 8 deletions tests/storage/test_event_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ def test_exposed_to_prometheus(self):
self.reactor.advance(60 * 60 * 1000)
self.pump(1)

items = set(
items = list(
filter(
lambda x: b"synapse_forward_extremities_" in x,
generate_latest(REGISTRY).split(b"\n"),
generate_latest(REGISTRY, emit_help=False).split(b"\n"),
)
)

expected = {
expected = [
b'synapse_forward_extremities_bucket{le="1.0"} 0.0',
b'synapse_forward_extremities_bucket{le="2.0"} 2.0',
b'synapse_forward_extremities_bucket{le="3.0"} 2.0',
Expand All @@ -72,9 +72,12 @@ def test_exposed_to_prometheus(self):
b'synapse_forward_extremities_bucket{le="100.0"} 3.0',
b'synapse_forward_extremities_bucket{le="200.0"} 3.0',
b'synapse_forward_extremities_bucket{le="500.0"} 3.0',
b'synapse_forward_extremities_bucket{le="+Inf"} 3.0',
b"synapse_forward_extremities_count 3.0",
b"synapse_forward_extremities_sum 10.0",
}

# per https://docs.google.com/document/d/1KwV0mAXwwbvvifBvDKH_LU1YjyXE_wxCkHNoCGq1GX0/edit#heading=h.wghdjzzh72j9,
# "inf" is valid: "this includes variants such as inf"
b'synapse_forward_extremities_bucket{le="inf"} 3.0',
b"# TYPE synapse_forward_extremities_gcount gauge",
b"synapse_forward_extremities_gcount 3.0",
b"# TYPE synapse_forward_extremities_gsum gauge",
b"synapse_forward_extremities_gsum 10.0",
]
self.assertEqual(items, expected)