Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Instrument the federation/backfill part of /messages
Browse files Browse the repository at this point in the history
Split out from #13440
  • Loading branch information
MadLittleMods committed Aug 9, 2022
1 parent 1b09b08 commit aeaa36d
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 23 deletions.
19 changes: 18 additions & 1 deletion synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
)
from synapse.federation.transport.client import SendJoinResponse
from synapse.http.types import QueryParams
from synapse.logging.opentracing import trace
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
Expand Down Expand Up @@ -235,6 +235,7 @@ async def claim_client_keys(
)

@trace
@tag_args
async def backfill(
self, dest: str, room_id: str, limit: int, extremities: Collection[str]
) -> Optional[List[EventBase]]:
Expand Down Expand Up @@ -337,6 +338,8 @@ async def get_pdu_from_destination_raw(

return None

@trace
@tag_args
async def get_pdu(
self,
destinations: Iterable[str],
Expand Down Expand Up @@ -448,6 +451,8 @@ async def get_pdu(

return event_copy

@trace
@tag_args
async def get_room_state_ids(
self, destination: str, room_id: str, event_id: str
) -> Tuple[List[str], List[str]]:
Expand All @@ -467,13 +472,24 @@ async def get_room_state_ids(
state_event_ids = result["pdu_ids"]
auth_event_ids = result.get("auth_chain_ids", [])

set_tag(
SynapseTags.RESULT_PREFIX + f"state_event_ids ({len(state_event_ids)})",
str(state_event_ids),
)
set_tag(
SynapseTags.RESULT_PREFIX + f"auth_event_ids ({len(auth_event_ids)})",
str(auth_event_ids),
)

if not isinstance(state_event_ids, list) or not isinstance(
auth_event_ids, list
):
raise InvalidResponseError("invalid response from /state_ids")

return state_event_ids, auth_event_ids

@trace
@tag_args
async def get_room_state(
self,
destination: str,
Expand Down Expand Up @@ -533,6 +549,7 @@ async def get_room_state(

return valid_state_events, valid_auth_events

@trace
async def _check_sigs_and_hash_and_fetch(
self,
origin: str,
Expand Down
7 changes: 6 additions & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
from synapse.federation.federation_client import InvalidResponseError
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import trace
from synapse.logging.opentracing import SynapseTags, set_tag, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.module_api import NOT_SPAM
from synapse.replication.http.federation import (
Expand Down Expand Up @@ -370,6 +370,11 @@ async def _maybe_backfill_inner(
logger.debug(
"_maybe_backfill_inner: extremities_to_request %s", extremities_to_request
)
set_tag(
SynapseTags.RESULT_PREFIX
+ f"extremities_to_request {len(extremities_to_request)}",
str(extremities_to_request),
)

# Now we need to decide which hosts to hit first.

Expand Down
87 changes: 73 additions & 14 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@
from synapse.events.snapshot import EventContext
from synapse.federation.federation_client import InvalidResponseError
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import trace
from synapse.logging.opentracing import (
SynapseTags,
set_tag,
start_active_span,
tag_args,
trace,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.replication.http.federation import (
Expand Down Expand Up @@ -410,6 +416,7 @@ async def check_join_restrictions(
prev_member_event,
)

@trace
async def process_remote_join(
self,
origin: str,
Expand Down Expand Up @@ -715,7 +722,7 @@ async def _get_missing_events_for_pdu(

@trace
async def _process_pulled_events(
self, origin: str, events: Iterable[EventBase], backfilled: bool
self, origin: str, events: List[EventBase], backfilled: bool
) -> None:
"""Process a batch of events we have pulled from a remote server
Expand All @@ -730,6 +737,11 @@ async def _process_pulled_events(
backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)
"""
set_tag(
SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(events)})",
str([event.event_id for event in events]),
)
set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
logger.debug(
"processing pulled backfilled=%s events=%s",
backfilled,
Expand All @@ -753,6 +765,7 @@ async def _process_pulled_events(
await self._process_pulled_event(origin, ev, backfilled=backfilled)

@trace
@tag_args
async def _process_pulled_event(
self, origin: str, event: EventBase, backfilled: bool
) -> None:
Expand Down Expand Up @@ -854,6 +867,7 @@ async def _process_pulled_event(
else:
raise

@trace
async def _compute_event_context_with_maybe_missing_prevs(
self, dest: str, event: EventBase
) -> EventContext:
Expand Down Expand Up @@ -970,6 +984,8 @@ async def _compute_event_context_with_maybe_missing_prevs(
event, state_ids_before_event=state_map, partial_state=partial_state
)

@trace
@tag_args
async def _get_state_ids_after_missing_prev_event(
self,
destination: str,
Expand Down Expand Up @@ -1009,10 +1025,10 @@ async def _get_state_ids_after_missing_prev_event(
logger.debug("Fetching %i events from cache/store", len(desired_events))
have_events = await self._store.have_seen_events(room_id, desired_events)

missing_desired_events = desired_events - have_events
missing_desired_event_ids = desired_events - have_events
logger.debug(
"We are missing %i events (got %i)",
len(missing_desired_events),
len(missing_desired_event_ids),
len(have_events),
)

Expand All @@ -1024,13 +1040,24 @@ async def _get_state_ids_after_missing_prev_event(
# already have a bunch of the state events. It would be nice if the
# federation api gave us a way of finding out which we actually need.

missing_auth_events = set(auth_event_ids) - have_events
missing_auth_events.difference_update(
await self._store.have_seen_events(room_id, missing_auth_events)
missing_auth_event_ids = set(auth_event_ids) - have_events
missing_auth_event_ids.difference_update(
await self._store.have_seen_events(room_id, missing_auth_event_ids)
)
logger.debug("We are also missing %i auth events", len(missing_auth_events))
logger.debug("We are also missing %i auth events", len(missing_auth_event_ids))

missing_event_ids = missing_desired_event_ids | missing_auth_event_ids

missing_events = missing_desired_events | missing_auth_events
set_tag(
SynapseTags.RESULT_PREFIX
+ f"missing_auth_event_ids ({len(missing_auth_event_ids)})",
str(missing_auth_event_ids),
)
set_tag(
SynapseTags.RESULT_PREFIX
+ f"missing_desired_event_ids ({len(missing_desired_event_ids)})",
str(missing_desired_event_ids),
)

# Making an individual request for each of 1000s of events has a lot of
# overhead. On the other hand, we don't really want to fetch all of the events
Expand All @@ -1041,13 +1068,13 @@ async def _get_state_ids_after_missing_prev_event(
#
# TODO: might it be better to have an API which lets us do an aggregate event
# request
if (len(missing_events) * 10) >= len(auth_event_ids) + len(state_event_ids):
if (len(missing_event_ids) * 10) >= len(auth_event_ids) + len(state_event_ids):
logger.debug("Requesting complete state from remote")
await self._get_state_and_persist(destination, room_id, event_id)
else:
logger.debug("Fetching %i events from remote", len(missing_events))
logger.debug("Fetching %i events from remote", len(missing_event_ids))
await self._get_events_and_persist(
destination=destination, room_id=room_id, event_ids=missing_events
destination=destination, room_id=room_id, event_ids=missing_event_ids
)

# We now need to fill out the state map, which involves fetching the
Expand Down Expand Up @@ -1104,6 +1131,10 @@ async def _get_state_ids_after_missing_prev_event(
event_id,
failed_to_fetch,
)
set_tag(
SynapseTags.RESULT_PREFIX + f"failed_to_fetch ({len(failed_to_fetch)})",
str(failed_to_fetch),
)

if remote_event.is_state() and remote_event.rejected_reason is None:
state_map[
Expand All @@ -1112,6 +1143,8 @@ async def _get_state_ids_after_missing_prev_event(

return state_map

@trace
@tag_args
async def _get_state_and_persist(
self, destination: str, room_id: str, event_id: str
) -> None:
Expand All @@ -1133,6 +1166,7 @@ async def _get_state_and_persist(
destination=destination, room_id=room_id, event_ids=(event_id,)
)

@trace
async def _process_received_pdu(
self,
origin: str,
Expand Down Expand Up @@ -1283,6 +1317,7 @@ async def _resync_device(self, sender: str) -> None:
except Exception:
logger.exception("Failed to resync device for %s", sender)

@trace
async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
"""Handles backfilling the insertion event when we receive a marker
event that points to one.
Expand Down Expand Up @@ -1414,6 +1449,8 @@ async def backfill_event_id(

return event_from_response

@trace
@tag_args
async def _get_events_and_persist(
self, destination: str, room_id: str, event_ids: Collection[str]
) -> None:
Expand Down Expand Up @@ -1459,6 +1496,7 @@ async def get_event(event_id: str) -> None:
logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
await self._auth_and_persist_outliers(room_id, events)

@trace
async def _auth_and_persist_outliers(
self, room_id: str, events: Iterable[EventBase]
) -> None:
Expand All @@ -1477,6 +1515,12 @@ async def _auth_and_persist_outliers(
"""
event_map = {event.event_id: event for event in events}

event_ids = event_map.keys()
set_tag(
SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(event_ids)})",
str(event_ids),
)

# filter out any events we have already seen. This might happen because
# the events were eagerly pushed to us (eg, during a room join), or because
# another thread has raced against us since we decided to request the event.
Expand Down Expand Up @@ -1593,6 +1637,7 @@ async def prep(event: EventBase) -> None:
backfilled=True,
)

@trace
async def _check_event_auth(
self, origin: Optional[str], event: EventBase, context: EventContext
) -> None:
Expand Down Expand Up @@ -1631,6 +1676,11 @@ async def _check_event_auth(
claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
origin, event
)
set_tag(
SynapseTags.RESULT_PREFIX
+ f"claimed_auth_events ({len(claimed_auth_events)})",
str([ev.event_id for ev in claimed_auth_events]),
)

# ... and check that the event passes auth at those auth events.
# https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
Expand Down Expand Up @@ -1728,6 +1778,7 @@ async def _check_event_auth(
)
context.rejected = RejectedReason.AUTH_ERROR

@trace
async def _maybe_kick_guest_users(self, event: EventBase) -> None:
if event.type != EventTypes.GuestAccess:
return
Expand Down Expand Up @@ -1935,6 +1986,8 @@ async def _load_or_fetch_auth_events_for_event(
# instead we raise an AuthError, which will make the caller ignore it.
raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")

@trace
@tag_args
async def _get_remote_auth_chain_for_event(
self, destination: str, room_id: str, event_id: str
) -> None:
Expand Down Expand Up @@ -1963,6 +2016,7 @@ async def _get_remote_auth_chain_for_event(

await self._auth_and_persist_outliers(room_id, remote_auth_events)

@trace
async def _run_push_actions_and_persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> None:
Expand Down Expand Up @@ -2071,8 +2125,13 @@ async def persist_events_and_notify(
self._message_handler.maybe_schedule_expiry(event)

if not backfilled: # Never notify for backfilled events
for event in events:
await self._notify_persisted_event(event, max_stream_token)
with start_active_span("notify_persisted_events"):
set_tag(
SynapseTags.RESULT_PREFIX + f"event_ids ({len(events)})",
str([ev.event_id for ev in events]),
)
for event in events:
await self._notify_persisted_event(event, max_stream_token)

return max_stream_token.stream

Expand Down
13 changes: 13 additions & 0 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,19 @@ class SynapseTags:
# The name of the external cache
CACHE_NAME = "cache.name"

# Used to tag function arguments
#
# Tag a named arg. The name of the argument should be appended to this
# prefix
FUNC_ARG_PREFIX = "ARG."
# Tag extra variadic number of positional arguments (`def foo(first, second, *extras)`)
FUNC_ARGS = "args"
# Tag keyword args
FUNC_KWARGS = "kwargs"

# Some intermediate result that's interesting to the function
RESULT_PREFIX = "RESULT."


class SynapseBaggage:
FORCE_TRACING = "synapse-force-tracing"
Expand Down
Loading

0 comments on commit aeaa36d

Please sign in to comment.