diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 5e068cee097a..80b314ec148e 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -441,7 +441,37 @@ async def backfill(
                     f"room {ev.room_id}, when we were backfilling in {room_id}"
                 )
 
-        await self._process_pulled_events(dest, events, backfilled=True)
+        await self._process_pulled_events(
+            dest,
+            # The /backfill response should start from `?v` and include the
+            # events that preceded it (so the list will be newest -> oldest). We
+            # reverse that order so the messages are oldest -> newest and we can
+            # persist the backfilled events without constantly having to go fetch
+            # missing prev_events which are probably included in the same
+            # backfill chunk.
+            reversed(events),
+            backfilled=True,
+        )
+
+        for ev in events:
+            event_after_persisted = await self._store.get_event(
+                ev.event_id, allow_none=True
+            )
+
+            if event_after_persisted:
+                logger.info(
+                    "from remote server: processed backfilled event_id=%s type=%s depth=%s stream_ordering=%s content=%s",
+                    ev.event_id,
+                    event_after_persisted["type"],
+                    event_after_persisted["depth"],
+                    event_after_persisted.internal_metadata.stream_ordering,
+                    event_after_persisted["content"].get("body", None),
+                )
+            else:
+                logger.info(
+                    "from remote server: processed backfilled event_id=%s failed to lookup",
+                    ev.event_id,
+                )
 
     async def _get_missing_events_for_pdu(
         self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index ed95189b6d8b..43cfb46c068b 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -561,6 +561,7 @@ async def on_GET(
         pagination_config = await PaginationConfig.from_request(
             self.store, request, default_limit=10
         )
+        logger.info("/messages rest start pagination_config=%s", pagination_config)
         # Twisted will have processed the args by now.
         assert request.args is not None
         as_client_event = b"raw" not in request.args
@@ -585,6 +586,7 @@ async def on_GET(
             event_filter=event_filter,
         )
 
+        logger.info("/messages rest end msgs=%s", msgs)
         return 200, msgs
 
 
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index b9c48eea5631..3d20bb884566 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1141,13 +1141,14 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
                 connected_insertion_event_depth = row[0]
                 connected_insertion_event_stream_ordering = row[1]
                 connected_insertion_event = row[2]
-                queue.put(
-                    (
-                        -connected_insertion_event_depth,
-                        -connected_insertion_event_stream_ordering,
-                        connected_insertion_event,
+                if connected_insertion_event not in event_results:
+                    queue.put(
+                        (
+                            -connected_insertion_event_depth,
+                            -connected_insertion_event_stream_ordering,
+                            connected_insertion_event,
+                        )
                     )
-                )
 
                 # Find any batch connections for the given insertion event
                 txn.execute(
@@ -1169,9 +1170,6 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
                 "_get_backfill_events: prev_event_ids %s", prev_event_id_results
             )
 
-            # TODO: Find out why stream_ordering is all out of order compared to
-            # when we persisted the events
-
             # TODO: We should probably skip adding the event itself if we
             # branched off onto the insertion event first above. Need to make this a
             # bit smart so it doesn't skip over the event altogether if we're at
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 37439f85628e..56d265213290 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -169,6 +169,14 @@ async def _persist_events_and_state_updates(
 
         async with stream_ordering_manager as stream_orderings:
             for (event, _), stream in zip(events_and_contexts, stream_orderings):
+                logger.info(
+                    "_persist_events_and_state_updates backfilled=%s event_id=%s depth=%s stream_ordering=%s content=%s",
+                    backfilled,
+                    event.event_id,
+                    event.depth,
+                    stream,
+                    event["content"].get("body", None),
+                )
                 event.internal_metadata.stream_ordering = stream
 
             await self.db_pool.runInteraction(
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index dc7884b1c0c3..d15c38d4b783 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1166,6 +1166,7 @@ def _paginate_room_events_txn(
             "order": order,
         }
 
+        logger.info("stream: getting events sql=%s args=%s", sql, args)
         txn.execute(sql, args)
 
         # Filter the result set.
@@ -1236,6 +1237,7 @@ async def paginate_room_events(
             event_filter,
         )
 
+        logger.info("paginate_room_events event_ids(%d)=%s", len(rows), [r.event_id for r in rows])
         events = await self.get_events_as_list(
             [r.event_id for r in rows], get_prev_content=True
         )
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 0e8270746d78..58d1c08906b5 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -309,6 +309,11 @@ async def persist_events(
             matched the transcation ID; the existing event is returned in such a
             case.
         """
+        # logger.info(
+        #     "persist_events backfilled=%s events_and_contexts=%s",
+        #     backfilled,
+        #     events_and_contexts,
+        # )
        partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
         for event, ctx in events_and_contexts:
             partitioned.setdefault(event.room_id, []).append((event, ctx))
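Side note (not part of the patch): a minimal sketch of why the chunk handed to _process_pulled_events is reversed before processing. The event IDs, the prev_event map, and the in-memory persisted set below are made up for illustration; in Synapse the real work happens in _process_pulled_events and the event store.

    # /backfill lists events newest -> oldest, so processing them in the order
    # received would keep hitting prev_events that are not persisted yet and
    # trigger extra "missing event" fetches from the remote server.
    chunk = ["$ev3", "$ev2", "$ev1"]  # newest -> oldest, as returned by /backfill
    prev_event = {"$ev1": "$ev0", "$ev2": "$ev1", "$ev3": "$ev2"}
    persisted = {"$ev0"}  # the event we started backfilling from

    for event_id in reversed(chunk):  # oldest -> newest
        # Each event's prev_event from the same chunk is already persisted,
        # so no extra round trip is needed before persisting it.
        assert prev_event[event_id] in persisted
        persisted.add(event_id)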