Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Mark sync as limited if there is a gap in the timeline #16485

Merged
merged 7 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16485.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix long-standing bug where `/sync` incorrectly did not mark a room as `limited` in a sync requests when there were missing remote events.
21 changes: 19 additions & 2 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,17 @@ async def _load_filtered_recents(
else:
limited = False

# Check if there is a gap, if so we need to mark this as limited and
# recalculate which events to send down.
gap_token = await self.store.get_timeline_gaps(
room_id,
since_token.room_key if since_token else None,
now_token.room_key,
)
if gap_token:
potential_recents = []
limited = True

log_kv({"limited": limited})

if potential_recents:
Expand Down Expand Up @@ -599,7 +610,11 @@ async def _load_filtered_recents(
end_key = room_key

since_key = None
if since_token and not newly_joined_room:
if since_token and gap_token:
# If there is a gap then we need to only include events after
# it.
Comment on lines +632 to +633
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# If there is a gap then we need to only include events after
# it.
# If there is a gap then we need to include only events after
# it.

Maybe? I'm not sure why we would do this though?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have a section of the timeline with events A, B, <gap>, X, Y, Z we need to make sure we only send down X, Y, Z and not A and B (which we currently do. If we did, then the client wouldn't try and paginate to fill in the gap.

since_key = gap_token
elif since_token and not newly_joined_room:
since_key = since_token.room_key

while limited and len(recents) < timeline_limit and max_repeat:
Expand Down Expand Up @@ -684,7 +699,9 @@ async def _load_filtered_recents(
return TimelineBatch(
events=recents,
prev_batch=prev_batch_token,
limited=limited or newly_joined_room,
# Also mark as limited if this is a new room or there has been a gap
# (to force client to paginate the gap).
limited=limited or newly_joined_room or gap_token is not None,
bundled_aggregations=bundled_aggregations,
)

Expand Down
74 changes: 49 additions & 25 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2267,35 +2267,59 @@ def _update_backward_extremeties(

Forward extremities are handled when we first start persisting the events.
"""
# From the events passed in, add all of the prev events as backwards extremities.
# Ignore any events that are already backwards extrems or outliers.
query = (
"INSERT INTO event_backward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
" SELECT 1 FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
" )"
# 1. Don't add an event as a extremity again if we already persisted it
# as a non-outlier.
# 2. Don't add an outlier as an extremity if it has no prev_events
" AND NOT EXISTS ("
" SELECT 1 FROM events"
" LEFT JOIN event_edges edge"
" ON edge.event_id = events.event_id"
" WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = FALSE OR edge.event_id IS NULL)"
" )"

room_id = events[0].room_id

potential_backwards_extremities = {
e_id
for ev in events
for e_id in ev.prev_event_ids()
if not ev.internal_metadata.is_outlier()
}

if not potential_backwards_extremities:
return

existing_events_outliers = self.db_pool.simple_select_many_txn(
txn,
table="events",
column="event_id",
iterable=potential_backwards_extremities,
keyvalues={"outlier": False},
retcols=("event_id",),
)

txn.execute_batch(
query,
[
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id)
for ev in events
for e_id in ev.prev_event_ids()
if not ev.internal_metadata.is_outlier()
],
potential_backwards_extremities.difference_update(
e for e, in existing_events_outliers
)

if potential_backwards_extremities:
self.db_pool.simple_upsert_many_txn(
txn,
table="event_backward_extremities",
key_names=("room_id", "event_id"),
key_values=[(room_id, ev) for ev in potential_backwards_extremities],
value_names=(),
value_values=(),
)

# Record the stream orderings where we have new gaps.
gap_events = [
(room_id, self._instance_name, ev.internal_metadata.stream_ordering)
for ev in events
if any(
e_id in potential_backwards_extremities
for e_id in ev.prev_event_ids()
)
]

self.db_pool.simple_insert_many_txn(
txn,
table="timeline_gaps",
keys=("room_id", "instance_name", "stream_ordering"),
values=gap_events,
)

# Delete all these events that we've already fetched and now know that their
# prev events are the new backwards extremeties.
query = (
Expand Down
45 changes: 45 additions & 0 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1616,3 +1616,48 @@ async def get_name_from_instance_id(self, instance_id: int) -> str:
retcol="instance_name",
desc="get_name_from_instance_id",
)

async def get_timeline_gaps(
self,
room_id: str,
from_token: Optional[RoomStreamToken],
to_token: RoomStreamToken,
) -> Optional[RoomStreamToken]:
"""Check if there is a gap, and return the token."""

sql = """
SELECT instance_name, stream_ordering
FROM timeline_gaps
WHERE room_id = ? AND ? < stream_ordering AND stream_ordering <= ?
ORDER BY stream_ordering
"""

rows = await self.db_pool.execute(
"get_timeline_gaps",
None,
sql,
room_id,
from_token.stream if from_token else 0,
to_token.get_max_stream_pos(),
)

if not rows:
return None

positions = [
PersistedEventPosition(instance_name, stream_ordering)
for instance_name, stream_ordering in rows
]
if from_token:
positions = [p for p in positions if p.persisted_after(from_token)]

positions = [p for p in positions if not p.persisted_after(to_token)]

if positions:
# We return a stream token that ensures the event *at* the position
# of the gap is included (as the gap is *before* the persisted
# event).
last_position = positions[-1]
return RoomStreamToken(stream=last_position.stream - 1)

return None
25 changes: 25 additions & 0 deletions synapse/storage/schema/main/delta/82/05gaps.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Records when we see a "gap in the timeline", due to missing events over
-- federation. We record this so that we can tell clients there is a gap (by
-- marking the timeline section of a sync request as limited).
CREATE TABLE IF NOT EXISTS timeline_gaps (
room_id TEXT NOT NULL,
instance_name TEXT NOT NULL,
stream_ordering BIGINT NOT NULL
);

CREATE INDEX timeline_gaps_room_id ON timeline_gaps(room_id, stream_ordering);