Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Faster joins: omit partial rooms from eager syncs until the resync completes #14870

Merged
merged 22 commits into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14870.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Faster joins: allow non-lazy-loading ("eager") syncs to complete after a partial join by omitting partial state rooms until they become fully stated.
11 changes: 6 additions & 5 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1726,15 +1726,16 @@ async def _sync_partial_state_room(
await self._device_handler.handle_room_un_partial_stated(room_id)

logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)
if success:
new_stream_id = await self.store.clear_partial_state_room(room_id)
if new_stream_id is not None:
logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated(
room_id
)
# Poke the notifier so that other workers see the write to
# the un-partial-stated rooms stream.
self._notifier.notify_replication()

await self._notifier.on_un_partial_stated_room(
room_id, new_stream_id
)

# TODO(faster_joins) update room stats and user directory?
# https://github.com/matrix-org/synapse/issues/12814
Expand Down
65 changes: 55 additions & 10 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ def __init__(self, hs: "HomeServer"):
expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
)

self.rooms_to_exclude = hs.config.server.rooms_to_exclude_from_sync
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync

async def wait_for_sync_for_user(
self,
Expand Down Expand Up @@ -1340,7 +1340,10 @@ async def generate_sync_result(
membership_change_events = []
if since_token:
membership_change_events = await self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
user_id,
since_token.room_key,
now_token.room_key,
self.rooms_to_exclude_globally,
)

mem_last_change_by_room_id: Dict[str, EventBase] = {}
Expand Down Expand Up @@ -1375,12 +1378,39 @@ async def generate_sync_result(
else:
mutable_joined_room_ids.discard(room_id)

# Tweak the set of rooms to return to the client for eager (non-lazy) syncs.
mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally)
if not sync_config.filter_collection.lazy_load_members():
# Non-lazy syncs should never include partially stated rooms.
# Exclude all partially stated rooms from this sync.
for room_id in mutable_joined_room_ids:
if await self.store.is_partial_state_room(room_id):
mutable_rooms_to_exclude.add(room_id)

# Incremental eager syncs should additionally include rooms that
# - we are joined to
# - are fully-stated
# - became fully-stated at some point during the sync period
# (These rooms will have been omitted during a previous eager sync.)
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
forced_newly_joined_room_ids = set()
if since_token and not sync_config.filter_collection.lazy_load_members():
un_partial_stated_rooms = (
await self.store.get_un_partial_stated_rooms_between(
since_token.un_partial_stated_rooms_key,
now_token.un_partial_stated_rooms_key,
mutable_joined_room_ids,
)
)
for room_id in un_partial_stated_rooms:
if not await self.store.is_partial_state_room(room_id):
forced_newly_joined_room_ids.add(room_id)

# Now we have our list of joined room IDs, exclude as configured and freeze
joined_room_ids = frozenset(
(
room_id
for room_id in mutable_joined_room_ids
if room_id not in self.rooms_to_exclude
if room_id not in mutable_rooms_to_exclude
)
)

Expand All @@ -1397,6 +1427,8 @@ async def generate_sync_result(
since_token=since_token,
now_token=now_token,
joined_room_ids=joined_room_ids,
excluded_room_ids=frozenset(mutable_rooms_to_exclude),
forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids),
membership_change_events=membership_change_events,
)

