This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Expose statistics on extrems to prometheus #5384

Merged
16 commits merged on Jun 13, 2019
1 change: 1 addition & 0 deletions changelog.d/5384.feature
@@ -0,0 +1 @@
Statistics on forward extremities per room are now exposed via Prometheus.
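For a quick sanity check of the new metric, the histogram can be dumped from a running process with the stock prometheus_client helpers. This is a minimal sketch, assuming the collector added by this change is registered against the default REGISTRY; the sample values in the comments are illustrative only.

from prometheus_client import REGISTRY, generate_latest

# Render the process-wide registry in the Prometheus text exposition format
# and keep only the samples belonging to the new histogram.
for line in generate_latest(REGISTRY).decode("utf-8").splitlines():
    if line.startswith("synapse_forward_extremities"):
        print(line)

# Expected shape of the output:
# synapse_forward_extremities_bucket{le="1.0"} 12.0
# ...
# synapse_forward_extremities_bucket{le="+Inf"} 20.0
# synapse_forward_extremities_count 20.0
# synapse_forward_extremities_sum 57.0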
2 changes: 1 addition & 1 deletion scripts/generate_signing_key.py
@@ -16,7 +16,7 @@
import argparse
import sys

-from signedjson.key import write_signing_keys, generate_signing_key
+from signedjson.key import generate_signing_key, write_signing_keys

from synapse.util.stringutils import random_string

112 changes: 92 additions & 20 deletions synapse/metrics/__init__.py
@@ -25,7 +25,7 @@

import attr
from prometheus_client import Counter, Gauge, Histogram
from prometheus_client.core import REGISTRY, GaugeMetricFamily
from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricFamily

from twisted.internet import reactor

@@ -40,7 +40,6 @@


class RegistryProxy(object):

@staticmethod
def collect():
for metric in REGISTRY.collect():
@@ -63,10 +62,7 @@ def collect(self):
try:
calls = self.caller()
except Exception:
-            logger.exception(
-                "Exception running callback for LaterGauge(%s)",
-                self.name,
-            )
+            logger.exception("Exception running callback for LaterGauge(%s)", self.name)
yield g
return

@@ -116,9 +112,7 @@ def __init__(self, name, desc, labels, sub_metrics):
# Create a class which have the sub_metrics values as attributes, which
# default to 0 on initialization. Used to pass to registered callbacks.
self._metrics_class = attr.make_class(
"_MetricsEntry",
attrs={x: attr.ib(0) for x in sub_metrics},
slots=True,
"_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
)

# Counts number of in flight blocks for a given set of label values
@@ -157,7 +151,9 @@ def collect(self):

Note: may be called by a separate thread.
"""
-        in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)
+        in_flight = GaugeMetricFamily(
+            self.name + "_total", self.desc, labels=self.labels
+        )

metrics_by_key = {}

@@ -179,7 +175,9 @@ def collect(self):
yield in_flight

for name in self.sub_metrics:
-            gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
+            gauge = GaugeMetricFamily(
+                "_".join([self.name, name]), "", labels=self.labels
+            )
for key, metrics in six.iteritems(metrics_by_key):
gauge.add_metric(key, getattr(metrics, name))
yield gauge
@@ -193,12 +191,75 @@ def _register_with_collector(self):
all_gauges[self.name] = self


@attr.s(hash=True)
class BucketCollector(object):
"""
    Like a Histogram, but the bucket counts are taken from a point-in-time
    snapshot rather than accumulated incrementally.

Args:
name (str): Base name of metric to be exported to Prometheus.
data_collector (callable -> dict): A synchronous callable that
returns a dict mapping bucket to number of items in the
bucket. If these buckets are not the same as the buckets
given to this class, they will be remapped into them.
        buckets (list[float]): List of floats/ints of the bucket upper
            bounds to give to Prometheus. "+Inf" is ignored if given, as
            a +Inf bucket is always appended automatically.

"""

name = attr.ib()
data_collector = attr.ib()
buckets = attr.ib()

def collect(self):

# Fetch the data -- this must be synchronous!
data = self.data_collector()

buckets = {}

res = []
        for x in data.keys():
            for bound in self.buckets:
                if x <= bound:
                    # Histogram buckets are cumulative, so a value counts
                    # towards every bucket whose upper bound it fits under.
                    buckets[bound] = buckets.get(bound, 0) + data[x]

        for i in self.buckets:
            # Prometheus expects the bucket bound as a string label value.
            res.append([str(i), buckets.get(i, 0)])

        res.append(["+Inf", sum(data.values())])

metric = HistogramMetricFamily(
self.name,
"",
buckets=res,
sum_value=sum([x * y for x, y in data.items()]),
)
yield metric

def __attrs_post_init__(self):
self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
if self.buckets != sorted(self.buckets):
raise ValueError("Buckets not sorted")

self.buckets = tuple(self.buckets)

if self.name in all_gauges.keys():
            logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))

REGISTRY.register(self)
all_gauges[self.name] = self
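# A minimal usage sketch of BucketCollector (the metric name and data below
# are hypothetical, not part of this change). Constructing the collector
# registers it with the global REGISTRY; the callable is invoked
# synchronously on every scrape, so whoever owns the dict only needs to
# keep it up to date.

from synapse.metrics import BucketCollector

# Point-in-time distribution: observed value -> number of items with that
# value; here, two queues of depth 3 and one of depth 7.
queue_depths = {3: 2, 7: 1}

BucketCollector(
    "myapp_queue_depth",  # hypothetical metric name
    lambda: queue_depths,
    buckets=[1, 5, 10, "+Inf"],
)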


#
# Detailed CPU metrics
#

-class CPUMetrics(object):
-
+class CPUMetrics(object):
def __init__(self):
ticks_per_sec = 100
try:
@@ -237,13 +298,28 @@ def collect(self):
"python_gc_time",
"Time taken to GC (sec)",
["gen"],
-    buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50,
-             5.00, 7.50, 15.00, 30.00, 45.00, 60.00],
+    buckets=[
+        0.0025,
+        0.005,
+        0.01,
+        0.025,
+        0.05,
+        0.10,
+        0.25,
+        0.50,
+        1.00,
+        2.50,
+        5.00,
+        7.50,
+        15.00,
+        30.00,
+        45.00,
+        60.00,
+    ],
)


class GCCounts(object):

def collect(self):
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
@@ -279,9 +355,7 @@ def collect(self):
events_processed_counter = Counter("synapse_federation_client_events_processed", "")

event_processing_loop_counter = Counter(
"synapse_event_processing_loop_count",
"Event processing loop iterations",
["name"],
"synapse_event_processing_loop_count", "Event processing loop iterations", ["name"]
)

event_processing_loop_room_count = Counter(
@@ -311,7 +385,6 @@


class ReactorLastSeenMetric(object):

def collect(self):
cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen",
@@ -325,7 +398,6 @@ def collect(self):


def runUntilCurrentTimer(func):

@functools.wraps(func)
def f(*args, **kwargs):
now = reactor.seconds()
44 changes: 31 additions & 13 deletions synapse/storage/events.py
@@ -17,7 +17,7 @@

import itertools
import logging
-from collections import OrderedDict, deque, namedtuple
+from collections import Counter as c_counter, OrderedDict, deque, namedtuple
from functools import wraps

from six import iteritems, text_type
@@ -33,6 +33,7 @@
from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.metrics import BucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateResolutionStore
from synapse.storage.background_updates import BackgroundUpdateStore
@@ -220,13 +221,38 @@ class EventsStore(
EventsWorkerStore,
BackgroundUpdateStore,
):

def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)

self._event_persist_queue = _EventPeristenceQueue()
self._state_resolution_handler = hs.get_state_resolution_handler()

# Collect metrics on the number of forward extremities that exist.
self._current_forward_extremities_amount = {}

BucketCollector(
"synapse_forward_extremities",
lambda: self._current_forward_extremities_amount,
buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"]
)

# Read the extrems every 60 minutes
hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)

@defer.inlineCallbacks
def _read_forward_extremities(self):
def fetch(txn):
txn.execute(
"""
select count(*) c from event_forward_extremities
group by room_id
"""
)
hawkowl marked this conversation as resolved.
Show resolved Hide resolved
return txn.fetchall()

res = yield self.runInteraction("read_forward_extremities", fetch)
self._current_forward_extremities_amount = c_counter(list(x[0] for x in res))
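# To make the shape of that data concrete (illustrative values, not real
# query output): each row of the GROUP BY is the forward-extremity count of
# one room, and the Counter inverts that into rooms-per-count, which is
# exactly the dict polled by the BucketCollector registered in __init__.
#
#     rows = [(1,), (1,), (3,)]
#     c_counter(x[0] for x in rows)  # -> Counter({1: 2, 3: 1})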

@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False):
"""
@@ -568,17 +594,11 @@ def _get_events_which_are_prevs_txn(txn, batch):
)

txn.execute(sql, batch)
-            results.extend(
-                r[0]
-                for r in txn
-                if not json.loads(r[1]).get("soft_failed")
-            )
+            results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))

for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
"_get_events_which_are_prevs",
_get_events_which_are_prevs_txn,
chunk,
"_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
)

defer.returnValue(results)
@@ -640,9 +660,7 @@ def _get_prevs_before_rejected_txn(txn, batch):

for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
"_get_prevs_before_rejected",
_get_prevs_before_rejected_txn,
chunk,
"_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
)

defer.returnValue(existing_prevs)