Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #8425 from matrix-org/rav/extremity_metrics
Browse files Browse the repository at this point in the history
Add an improved "forward extremities" metric
  • Loading branch information
richvdh authored Sep 30, 2020
2 parents 8b40843 + 32acab3 commit a0a1ba6
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 84 deletions.
1 change: 1 addition & 0 deletions changelog.d/8425.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental prometheus metric to track numbers of "large" rooms for state resolutiom.
115 changes: 68 additions & 47 deletions synapse/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import functools
import gc
import itertools
import logging
import os
import platform
Expand All @@ -27,8 +28,8 @@
from prometheus_client.core import (
REGISTRY,
CounterMetricFamily,
GaugeHistogramMetricFamily,
GaugeMetricFamily,
HistogramMetricFamily,
)

from twisted.internet import reactor
Expand All @@ -46,7 +47,7 @@
METRICS_PREFIX = "/_synapse/metrics"

running_on_pypy = platform.python_implementation() == "PyPy"
all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge, BucketCollector]]
all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge]]

HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")

Expand Down Expand Up @@ -205,63 +206,83 @@ def _register_with_collector(self):
all_gauges[self.name] = self


@attr.s(slots=True, hash=True)
class BucketCollector:
"""
Like a Histogram, but allows buckets to be point-in-time instead of
incrementally added to.
class GaugeBucketCollector:
"""Like a Histogram, but the buckets are Gauges which are updated atomically.
Args:
name (str): Base name of metric to be exported to Prometheus.
data_collector (callable -> dict): A synchronous callable that
returns a dict mapping bucket to number of items in the
bucket. If these buckets are not the same as the buckets
given to this class, they will be remapped into them.
buckets (list[float]): List of floats/ints of the buckets to
give to Prometheus. +Inf is ignored, if given.
The data is updated by calling `update_data` with an iterable of measurements.
We assume that the data is updated less frequently than it is reported to
Prometheus, and optimise for that case.
"""

name = attr.ib()
data_collector = attr.ib()
buckets = attr.ib()
__slots__ = ("_name", "_documentation", "_bucket_bounds", "_metric")

def collect(self):
def __init__(
self,
name: str,
documentation: str,
buckets: Iterable[float],
registry=REGISTRY,
):
"""
Args:
name: base name of metric to be exported to Prometheus. (a _bucket suffix
will be added.)
documentation: help text for the metric
buckets: The top bounds of the buckets to report
registry: metric registry to register with
"""
self._name = name
self._documentation = documentation

# Fetch the data -- this must be synchronous!
data = self.data_collector()
# the tops of the buckets
self._bucket_bounds = [float(b) for b in buckets]
if self._bucket_bounds != sorted(self._bucket_bounds):
raise ValueError("Buckets not in sorted order")

buckets = {} # type: Dict[float, int]
if self._bucket_bounds[-1] != float("inf"):
self._bucket_bounds.append(float("inf"))

res = []
for x in data.keys():
for i, bound in enumerate(self.buckets):
if x <= bound:
buckets[bound] = buckets.get(bound, 0) + data[x]
self._metric = self._values_to_metric([])
registry.register(self)

for i in self.buckets:
res.append([str(i), buckets.get(i, 0)])
def collect(self):
yield self._metric

res.append(["+Inf", sum(data.values())])
def update_data(self, values: Iterable[float]):
"""Update the data to be reported by the metric
metric = HistogramMetricFamily(
self.name, "", buckets=res, sum_value=sum(x * y for x, y in data.items())
The existing data is cleared, and each measurement in the input is assigned
to the relevant bucket.
"""
self._metric = self._values_to_metric(values)

def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily:
total = 0.0
bucket_values = [0 for _ in self._bucket_bounds]

for v in values:
# assign each value to a bucket
for i, bound in enumerate(self._bucket_bounds):
if v <= bound:
bucket_values[i] += 1
break

# ... and increment the sum
total += v

# now, aggregate the bucket values so that they count the number of entries in
# that bucket or below.
accumulated_values = itertools.accumulate(bucket_values)

return GaugeHistogramMetricFamily(
self._name,
self._documentation,
buckets=list(
zip((str(b) for b in self._bucket_bounds), accumulated_values)
),
gsum_value=total,
)
yield metric

def __attrs_post_init__(self):
self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
if self.buckets != sorted(self.buckets):
raise ValueError("Buckets not sorted")

self.buckets = tuple(self.buckets)

if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))

REGISTRY.register(self)
all_gauges[self.name] = self


#
Expand Down
40 changes: 29 additions & 11 deletions synapse/metrics/_exposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from typing import Dict, List
from urllib.parse import parse_qs, urlparse

from prometheus_client import REGISTRY
Expand Down Expand Up @@ -124,16 +125,33 @@ def generate_latest(registry, emit_help=False):
)
)
output.append("# TYPE {0} {1}\n".format(mname, mtype))
for sample in metric.samples:
# Get rid of the OpenMetrics specific samples

om_samples = {} # type: Dict[str, List[str]]
for s in metric.samples:
for suffix in ["_created", "_gsum", "_gcount"]:
if sample.name.endswith(suffix):
if s.name == metric.name + suffix:
# OpenMetrics specific sample, put in a gauge at the end.
# (these come from gaugehistograms which don't get renamed,
# so no need to faff with mnewname)
om_samples.setdefault(suffix, []).append(sample_line(s, s.name))
break
else:
newname = sample.name.replace(mnewname, mname)
newname = s.name.replace(mnewname, mname)
if ":" in newname and newname.endswith("_total"):
newname = newname[: -len("_total")]
output.append(sample_line(sample, newname))
output.append(sample_line(s, newname))

for suffix, lines in sorted(om_samples.items()):
if emit_help:
output.append(
"# HELP {0}{1} {2}\n".format(
metric.name,
suffix,
metric.documentation.replace("\\", r"\\").replace("\n", r"\n"),
)
)
output.append("# TYPE {0}{1} gauge\n".format(metric.name, suffix))
output.extend(lines)

# Get rid of the weird colon things while we're at it
if mtype == "counter":
Expand All @@ -152,16 +170,16 @@ def generate_latest(registry, emit_help=False):
)
)
output.append("# TYPE {0} {1}\n".format(mnewname, mtype))
for sample in metric.samples:
# Get rid of the OpenMetrics specific samples

for s in metric.samples:
# Get rid of the OpenMetrics specific samples (we should already have
# dealt with them above anyway.)
for suffix in ["_created", "_gsum", "_gcount"]:
if sample.name.endswith(suffix):
if s.name == metric.name + suffix:
break
else:
output.append(
sample_line(
sample, sample.name.replace(":total", "").replace(":", "_")
)
sample_line(s, s.name.replace(":total", "").replace(":", "_"))
)

return "".join(output).encode("utf-8")
Expand Down
53 changes: 35 additions & 18 deletions synapse/storage/databases/main/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,35 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import typing
from collections import Counter

from synapse.metrics import BucketCollector
from synapse.metrics import GaugeBucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.event_push_actions import (
EventPushActionsWorkerStore,
)

# Collect metrics on the number of forward extremities that exist.
_extremities_collecter = GaugeBucketCollector(
"synapse_forward_extremities",
"Number of rooms on the server with the given number of forward extremities"
" or fewer",
buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500],
)

# we also expose metrics on the "number of excess extremity events", which is
# (E-1)*N, where E is the number of extremities and N is the number of state
# events in the room. This is an approximation to the number of state events
# we could remove from state resolution by reducing the graph to a single
# forward extremity.
_excess_state_events_collecter = GaugeBucketCollector(
"synapse_excess_extremity_events",
"Number of rooms on the server with the given number of excess extremity "
"events, or fewer",
buckets=[0] + [1 << n for n in range(12)],
)


class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
"""Functions to pull various metrics from the DB, for e.g. phone home
Expand All @@ -32,18 +50,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

# Collect metrics on the number of forward extremities that exist.
# Counter of number of extremities to count
self._current_forward_extremities_amount = (
Counter()
) # type: typing.Counter[int]

BucketCollector(
"synapse_forward_extremities",
lambda: self._current_forward_extremities_amount,
buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"],
)

# Read the extrems every 60 minutes
def read_forward_extremities():
# run as a background process to make sure that the database transactions
Expand All @@ -58,14 +64,25 @@ async def _read_forward_extremities(self):
def fetch(txn):
txn.execute(
"""
select count(*) c from event_forward_extremities
group by room_id
SELECT t1.c, t2.c
FROM (
SELECT room_id, COUNT(*) c FROM event_forward_extremities
GROUP BY room_id
) t1 LEFT JOIN (
SELECT room_id, COUNT(*) c FROM current_state_events
GROUP BY room_id
) t2 ON t1.room_id = t2.room_id
"""
)
return txn.fetchall()

res = await self.db_pool.runInteraction("read_forward_extremities", fetch)
self._current_forward_extremities_amount = Counter([x[0] for x in res])

_extremities_collecter.update_data(x[0] for x in res)

_excess_state_events_collecter.update_data(
(x[0] - 1) * x[1] for x in res if x[1]
)

async def count_daily_messages(self):
"""
Expand Down
19 changes: 11 additions & 8 deletions tests/storage/test_event_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ def test_exposed_to_prometheus(self):
self.reactor.advance(60 * 60 * 1000)
self.pump(1)

items = set(
items = list(
filter(
lambda x: b"synapse_forward_extremities_" in x,
generate_latest(REGISTRY).split(b"\n"),
generate_latest(REGISTRY, emit_help=False).split(b"\n"),
)
)

expected = {
expected = [
b'synapse_forward_extremities_bucket{le="1.0"} 0.0',
b'synapse_forward_extremities_bucket{le="2.0"} 2.0',
b'synapse_forward_extremities_bucket{le="3.0"} 2.0',
Expand All @@ -72,9 +72,12 @@ def test_exposed_to_prometheus(self):
b'synapse_forward_extremities_bucket{le="100.0"} 3.0',
b'synapse_forward_extremities_bucket{le="200.0"} 3.0',
b'synapse_forward_extremities_bucket{le="500.0"} 3.0',
b'synapse_forward_extremities_bucket{le="+Inf"} 3.0',
b"synapse_forward_extremities_count 3.0",
b"synapse_forward_extremities_sum 10.0",
}

# per https://docs.google.com/document/d/1KwV0mAXwwbvvifBvDKH_LU1YjyXE_wxCkHNoCGq1GX0/edit#heading=h.wghdjzzh72j9,
# "inf" is valid: "this includes variants such as inf"
b'synapse_forward_extremities_bucket{le="inf"} 3.0',
b"# TYPE synapse_forward_extremities_gcount gauge",
b"synapse_forward_extremities_gcount 3.0",
b"# TYPE synapse_forward_extremities_gsum gauge",
b"synapse_forward_extremities_gsum 10.0",
]
self.assertEqual(items, expected)

0 comments on commit a0a1ba6

Please sign in to comment.