
Commit a996580

Add some metrics for inbound and outbound federation processing times (#7755)
erikjohnston authored Jun 30, 2020
1 parent 2f6afdd commit a996580
Showing 5 changed files with 43 additions and 17 deletions.
1 change: 1 addition & 0 deletions changelog.d/7755.misc
@@ -0,0 +1 @@
+Add some metrics for inbound and outbound federation latencies: `synapse_federation_server_pdu_process_time` and `synapse_event_processing_lag_by_event`.
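Both metric names in this changelog entry appear in a homeserver's Prometheus exposition once the change is deployed. The snippet below is not part of the commit; it is one hedged way to spot-check for the new series, and it assumes a metrics listener is enabled and reachable at the URL shown (host, port and path depend on your configuration).

# Hypothetical spot-check, not Synapse code. Adjust METRICS_URL for your setup.
from urllib.request import urlopen

METRICS_URL = "http://localhost:9000/_synapse/metrics"  # assumed address

wanted = (
    "synapse_federation_server_pdu_process_time",
    "synapse_event_processing_lag_by_event",
)

with urlopen(METRICS_URL) as resp:
    for line in resp.read().decode("utf-8").splitlines():
        if line.startswith(wanted):
            print(line)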
37 changes: 21 additions & 16 deletions synapse/federation/federation_server.py
@@ -18,7 +18,7 @@
 from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union
 
 from canonicaljson import json
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
 from twisted.internet.abstract import isIPAddress
@@ -70,6 +70,10 @@
     "synapse_federation_server_received_queries", "", ["type"]
 )
 
+pdu_process_time = Histogram(
+    "synapse_federation_server_pdu_process_time", "Time taken to process an event",
+)
+
 
 class FederationServer(FederationBase):
     def __init__(self, hs):
@@ -271,21 +275,22 @@ async def process_pdus_for_room(room_id: str):
 
             for pdu in pdus_by_room[room_id]:
                 event_id = pdu.event_id
-                with nested_logging_context(event_id):
-                    try:
-                        await self._handle_received_pdu(origin, pdu)
-                        pdu_results[event_id] = {}
-                    except FederationError as e:
-                        logger.warning("Error handling PDU %s: %s", event_id, e)
-                        pdu_results[event_id] = {"error": str(e)}
-                    except Exception as e:
-                        f = failure.Failure()
-                        pdu_results[event_id] = {"error": str(e)}
-                        logger.error(
-                            "Failed to handle PDU %s",
-                            event_id,
-                            exc_info=(f.type, f.value, f.getTracebackObject()),
-                        )
+                with pdu_process_time.time():
+                    with nested_logging_context(event_id):
+                        try:
+                            await self._handle_received_pdu(origin, pdu)
+                            pdu_results[event_id] = {}
+                        except FederationError as e:
+                            logger.warning("Error handling PDU %s: %s", event_id, e)
+                            pdu_results[event_id] = {"error": str(e)}
+                        except Exception as e:
+                            f = failure.Failure()
+                            pdu_results[event_id] = {"error": str(e)}
+                            logger.error(
+                                "Failed to handle PDU %s",
+                                event_id,
+                                exc_info=(f.type, f.value, f.getTracebackObject()),
+                            )
 
         await concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
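The timing wraps the whole per-PDU block, error handling included, since the existing try/except now sits inside `with pdu_process_time.time():`. For readers unfamiliar with prometheus_client, the following standalone sketch (not Synapse code; it uses a private registry so it can run anywhere) shows what `Histogram.time()` does and which series the metric exposes.

import time

from prometheus_client import CollectorRegistry, Histogram, generate_latest

registry = CollectorRegistry()  # private registry so the sketch is self-contained
pdu_process_time = Histogram(
    "synapse_federation_server_pdu_process_time",
    "Time taken to process an event",
    registry=registry,
)

# Histogram.time() works as a context manager (or decorator): it starts a timer
# on entry and observes the elapsed seconds on exit.
with pdu_process_time.time():
    time.sleep(0.01)  # stand-in for awaiting self._handle_received_pdu(...)

# The exposition now contains _bucket, _sum and _count series for the metric.
print(generate_latest(registry).decode("utf-8"))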
10 changes: 9 additions & 1 deletion synapse/federation/sender/__init__.py
@@ -201,7 +201,15 @@ async def handle_event(event: EventBase) -> None:
 
                     logger.debug("Sending %s to %r", event, destinations)
 
-                    self._send_pdu(event, destinations)
+                    if destinations:
+                        self._send_pdu(event, destinations)
+
+                    now = self.clock.time_msec()
+                    ts = await self.store.get_received_ts(event.event_id)
+
+                    synapse.metrics.event_processing_lag_by_event.labels(
+                        "federation_sender"
+                    ).observe(now - ts)
 
                 async def handle_room_events(events: Iterable[EventBase]) -> None:
                     with Measure(self.clock, "handle_room_events"):
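Two things change in the federation sender: the event is only handed to `_send_pdu` when there is at least one destination, and the handler now records how long the event sat between being persisted and being queued for sending. Below is a hedged, standalone sketch of that lag-recording pattern; the helper name and the timestamps are illustrative, not Synapse's API. Since `clock.time_msec()` and `get_received_ts` both work in milliseconds, the observed values are milliseconds as well.

from prometheus_client import CollectorRegistry, Histogram

registry = CollectorRegistry()  # private registry so the sketch runs standalone
event_processing_lag_by_event = Histogram(
    "synapse_event_processing_lag_by_event",
    "Time between an event being persisted and it being queued up "
    "to be sent to the relevant remote servers",
    ["name"],
    registry=registry,
)


def record_send_lag(name: str, received_ts_ms: int, now_ms: int) -> None:
    # Mirrors the pattern above: now = clock.time_msec(), ts = get_received_ts(...),
    # then observe the difference under a per-sender label.
    event_processing_lag_by_event.labels(name).observe(now_ms - received_ts_ms)


# Illustrative values: an event persisted 250 ms before being queued for sending.
record_send_lag("federation_sender", received_ts_ms=1_593_500_000_000, now_ms=1_593_500_000_250)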
6 changes: 6 additions & 0 deletions synapse/handlers/appservice.py
@@ -114,6 +114,12 @@ def start_scheduler():
                         for service in services:
                             self.scheduler.submit_event_for_as(service, event)
 
+                        now = self.clock.time_msec()
+                        ts = yield self.store.get_received_ts(event.event_id)
+                        synapse.metrics.event_processing_lag_by_event.labels(
+                            "appservice_sender"
+                        ).observe(now - ts)
+
                     @defer.inlineCallbacks
                     def handle_room_events(events):
                         for event in events:
6 changes: 6 additions & 0 deletions synapse/metrics/__init__.py
@@ -463,6 +463,12 @@ def collect(self):
 # finished being processed.
 event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
 
+event_processing_lag_by_event = Histogram(
+    "synapse_event_processing_lag_by_event",
+    "Time between an event being persisted and it being queued up to be sent to the relevant remote servers",
+    ["name"],
+)
+
 # Build info of the running server.
 build_info = Gauge(
     "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
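The existing `synapse_event_processing_lag` Gauge only holds the latest value per consumer, while the new labelled Histogram accumulates a distribution per label value, which is what makes percentile-style queries (for example via Prometheus's histogram_quantile) possible. A short standalone sketch of that difference, again against a private registry so it can run anywhere:

from prometheus_client import CollectorRegistry, Gauge, Histogram

registry = CollectorRegistry()

event_processing_lag = Gauge(
    "synapse_event_processing_lag", "", ["name"], registry=registry
)
event_processing_lag_by_event = Histogram(
    "synapse_event_processing_lag_by_event",
    "Time between an event being persisted and it being queued up "
    "to be sent to the relevant remote servers",
    ["name"],
    registry=registry,
)

# The Gauge keeps only the most recent value per label...
event_processing_lag.labels("federation_sender").set(120)
event_processing_lag.labels("federation_sender").set(340)  # overwrites 120

# ...whereas the Histogram records every observation into per-label
# _bucket/_sum/_count series, one set for each label value used.
event_processing_lag_by_event.labels("federation_sender").observe(120)
event_processing_lag_by_event.labels("appservice_sender").observe(340)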
