
Commit

Avoid constant missing prev_event fetching while backfilling
Persist the backfilled event response from oldest -> newest to avoid
having to go fetch missing prev_events, which de-outliers every
other event and screws up the stream_ordering. Missing prev_events
aren't fetched as "backfilled", so their stream_ordering was incrementing.

This helps us in MSC2716 land, where we can more easily copy a similar
stream_ordering to the one the originating homeserver has.
MadLittleMods committed Oct 19, 2021
1 parent 438e222 commit 4983739
Showing 6 changed files with 55 additions and 10 deletions.
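
The core idea from the commit message, in a runnable form: the /backfill response lists events newest -> oldest, so walking it in reverse means each event's prev_events from the same chunk are usually already persisted by the time that event is processed, and nothing has to be fetched through the non-backfill path. A minimal, self-contained sketch of that ordering argument, using hypothetical event dicts rather than Synapse's real types:

from typing import Dict, List, Set


def order_for_persistence(backfill_response: List[Dict]) -> List[Dict]:
    """Reverse a newest -> oldest /backfill chunk into oldest -> newest."""
    return list(reversed(backfill_response))


def missing_prev_events(
    chunk_oldest_first: List[Dict], already_persisted: Set[str]
) -> Set[str]:
    """Return the prev_event ids that would still need a separate fetch.

    Each event is assumed to be a dict with "event_id" and "prev_events"
    keys -- an illustrative shape, not Synapse's EventBase.
    """
    persisted = set(already_persisted)
    missing: Set[str] = set()
    for event in chunk_oldest_first:
        for prev_id in event["prev_events"]:
            if prev_id not in persisted:
                missing.add(prev_id)
        persisted.add(event["event_id"])
    return missing


# With a chunk C -> B -> A returned newest-first, persisting oldest-first
# leaves only the prev_event at the chunk boundary unresolved:
chunk = [
    {"event_id": "$C", "prev_events": ["$B"]},
    {"event_id": "$B", "prev_events": ["$A"]},
    {"event_id": "$A", "prev_events": ["$boundary"]},
]
print(missing_prev_events(order_for_persistence(chunk), already_persisted=set()))
# -> {'$boundary'}
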
32 changes: 31 additions & 1 deletion synapse/handlers/federation_event.py
@@ -441,7 +441,37 @@ async def backfill(
f"room {ev.room_id}, when we were backfilling in {room_id}"
)

await self._process_pulled_events(dest, events, backfilled=True)
await self._process_pulled_events(
dest,
# The /backfill response should start from `?v` and include the
# events that preceded it (so the list will be newest -> oldest). We
# reverse that order so the messages are oldest -> newest and we can
# persist the backfilled events without constantly having to go fetch
# missing prev_events which are probably included in the same
# backfill chunk.
reversed(events),
backfilled=True,
)

for ev in events:
event_after_persisted = await self._store.get_event(
ev.event_id, allow_none=True
)

if event_after_persisted:
logger.info(
"from remote server: processed backfilled event_id=%s type=%s depth=%s stream_ordering=%s content=%s",
ev.event_id,
event_after_persisted["type"],
event_after_persisted["depth"],
event_after_persisted.internal_metadata.stream_ordering,
event_after_persisted["content"].get("body", None),
)
else:
logger.info(
"from remote server: processed backfilled event_id=%s failed to lookup",
ev.event_id,
)

async def _get_missing_events_for_pdu(
self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
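
The per-event read-back loop added above is a debugging aid: after the chunk is persisted, each event is fetched back from the store so its final depth and stream_ordering can be logged. A generic sketch of that pattern, with a hypothetical store interface rather than Synapse's actual DataStore API:

import logging
from typing import Iterable, Optional, Protocol

logger = logging.getLogger(__name__)


class EventStore(Protocol):
    """Hypothetical read-back interface standing in for the real store."""

    async def get_event(
        self, event_id: str, allow_none: bool = False
    ) -> Optional[dict]:
        ...


async def log_persisted_orderings(store: EventStore, event_ids: Iterable[str]) -> None:
    """Debugging aid: fetch each event back after persisting and log the
    stream_ordering it actually ended up with."""
    for event_id in event_ids:
        event = await store.get_event(event_id, allow_none=True)
        if event is None:
            logger.info("event_id=%s was not persisted", event_id)
            continue
        logger.info(
            "event_id=%s depth=%s stream_ordering=%s",
            event_id,
            event.get("depth"),
            event.get("stream_ordering"),
        )
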
2 changes: 2 additions & 0 deletions synapse/rest/client/room.py
@@ -561,6 +561,7 @@ async def on_GET(
pagination_config = await PaginationConfig.from_request(
self.store, request, default_limit=10
)
logger.info("/messages rest start pagination_config=%s", pagination_config)
# Twisted will have processed the args by now.
assert request.args is not None
as_client_event = b"raw" not in request.args
@@ -585,6 +586,7 @@ async def on_GET(
event_filter=event_filter,
)

logger.info("/messages rest end msgs=%s", msgs)
return 200, msgs


16 changes: 7 additions & 9 deletions synapse/storage/databases/main/event_federation.py
@@ -1141,13 +1141,14 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
connected_insertion_event_depth = row[0]
connected_insertion_event_stream_ordering = row[1]
connected_insertion_event = row[2]
queue.put(
(
-connected_insertion_event_depth,
-connected_insertion_event_stream_ordering,
connected_insertion_event,
if connected_insertion_event not in event_results:
queue.put(
(
-connected_insertion_event_depth,
-connected_insertion_event_stream_ordering,
connected_insertion_event,
)
)
)

# Find any batch connections for the given insertion event
txn.execute(
@@ -1169,9 +1170,6 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
)

# TODO: Find out why stream_ordering is all out of order compared to
# when we persisted the events

# TODO: We should probably skip adding the event itself if we
# branched off onto the insertion event first above. Need to make this a
# bit smart so it doesn't skip over the event altogether if we're at
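
The change to _get_backfill_events above guards the queue.put with a membership check, so an insertion event already collected in event_results is not enqueued again. A minimal sketch of the same check-before-enqueue pattern over a (-depth, -stream_ordering, event_id) priority queue, using heapq and generic names rather than Synapse's actual helpers:

import heapq
from typing import Callable, List, Set, Tuple

Entry = Tuple[int, int, str]  # (-depth, -stream_ordering, event_id)


def walk_backwards(start: Entry, neighbours: Callable[[str], List[Entry]]) -> List[str]:
    """Pop events deepest-first, skipping anything already collected.

    `neighbours(event_id)` is a hypothetical callback returning the
    (-depth, -stream_ordering, prev_event_id) entries to explore next.
    """
    queue: List[Entry] = [start]
    seen: Set[str] = set()
    results: List[str] = []
    while queue:
        _, _, event_id = heapq.heappop(queue)
        if event_id in seen:
            continue
        seen.add(event_id)
        results.append(event_id)
        for entry in neighbours(event_id):
            # Mirror the diff above: only enqueue events we have not already
            # collected, so the queue does not grow with duplicates.
            if entry[2] not in seen:
                heapq.heappush(queue, entry)
    return results
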
8 changes: 8 additions & 0 deletions synapse/storage/databases/main/events.py
@@ -169,6 +169,14 @@ async def _persist_events_and_state_updates(

async with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
logger.info(
"_persist_events_and_state_updates backfilled=%s event_id=%s depth=%s stream_ordering=%s content=%s",
backfilled,
event.event_id,
event.depth,
stream,
event["content"].get("body", None),
)
event.internal_metadata.stream_ordering = stream

await self.db_pool.runInteraction(
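
For context on the stream_ordering values logged above: backfilled events draw their orderings from a separate, decrementing (negative) counter, while normal persistence draws increasing positive ones, which is why a prev_event that slips through the non-backfill path ends up with an incrementing ordering, as the commit message notes. A toy model of that allocation (illustrative only, not Synapse's actual id generators):

import itertools
from typing import Iterator, List, Tuple


def allocate_stream_orderings(
    event_ids: List[str],
    backfilled: bool,
    forward: Iterator[int],
    backward: Iterator[int],
) -> List[Tuple[str, int]]:
    """Pair each event with the next id from the appropriate counter."""
    gen = backward if backfilled else forward
    return [(event_id, next(gen)) for event_id in event_ids]


forward_ids = itertools.count(1)        # 1, 2, 3, ...   normal persistence
backward_ids = itertools.count(-1, -1)  # -1, -2, -3, ... backfill persistence

print(allocate_stream_orderings(["$a", "$b"], True, forward_ids, backward_ids))
# -> [('$a', -1), ('$b', -2)]
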
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/stream.py
@@ -1166,6 +1166,7 @@ def _paginate_room_events_txn(
"order": order,
}

logger.info("stream: getting events sql=%s args=%s", sql, args)
txn.execute(sql, args)

# Filter the result set.
@@ -1236,6 +1237,7 @@ async def paginate_room_events(
event_filter,
)

logger.info("paginate_room_events event_ids(%d)=%s", len(rows), [r.event_id for r in rows])
events = await self.get_events_as_list(
[r.event_id for r in rows], get_prev_content=True
)
5 changes: 5 additions & 0 deletions synapse/storage/persist_events.py
@@ -309,6 +309,11 @@ async def persist_events(
matched the transaction ID; the existing event is returned in such
a case.
"""
# logger.info(
# "persist_events backfilled=%s events_and_contexts=%s",
# backfilled,
# events_and_contexts,
# )
partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
for event, ctx in events_and_contexts:
partitioned.setdefault(event.room_id, []).append((event, ctx))
