Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Update get_pdu to return the original, pristine EventBase (#13320)
Browse files Browse the repository at this point in the history
Update `get_pdu` to return the untouched, pristine `EventBase` as it was originally seen over federation (no metadata added). Previously, we returned the same `event` reference that we stored in the cache, which downstream code modified in place (adding metadata such as marking it as an `outlier`), essentially poisoning our cache. Now we always return a copy of the `event` so the original can stay pristine in our cache and be re-used for the next cache call.

Split out from #13205

As discussed at:

 - #13205 (comment)
 - #13205 (comment)

Related to #12584. This PR doesn't fix that issue because that code path hits [`get_event`, which reads from the local database, before it tries to `get_pdu`](https://github.com/matrix-org/synapse/blob/7864f33e286dec22368dc0b11c06eebb1462a51e/synapse/federation/federation_client.py#L581-L594).
  • Loading branch information
MadLittleMods authored Jul 20, 2022
1 parent a1b62af commit 0f971ca
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 61 deletions.
1 change: 1 addition & 0 deletions changelog.d/13320.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix `FederationClient.get_pdu()` returning events from the cache as `outliers` instead of original events we saw over federation.
123 changes: 81 additions & 42 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
RoomVersion,
RoomVersions,
)
from synapse.events import EventBase, builder
from synapse.events import EventBase, builder, make_event_from_dict
from synapse.federation.federation_base import (
FederationBase,
InvalidEventSignatureError,
Expand Down Expand Up @@ -299,7 +299,8 @@ async def get_pdu_from_destination_raw(
moving to the next destination. None indicates no timeout.
Returns:
The requested PDU, or None if we were unable to find it.
A copy of the requested PDU that is safe to modify, or None if we
were unable to find it.
Raises:
SynapseError, NotRetryingDestination, FederationDeniedError
Expand All @@ -309,7 +310,7 @@ async def get_pdu_from_destination_raw(
)

logger.debug(
"retrieved event id %s from %s: %r",
"get_pdu_from_destination_raw: retrieved event id %s from %s: %r",
event_id,
destination,
transaction_data,
Expand Down Expand Up @@ -358,54 +359,92 @@ async def get_pdu(
The requested PDU, or None if we were unable to find it.
"""

# TODO: Rate limit the number of times we try and get the same event.
logger.debug(
"get_pdu: event_id=%s from destinations=%s", event_id, destinations
)

ev = self._get_pdu_cache.get(event_id)
if ev:
return ev
# TODO: Rate limit the number of times we try and get the same event.

pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
# We might need the same event multiple times in quick succession (before
# it gets persisted to the database), so we cache the results of the lookup.
# Note that this is separate to the regular get_event cache which caches
# events once they have been persisted.
event = self._get_pdu_cache.get(event_id)

# If we don't see the event in the cache, go try to fetch it from the
# provided remote federated destinations
if not event:
pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})

for destination in destinations:
now = self._clock.time_msec()
last_attempt = pdu_attempts.get(destination, 0)
if last_attempt + PDU_RETRY_TIME_MS > now:
logger.debug(
"get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)",
destination,
last_attempt,
PDU_RETRY_TIME_MS,
now,
)
continue

try:
event = await self.get_pdu_from_destination_raw(
destination=destination,
event_id=event_id,
room_version=room_version,
timeout=timeout,
)

signed_pdu = None
for destination in destinations:
now = self._clock.time_msec()
last_attempt = pdu_attempts.get(destination, 0)
if last_attempt + PDU_RETRY_TIME_MS > now:
continue
pdu_attempts[destination] = now

try:
signed_pdu = await self.get_pdu_from_destination_raw(
destination=destination,
event_id=event_id,
room_version=room_version,
timeout=timeout,
)
if event:
# Prime the cache
self._get_pdu_cache[event.event_id] = event

pdu_attempts[destination] = now
# FIXME: We should add a `break` here to avoid calling every
# destination after we already found a PDU (will follow-up
# in a separate PR)

except SynapseError as e:
logger.info(
"Failed to get PDU %s from %s because %s", event_id, destination, e
)
continue
except NotRetryingDestination as e:
logger.info(str(e))
continue
except FederationDeniedError as e:
logger.info(str(e))
continue
except Exception as e:
pdu_attempts[destination] = now
except SynapseError as e:
logger.info(
"Failed to get PDU %s from %s because %s",
event_id,
destination,
e,
)
continue
except NotRetryingDestination as e:
logger.info(str(e))
continue
except FederationDeniedError as e:
logger.info(str(e))
continue
except Exception as e:
pdu_attempts[destination] = now

logger.info(
"Failed to get PDU %s from %s because %s",
event_id,
destination,
e,
)
continue

logger.info(
"Failed to get PDU %s from %s because %s", event_id, destination, e
)
continue
if not event:
return None

if signed_pdu:
self._get_pdu_cache[event_id] = signed_pdu
# `event` now refers to an object stored in `get_pdu_cache`. Our
# callers may need to modify the returned object (eg to set
# `event.internal_metadata.outlier = true`), so we return a copy
# rather than the original object.
event_copy = make_event_from_dict(
event.get_pdu_json(),
event.room_version,
)

return signed_pdu
return event_copy

async def get_room_state_ids(
self, destination: str, room_id: str, event_id: str
Expand Down
22 changes: 18 additions & 4 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,10 +766,24 @@ async def _process_pulled_event(
"""
logger.info("Processing pulled event %s", event)

# these should not be outliers.
assert (
not event.internal_metadata.is_outlier()
), "pulled event unexpectedly flagged as outlier"
# This function should not be used to persist outliers (use something
# else) because this does a bunch of operations that aren't necessary
# (extra work; in particular, it makes sure we have all the prev_events
# and resolves the state across those prev events). If you happen to run
# into a situation where the event you're trying to process/backfill is
# marked as an `outlier`, then you should update that spot to return an
# `EventBase` copy that doesn't have `outlier` flag set.
#
# `EventBase` is used to represent both an event we have not yet
# persisted, and one that we have persisted and now keep in the cache.
# In an ideal world this method would only be called with the first type
# of event, but it turns out that's not actually the case and for
# example, you could get an event from cache that is marked as an
# `outlier` (fix up that spot though).
assert not event.internal_metadata.is_outlier(), (
"Outlier event passed to _process_pulled_event. "
"To persist an event as a non-outlier, make sure to pass in a copy without `event.internal_metadata.outlier = true`."
)

event_id = event.event_id

Expand Down
23 changes: 20 additions & 3 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1346,9 +1346,24 @@ def _update_outliers_txn(
event_id: outlier for event_id, outlier in txn
}

logger.debug(
"_update_outliers_txn: events=%s have_persisted=%s",
[ev.event_id for ev, _ in events_and_contexts],
have_persisted,
)

to_remove = set()
for event, context in events_and_contexts:
if event.event_id not in have_persisted:
outlier_persisted = have_persisted.get(event.event_id)
logger.debug(
"_update_outliers_txn: event=%s outlier=%s outlier_persisted=%s",
event.event_id,
event.internal_metadata.is_outlier(),
outlier_persisted,
)

# Ignore events which we haven't persisted at all
if outlier_persisted is None:
continue

to_remove.add(event)
Expand All @@ -1358,7 +1373,6 @@ def _update_outliers_txn(
# was an outlier or not - what we have is at least as good.
continue

outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted:
# We received a copy of an event that we had already stored as
# an outlier in the database. We now have some state at that event
Expand All @@ -1369,7 +1383,10 @@ def _update_outliers_txn(
# events down /sync. In general they will be historical events, so that
# doesn't matter too much, but that is not always the case.

logger.info("Updating state for ex-outlier event %s", event.event_id)
logger.info(
"_update_outliers_txn: Updating state for ex-outlier event %s",
event.event_id,
)

# insert into event_to_state_groups.
try:
Expand Down
Loading

0 comments on commit 0f971ca

Please sign in to comment.