Expand Down Expand Up @@ -1834,14 +1866,16 @@ async def _generate_sync_entry_for_rooms(
# 3. Work out which rooms need reporting in the sync response.
ignored_users = await self.store.ignored_users(user_id)
if since_token:
room_changes = await self._get_rooms_changed(
room_changes = await self._get_room_changes_for_incremental_sync(
sync_result_builder, ignored_users
)
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
)
else:
room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
room_changes = await self._get_room_changes_for_initial_sync(
sync_result_builder, ignored_users
)
tags_by_room = await self.store.get_tags_for_user(user_id)

log_kv({"rooms_changed": len(room_changes.room_entries)})
Expand Down Expand Up @@ -1900,7 +1934,7 @@ async def _have_rooms_changed(

assert since_token

if membership_change_events:
if membership_change_events or sync_result_builder.forced_newly_joined_room_ids:
return True

stream_id = since_token.room_key.stream
Expand All @@ -1909,7 +1943,7 @@ async def _have_rooms_changed(
return True
return False

async def _get_rooms_changed(
async def _get_room_changes_for_incremental_sync(
self,
sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str],
Expand Down Expand Up @@ -1947,7 +1981,9 @@ async def _get_rooms_changed(
for event in membership_change_events:
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)

newly_joined_rooms: List[str] = []
newly_joined_rooms: List[str] = list(
sync_result_builder.forced_newly_joined_room_ids
)
newly_left_rooms: List[str] = []
room_entries: List[RoomSyncResultBuilder] = []
invited: List[InvitedSyncResult] = []
Expand Down Expand Up @@ -2153,7 +2189,7 @@ async def _get_rooms_changed(
newly_left_rooms,
)

async def _get_all_rooms(
async def _get_room_changes_for_initial_sync(
self,
sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str],
Expand All @@ -2178,7 +2214,7 @@ async def _get_all_rooms(
room_list = await self.store.get_rooms_for_local_user_where_membership_is(
user_id=user_id,
membership_list=Membership.LIST,
excluded_rooms=self.rooms_to_exclude,
excluded_rooms=sync_result_builder.excluded_room_ids,
)

room_entries = []
Expand Down Expand Up @@ -2549,6 +2585,13 @@ class SyncResultBuilder:
since_token: The token supplied by user, or None.
now_token: The token to sync up to.
joined_room_ids: List of rooms the user is joined to
excluded_room_ids: Set of room ids we should omit from the /sync response.
forced_newly_joined_room_ids:
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
Rooms that should be presented in the /sync response as if they were
newly joined during the sync period, even if that's not the case.
(This is useful if the room was previously excluded from a /sync response,
and now the client should be made aware of it.)
Only used by incremental syncs.

# The following mirror the fields in a sync response
presence
Expand All @@ -2565,6 +2608,8 @@ class SyncResultBuilder:
since_token: Optional[StreamToken]
now_token: StreamToken
joined_room_ids: FrozenSet[str]
excluded_room_ids: FrozenSet[str]
forced_newly_joined_room_ids: FrozenSet[str]
membership_change_events: List[EventBase]

presence: List[UserPresenceState] = attr.Factory(list)
Expand Down
26 changes: 26 additions & 0 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,32 @@ async def on_new_room_events(
event_entries.append((entry, event.event_id))
await self.notify_new_room_events(event_entries, max_room_stream_token)

async def on_un_partial_stated_room(
    self,
    room_id: str,
    new_token: int,
) -> None:
    """Wake up everything waiting on a room that has just been un-partial-stated.

    Called by the resync background process once a room's partial-state flag
    has been cleared. Every user stream registered against the room is
    notified of the new position on the un-partial-stated rooms stream, and
    replication is then poked.

    Args:
        room_id: The room that has just become fully stated.
        new_token: The new position of the un-partial-stated rooms stream,
            passed on to each user stream.
    """

    # Rooms with no registered listeners simply yield an empty set.
    listeners = self.room_to_user_streams.get(room_id, set())
    now_ms = self.clock.time_msec()

    for listener in listeners:
        try:
            listener.notify(StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, now_ms)
        except Exception:
            # Best effort: one misbehaving listener must not prevent the
            # remaining listeners (or replication) from being notified.
            logger.exception("Failed to notify listener")

    # Let other workers see the write to the un-partial-stated rooms stream.
    self.notify_replication()
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

async def notify_new_room_events(
self,
event_entries: List[Tuple[_PendingRoomEventEntry, str]],
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ def _get_recent_references_for_event_txn(
to_device_key=0,
device_list_key=0,
groups_key=0,
un_partial_stated_rooms_key=0,
)

return events[:limit], next_token
Expand Down
47 changes: 41 additions & 6 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
Expand Down Expand Up @@ -1285,10 +1286,44 @@ def get_un_partial_stated_rooms_token(self) -> int:
# explanation.)
return self._un_partial_stated_rooms_stream_id_gen.get_current_token()

async def get_un_partial_stated_rooms_between(
    self, last_id: int, current_id: int, room_ids: Collection[str]
) -> Set[str]:
    """Get all rooms that got un partial stated between `last_id` exclusive and
    `current_id` inclusive.

    Args:
        last_id: Lower bound of the stream id range (exclusive).
        current_id: Upper bound of the stream id range (inclusive).
        room_ids: The rooms of interest; only rows for these rooms are
            considered.

    Returns:
        The set of room ids, among `room_ids`, that have a row in
        `un_partial_stated_room_stream` within the given range.
    """

    # An empty stream range or an empty set of candidate rooms cannot match
    # any rows, so skip the database round trip entirely.
    if last_id == current_id or not room_ids:
        return set()

    def _get_un_partial_stated_rooms_between_txn(
        txn: LoggingTransaction,
    ) -> Set[str]:
        sql = """
            SELECT DISTINCT room_id FROM un_partial_stated_room_stream
            WHERE ? < stream_id AND stream_id <= ? AND
        """

        # The helper supplies the room filter that completes the trailing
        # `AND`, along with the bind parameters for it.
        clause, args = make_in_list_sql_clause(
            self.database_engine, "room_id", room_ids
        )

        txn.execute(sql + clause, [last_id, current_id] + list(args))

        return {r[0] for r in txn}

    return await self.db_pool.runInteraction(
        "get_un_partial_stated_rooms_between",
        _get_un_partial_stated_rooms_between_txn,
    )

async def get_un_partial_stated_rooms_from_stream(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
"""Get updates for caches replication stream.
"""Get updates for un partial stated rooms replication stream.

Args:
instance_name: The writer we want to fetch updates from. Unused
Expand Down Expand Up @@ -2295,16 +2330,16 @@ async def unblock_room(self, room_id: str) -> None:
(room_id,),
)

async def clear_partial_state_room(self, room_id: str) -> bool:
async def clear_partial_state_room(self, room_id: str) -> Optional[int]:
"""Clears the partial state flag for a room.

Args:
room_id: The room whose partial state flag is to be cleared.

Returns:
`True` if the partial state flag has been cleared successfully.
The corresponding stream id for the un-partial-stated rooms stream.

`False` if the partial state flag could not be cleared because the room
`None` if the partial state flag could not be cleared because the room
still contains events with partial state.
"""
try:
Expand All @@ -2315,15 +2350,15 @@ async def clear_partial_state_room(self, room_id: str) -> bool:
room_id,
un_partial_state_room_stream_id,
)
return True
return un_partial_state_room_stream_id
except self.db_pool.engine.module.IntegrityError as e:
# Assume that any `IntegrityError`s are due to partial state events.
logger.info(
"Exception while clearing lazy partial-state-room %s, retrying: %s",
room_id,
e,
)
return False
return None

def _clear_partial_state_room_txn(
self,
Expand Down
19 changes: 14 additions & 5 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
from typing import (
TYPE_CHECKING,
AbstractSet,
Collection,
Dict,
FrozenSet,
Expand Down Expand Up @@ -47,7 +48,13 @@
ProfileInfo,
RoomsForUser,
)
from synapse.types import JsonDict, PersistedEventPosition, StateMap, get_domain_from_id
from synapse.types import (
JsonDict,
PersistedEventPosition,
StateMap,
StrCollection,
get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
Expand Down Expand Up @@ -385,7 +392,7 @@ async def get_rooms_for_local_user_where_membership_is(
self,
user_id: str,
membership_list: Collection[str],
excluded_rooms: Optional[List[str]] = None,
excluded_rooms: StrCollection = (),
) -> List[RoomsForUser]:
"""Get all the rooms for this *local* user where the membership for this user
matches one in the membership list.
Expand All @@ -412,10 +419,12 @@ async def get_rooms_for_local_user_where_membership_is(
)

# Now we filter out forgotten and excluded rooms
rooms_to_exclude: Set[str] = await self.get_forgotten_rooms_for_user(user_id)
rooms_to_exclude = await self.get_forgotten_rooms_for_user(user_id)

if excluded_rooms is not None:
rooms_to_exclude.update(set(excluded_rooms))
# Take a copy to avoid mutating the in-cache set
rooms_to_exclude = set(rooms_to_exclude)
rooms_to_exclude.update(excluded_rooms)

return [room for room in rooms if room.room_id not in rooms_to_exclude]

Expand Down Expand Up @@ -1169,7 +1178,7 @@ def f(txn: LoggingTransaction) -> int:
return count == 0

@cached()
async def get_forgotten_rooms_for_user(self, user_id: str) -> Set[str]:
async def get_forgotten_rooms_for_user(self, user_id: str) -> AbstractSet[str]:
"""Gets all rooms the user has forgotten.

Args:
Expand Down
3 changes: 3 additions & 0 deletions synapse/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def get_current_token(self) -> StreamToken:
push_rules_key = self.store.get_max_push_rules_stream_id()
to_device_key = self.store.get_to_device_stream_token()
device_list_key = self.store.get_device_stream_token()
un_partial_stated_rooms_key = self.store.get_un_partial_stated_rooms_token()

token = StreamToken(
room_key=self.sources.room.get_current_key(),
Expand All @@ -70,6 +71,7 @@ def get_current_token(self) -> StreamToken:
device_list_key=device_list_key,
# Groups key is unused.
groups_key=0,
un_partial_stated_rooms_key=un_partial_stated_rooms_key,
)
return token

Expand Down Expand Up @@ -107,5 +109,6 @@ async def get_current_token_for_pagination(self, room_id: str) -> StreamToken:
to_device_key=0,
device_list_key=0,
groups_key=0,
un_partial_stated_rooms_key=0,
)
return token
Loading