From a99658074dc3b2b0f6abcb4f98d56bc1386398aa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Jun 2020 16:58:06 +0100 Subject: [PATCH] Add some metrics for inbound and outbound federation processing times (#7755) --- changelog.d/7755.misc | 1 + synapse/federation/federation_server.py | 37 ++++++++++++++----------- synapse/federation/sender/__init__.py | 10 ++++++- synapse/handlers/appservice.py | 6 ++++ synapse/metrics/__init__.py | 6 ++++ 5 files changed, 43 insertions(+), 17 deletions(-) create mode 100644 changelog.d/7755.misc diff --git a/changelog.d/7755.misc b/changelog.d/7755.misc new file mode 100644 index 000000000000..1fc29206ac5d --- /dev/null +++ b/changelog.d/7755.misc @@ -0,0 +1 @@ +Add some metrics for inbound and outbound federation latencies: `synapse_federation_server_pdu_process_time` and `synapse_event_processing_lag_by_event`. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index afe0a8238bdc..e704cf2f4437 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -18,7 +18,7 @@ from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union from canonicaljson import json -from prometheus_client import Counter +from prometheus_client import Counter, Histogram from twisted.internet import defer from twisted.internet.abstract import isIPAddress @@ -70,6 +70,10 @@ "synapse_federation_server_received_queries", "", ["type"] ) +pdu_process_time = Histogram( + "synapse_federation_server_pdu_process_time", "Time taken to process an event", +) + class FederationServer(FederationBase): def __init__(self, hs): @@ -271,21 +275,22 @@ async def process_pdus_for_room(room_id: str): for pdu in pdus_by_room[room_id]: event_id = pdu.event_id - with nested_logging_context(event_id): - try: - await self._handle_received_pdu(origin, pdu) - pdu_results[event_id] = {} - except FederationError as e: - logger.warning("Error handling PDU %s: %s", event_id, e) - pdu_results[event_id] = {"error": str(e)} - except Exception as e: - f = failure.Failure() - pdu_results[event_id] = {"error": str(e)} - logger.error( - "Failed to handle PDU %s", - event_id, - exc_info=(f.type, f.value, f.getTracebackObject()), - ) + with pdu_process_time.time(): + with nested_logging_context(event_id): + try: + await self._handle_received_pdu(origin, pdu) + pdu_results[event_id] = {} + except FederationError as e: + logger.warning("Error handling PDU %s: %s", event_id, e) + pdu_results[event_id] = {"error": str(e)} + except Exception as e: + f = failure.Failure() + pdu_results[event_id] = {"error": str(e)} + logger.error( + "Failed to handle PDU %s", + event_id, + exc_info=(f.type, f.value, f.getTracebackObject()), + ) await concurrently_execute( process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 5b8faea4e72e..23fb5156834f 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -201,7 +201,15 @@ async def handle_event(event: EventBase) -> None: logger.debug("Sending %s to %r", event, destinations) - self._send_pdu(event, destinations) + if destinations: + self._send_pdu(event, destinations) + + now = self.clock.time_msec() + ts = await self.store.get_received_ts(event.event_id) + + synapse.metrics.event_processing_lag_by_event.labels( + "federation_sender" + ).observe(now - ts) async def handle_room_events(events: Iterable[EventBase]) -> None: with Measure(self.clock, "handle_room_events"): diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index ac1b64caff2a..f7d9fd621ef2 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -114,6 +114,12 @@ def start_scheduler(): for service in services: self.scheduler.submit_event_for_as(service, event) + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(event.event_id) + synapse.metrics.event_processing_lag_by_event.labels( + "appservice_sender" + ).observe(now - ts) + @defer.inlineCallbacks def handle_room_events(events): for event in events: diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 087a49d65df7..6035672698bd 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -463,6 +463,12 @@ def collect(self): # finished being processed. event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"]) +event_processing_lag_by_event = Histogram( + "synapse_event_processing_lag_by_event", + "Time between an event being persisted and it being queued up to be sent to the relevant remote servers", + ["name"], +) + # Build info of the running server. build_info = Gauge( "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]