diff --git a/changelog.d/17695.bugfix b/changelog.d/17695.bugfix new file mode 100644 index 00000000000..c63132704f8 --- /dev/null +++ b/changelog.d/17695.bugfix @@ -0,0 +1 @@ +Fix bug where room account data would not correctly be sent down sliding sync for old rooms. diff --git a/synapse/handlers/sliding_sync/extensions.py b/synapse/handlers/sliding_sync/extensions.py index 287f4b04ada..56e1d9329ef 100644 --- a/synapse/handlers/sliding_sync/extensions.py +++ b/synapse/handlers/sliding_sync/extensions.py @@ -19,7 +19,6 @@ AbstractSet, ChainMap, Dict, - List, Mapping, MutableMapping, Optional, @@ -119,6 +118,8 @@ async def get_extensions_response( if sync_config.extensions.account_data is not None: account_data_response = await self.get_account_data_extension_response( sync_config=sync_config, + previous_connection_state=previous_connection_state, + new_connection_state=new_connection_state, actual_lists=actual_lists, actual_room_ids=actual_room_ids, account_data_request=sync_config.extensions.account_data, @@ -361,6 +362,8 @@ async def get_e2ee_extension_response( async def get_account_data_extension_response( self, sync_config: SlidingSyncConfig, + previous_connection_state: "PerConnectionState", + new_connection_state: "MutablePerConnectionState", actual_lists: Mapping[str, SlidingSyncResult.SlidingWindowList], actual_room_ids: Set[str], account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension, @@ -425,15 +428,7 @@ async def get_account_data_extension_response( # Fetch room account data # - # List of -> Mapping from room_id to mapping of `type` to `content` of room - # account data events. - # - # This is is a list so we can avoid making copies of immutable data and instead - # just provide multiple maps that need to be combined. 
Normally, we could - # reach for `ChainMap` in this scenario, but this is a nested map and accessing - # the ChainMap by room_id won't combine the two maps for that room (we would - # need a new `NestedChainMap` type class). - account_data_by_room_maps: List[Mapping[str, Mapping[str, JsonMapping]]] = [] + account_data_by_room_map: MutableMapping[str, Mapping[str, JsonMapping]] = {} relevant_room_ids = self.find_relevant_room_ids_for_extension( requested_lists=account_data_request.lists, requested_room_ids=account_data_request.rooms, @@ -441,9 +436,43 @@ async def get_account_data_extension_response( actual_room_ids=actual_room_ids, ) if len(relevant_room_ids) > 0: + # We need to handle the different cases depending on if we have sent + # down account data previously or not, so we split the relevant + # rooms up into different collections based on status. + live_rooms = set() + previously_rooms: Dict[str, int] = {} + initial_rooms = set() + + for room_id in relevant_room_ids: + if not from_token: + initial_rooms.add(room_id) + continue + + room_status = previous_connection_state.account_data.have_sent_room( + room_id + ) + if room_status.status == HaveSentRoomFlag.LIVE: + live_rooms.add(room_id) + elif room_status.status == HaveSentRoomFlag.PREVIOUSLY: + assert room_status.last_token is not None + previously_rooms[room_id] = room_status.last_token + elif room_status.status == HaveSentRoomFlag.NEVER: + initial_rooms.add(room_id) + else: + assert_never(room_status.status) + + # We fetch all room account data since the from_token. This is so + # that we can record which rooms have updates that haven't been sent + # down. + # + # Mapping from room_id to mapping of `type` to `content` of room account + # data events. 
+ all_updates_since_the_from_token: Mapping[ + str, Mapping[str, JsonMapping] + ] = {} if from_token is not None: # TODO: This should take into account the `from_token` and `to_token` - account_data_by_room_map = ( + all_updates_since_the_from_token = ( await self.store.get_updated_room_account_data_for_user( user_id, from_token.stream_token.account_data_key ) @@ -456,58 +485,108 @@ async def get_account_data_extension_response( user_id, from_token.stream_token.account_data_key ) for room_id, tags in tags_by_room.items(): - account_data_by_room_map.setdefault(room_id, {})[ + all_updates_since_the_from_token.setdefault(room_id, {})[ AccountDataTypes.TAG ] = {"tags": tags} - account_data_by_room_maps.append(account_data_by_room_map) - else: - # TODO: This should take into account the `to_token` - immutable_account_data_by_room_map = ( - await self.store.get_room_account_data_for_user(user_id) - ) - account_data_by_room_maps.append(immutable_account_data_by_room_map) + # For live rooms we just get the updates from `all_updates_since_the_from_token` + if live_rooms: + for room_id in all_updates_since_the_from_token.keys() & live_rooms: + account_data_by_room_map[room_id] = ( + all_updates_since_the_from_token[room_id] + ) - # Add room tags - # - # TODO: This should take into account the `to_token` - tags_by_room = await self.store.get_tags_for_user(user_id) - account_data_by_room_maps.append( - { - room_id: {AccountDataTypes.TAG: {"tags": tags}} - for room_id, tags in tags_by_room.items() - } + # For previously and initial rooms we query each room individually. + if previously_rooms or initial_rooms: + + async def handle_previously(room_id: str) -> None: + # Either get updates or all account data in the room + # depending on if the room state is PREVIOUSLY or NEVER. 
+ previous_token = previously_rooms.get(room_id) + if previous_token is not None: + room_account_data = await ( + self.store.get_updated_room_account_data_for_user_for_room( + user_id=user_id, + room_id=room_id, + from_stream_id=previous_token, + to_stream_id=to_token.account_data_key, + ) + ) + + # Add room tags + changed = await self.store.has_tags_changed_for_room( + user_id=user_id, + room_id=room_id, + from_stream_id=previous_token, + to_stream_id=to_token.account_data_key, + ) + if changed: + # XXX: Ideally, this should take into account the `to_token` + # and return the set of tags at that time but we don't track + # changes to tags so we just have to return all tags for the + # room. + immutable_tag_map = await self.store.get_tags_for_room( + user_id, room_id + ) + room_account_data[AccountDataTypes.TAG] = { + "tags": immutable_tag_map + } + + # Only add an entry if there were any updates. + if room_account_data: + account_data_by_room_map[room_id] = room_account_data + else: + # TODO: This should take into account the `to_token` + immutable_room_account_data = ( + await self.store.get_account_data_for_room(user_id, room_id) + ) + + # Add room tags + # + # XXX: Ideally, this should take into account the `to_token` + # and return the set of tags at that time but we don't track + # changes to tags so we just have to return all tags for the + # room. + immutable_tag_map = await self.store.get_tags_for_room( + user_id, room_id + ) + + account_data_by_room_map[room_id] = ChainMap( + {AccountDataTypes.TAG: {"tags": immutable_tag_map}} + if immutable_tag_map + else {}, + # Cast is safe because `ChainMap` only mutates the top-most map, + # see https://github.com/python/typeshed/issues/8430 + cast( + MutableMapping[str, JsonMapping], + immutable_room_account_data, + ), + ) + + # We handle these rooms concurrently to speed it up. 
+ await concurrently_execute( + handle_previously, + previously_rooms.keys() | initial_rooms, + limit=20, ) - # Filter down to the relevant rooms ... and combine the maps - relevant_account_data_by_room_map: MutableMapping[ - str, Mapping[str, JsonMapping] - ] = {} - for room_id in relevant_room_ids: - # We want to avoid adding empty maps for relevant rooms that have no room - # account data so do a quick check to see if it's in any of the maps. - is_room_in_maps = False - for room_map in account_data_by_room_maps: - if room_id in room_map: - is_room_in_maps = True - break + # Now record which rooms are now up to date, and which rooms have + # pending updates to send. + new_connection_state.account_data.record_sent_rooms(relevant_room_ids) + missing_updates = ( + all_updates_since_the_from_token.keys() - relevant_room_ids + ) + if missing_updates: + # If we have missing updates then we must have had a from_token. + assert from_token is not None - # If we found the room in any of the maps, combine the maps for that room - if is_room_in_maps: - relevant_account_data_by_room_map[room_id] = ChainMap( - {}, - *( - # Cast is safe because `ChainMap` only mutates the top-most map, - # see https://github.com/python/typeshed/issues/8430 - cast(MutableMapping[str, JsonMapping], room_map[room_id]) - for room_map in account_data_by_room_maps - if room_map.get(room_id) - ), + new_connection_state.account_data.record_unsent_rooms( + missing_updates, from_token.stream_token.account_data_key ) return SlidingSyncResult.Extensions.AccountDataExtension( global_account_data_map=global_account_data_map, - account_data_by_room_map=relevant_account_data_by_room_map, + account_data_by_room_map=account_data_by_room_map, ) @trace diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index b30639b4e6b..e583c182bad 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -467,6 
+467,56 @@ def get_updated_room_account_data_for_user_txn( get_updated_room_account_data_for_user_txn, ) + async def get_updated_room_account_data_for_user_for_room( + self, + # Since there are multiple arguments with the same type, force keyword arguments + # so people don't accidentally swap the order + *, + user_id: str, + room_id: str, + from_stream_id: int, + to_stream_id: int, + ) -> Dict[str, JsonMapping]: + """Get the room account_data that's changed for a user in a room. + + (> `from_stream_id` and <= `to_stream_id`) + + Args: + user_id: The user to get the account_data for. + room_id: The room to check + from_stream_id: The point in the stream to fetch from + to_stream_id: The point in the stream to fetch to + + Returns: + A dict of the room account data. + """ + + def get_updated_room_account_data_for_user_for_room_txn( + txn: LoggingTransaction, + ) -> Dict[str, JsonMapping]: + sql = """ + SELECT account_data_type, content FROM room_account_data + WHERE user_id = ? AND room_id = ? AND stream_id > ? AND stream_id <= ? 
+ """ + txn.execute(sql, (user_id, room_id, from_stream_id, to_stream_id)) + + room_account_data: Dict[str, JsonMapping] = {} + for row in txn: + room_account_data[row[0]] = db_to_json(row[1]) + + return room_account_data + + changed = self._account_data_stream_cache.has_entity_changed( + user_id, int(from_stream_id) + ) + if not changed: + return {} + + return await self.db_pool.runInteraction( + "get_updated_room_account_data_for_user_for_room", + get_updated_room_account_data_for_user_for_room_txn, + ) + @cached(max_entries=5000, iterable=True) async def ignored_by(self, user_id: str) -> FrozenSet[str]: """ diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index 83939d10b0a..f2df37fec15 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -267,6 +267,15 @@ def persist_per_connection_state_txn( (have_sent_room.status.value, have_sent_room.last_token) ) + for ( + room_id, + have_sent_room, + ) in per_connection_state.account_data._statuses.items(): + key_values.append((connection_position, "account_data", room_id)) + value_values.append( + (have_sent_room.status.value, have_sent_room.last_token) + ) + self.db_pool.simple_upsert_many_txn( txn, table="sliding_sync_connection_streams", @@ -407,6 +416,7 @@ def _get_and_clear_connection_positions_txn( # Now look up the per-room stream data. rooms: Dict[str, HaveSentRoom[str]] = {} receipts: Dict[str, HaveSentRoom[str]] = {} + account_data: Dict[str, HaveSentRoom[str]] = {} receipt_rows = self.db_pool.simple_select_list_txn( txn, @@ -427,6 +437,8 @@ def _get_and_clear_connection_positions_txn( rooms[room_id] = have_sent_room elif stream == "receipts": receipts[room_id] = have_sent_room + elif stream == "account_data": + account_data[room_id] = have_sent_room else: # For forwards compatibility we ignore unknown streams, as in # future we want to be able to easily add more stream types. 
@@ -435,6 +447,7 @@ def _get_and_clear_connection_positions_txn( return PerConnectionStateDB( rooms=RoomStatusMap(rooms), receipts=RoomStatusMap(receipts), + account_data=RoomStatusMap(account_data), room_configs=room_configs, ) @@ -452,6 +465,7 @@ class PerConnectionStateDB: rooms: "RoomStatusMap[str]" receipts: "RoomStatusMap[str]" + account_data: "RoomStatusMap[str]" room_configs: Mapping[str, "RoomSyncConfig"] @@ -484,10 +498,21 @@ async def from_state( for room_id, status in per_connection_state.receipts.get_updates().items() } + account_data = { + room_id: HaveSentRoom( + status=status.status, + last_token=( + str(status.last_token) if status.last_token is not None else None + ), + ) + for room_id, status in per_connection_state.account_data.get_updates().items() + } + log_kv( { "rooms": rooms, "receipts": receipts, + "account_data": account_data, "room_configs": per_connection_state.room_configs.maps[0], } ) @@ -495,6 +520,7 @@ async def from_state( return PerConnectionStateDB( rooms=RoomStatusMap(rooms), receipts=RoomStatusMap(receipts), + account_data=RoomStatusMap(account_data), room_configs=per_connection_state.room_configs.maps[0], ) @@ -524,8 +550,19 @@ async def to_state(self, store: "DataStore") -> "PerConnectionState": for room_id, status in self.receipts._statuses.items() } + account_data = { + room_id: HaveSentRoom( + status=status.status, + last_token=( + int(status.last_token) if status.last_token is not None else None + ), + ) + for room_id, status in self.account_data._statuses.items() + } + return PerConnectionState( rooms=RoomStatusMap(rooms), receipts=RoomStatusMap(receipts), + account_data=RoomStatusMap(account_data), room_configs=self.room_configs, ) diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py index b498cb96256..44f395f3155 100644 --- a/synapse/storage/databases/main/tags.py +++ b/synapse/storage/databases/main/tags.py @@ -158,6 +158,52 @@ def get_updated_tags_txn(txn: LoggingTransaction) 
-> List[str]: return results + async def has_tags_changed_for_room( + self, + # Since there are multiple arguments with the same type, force keyword arguments + # so people don't accidentally swap the order + *, + user_id: str, + room_id: str, + from_stream_id: int, + to_stream_id: int, + ) -> bool: + """Check if the user's tags for a room have been updated in the token range + + (> `from_stream_id` and <= `to_stream_id`) + + Args: + user_id: The user to get tags for + room_id: The room to get tags for + from_stream_id: The point in the stream to fetch from + to_stream_id: The point in the stream to fetch to + + Returns: + True if the user's tags for the room last changed within the range, otherwise False. + """ + + # Shortcut if no room has changed for the user + changed = self._account_data_stream_cache.has_entity_changed( + user_id, int(from_stream_id) + ) + if not changed: + return False + + last_change_position_for_room = await self.db_pool.simple_select_one_onecol( + table="room_tags_revisions", + keyvalues={"user_id": user_id, "room_id": room_id}, + retcol="stream_id", + allow_none=True, + ) + + if last_change_position_for_room is None: + return False + + return ( + last_change_position_for_room > from_stream_id + and last_change_position_for_room <= to_stream_id + ) + @cached(num_args=2, tree=True) async def get_tags_for_room( self, user_id: str, room_id: str diff --git a/synapse/types/handlers/sliding_sync.py b/synapse/types/handlers/sliding_sync.py index 149920f8834..5dd2c9d411e 100644 --- a/synapse/types/handlers/sliding_sync.py +++ b/synapse/types/handlers/sliding_sync.py @@ -675,7 +675,7 @@ class HaveSentRoomFlag(Enum): LIVE = "live" -T = TypeVar("T", str, RoomStreamToken, MultiWriterStreamToken) +T = TypeVar("T", str, RoomStreamToken, MultiWriterStreamToken, int) @attr.s(auto_attribs=True, slots=True, frozen=True) @@ -823,6 +823,7 @@ class PerConnectionState: rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap) receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap) 
+ account_data: RoomStatusMap[int] = attr.Factory(RoomStatusMap) room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict) @@ -833,6 +834,7 @@ def get_mutable(self) -> "MutablePerConnectionState": return MutablePerConnectionState( rooms=self.rooms.get_mutable(), receipts=self.receipts.get_mutable(), + account_data=self.account_data.get_mutable(), room_configs=ChainMap({}, room_configs), ) @@ -840,6 +842,7 @@ def copy(self) -> "PerConnectionState": return PerConnectionState( rooms=self.rooms.copy(), receipts=self.receipts.copy(), + account_data=self.account_data.copy(), room_configs=dict(self.room_configs), ) @@ -853,6 +856,7 @@ class MutablePerConnectionState(PerConnectionState): rooms: MutableRoomStatusMap[RoomStreamToken] receipts: MutableRoomStatusMap[MultiWriterStreamToken] + account_data: MutableRoomStatusMap[int] room_configs: typing.ChainMap[str, RoomSyncConfig] @@ -860,6 +864,7 @@ def has_updates(self) -> bool: return ( bool(self.rooms.get_updates()) or bool(self.receipts.get_updates()) + or bool(self.account_data.get_updates()) or bool(self.get_room_config_updates()) ) diff --git a/tests/rest/client/sliding_sync/test_extension_account_data.py b/tests/rest/client/sliding_sync/test_extension_account_data.py index 03b2db39b91..799fbb18564 100644 --- a/tests/rest/client/sliding_sync/test_extension_account_data.py +++ b/tests/rest/client/sliding_sync/test_extension_account_data.py @@ -11,9 +11,11 @@ # See the GNU Affero General Public License for more details: # . 
# +import enum import logging -from parameterized import parameterized_class +from parameterized import parameterized, parameterized_class +from typing_extensions import assert_never from twisted.test.proto_helpers import MemoryReactor @@ -30,6 +32,11 @@ logger = logging.getLogger(__name__) +class TagAction(enum.Enum): + ADD = enum.auto() + REMOVE = enum.auto() + + # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the # foreground update for # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by @@ -350,10 +357,20 @@ def test_room_account_data_initial_sync(self) -> None: account_data_map[AccountDataTypes.TAG], {"tags": {"m.favourite": {}}} ) - def test_room_account_data_incremental_sync(self) -> None: + @parameterized.expand( + [ + ("add tags", TagAction.ADD), + ("remove tags", TagAction.REMOVE), + ] + ) + def test_room_account_data_incremental_sync( + self, test_description: str, tag_action: TagAction + ) -> None: """ On incremental sync, we return all account data for a given room but only for rooms that we request and are being returned in the Sliding Sync response. 
+ + (HaveSentRoomFlag.LIVE) """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") @@ -432,42 +449,472 @@ def test_room_account_data_incremental_sync(self) -> None: content={"roo": "rar"}, ) ) - # Add another room tag + if tag_action == TagAction.ADD: + # Add another room tag + self.get_success( + self.account_data_handler.add_tag_to_room( + user_id=user1_id, + room_id=room_id1, + tag="m.server_notice", + content={}, + ) + ) + self.get_success( + self.account_data_handler.add_tag_to_room( + user_id=user1_id, + room_id=room_id2, + tag="m.server_notice", + content={}, + ) + ) + elif tag_action == TagAction.REMOVE: + # Remove the room tag + self.get_success( + self.account_data_handler.remove_tag_from_room( + user_id=user1_id, + room_id=room_id1, + tag="m.favourite", + ) + ) + self.get_success( + self.account_data_handler.remove_tag_from_room( + user_id=user1_id, + room_id=room_id2, + tag="m.favourite", + ) + ) + else: + assert_never(tag_action) + + # Make an incremental Sliding Sync request with the account_data extension enabled + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertIsNotNone(response_body["extensions"]["account_data"].get("global")) + # Even though we requested room2, we only expect room1 to show up because that's + # the only room in the Sliding Sync response (room2 is not one of our room + # subscriptions or in a sliding window list). 
+ self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + {room_id1}, + exact=True, + ) + # We should only see the new room account data that happened after the `from_token` + account_data_map = { + event["type"]: event["content"] + for event in response_body["extensions"]["account_data"] + .get("rooms") + .get(room_id1) + } + self.assertIncludes( + account_data_map.keys(), + {"org.matrix.roorarraz2", AccountDataTypes.TAG}, + exact=True, + ) + self.assertEqual(account_data_map["org.matrix.roorarraz2"], {"roo": "rar"}) + if tag_action == TagAction.ADD: + self.assertEqual( + account_data_map[AccountDataTypes.TAG], + {"tags": {"m.favourite": {}, "m.server_notice": {}}}, + ) + elif tag_action == TagAction.REMOVE: + # If we previously showed the client that the room has tags, when it no + # longer has tags, we need to show them an empty map. + self.assertEqual( + account_data_map[AccountDataTypes.TAG], + {"tags": {}}, + ) + else: + assert_never(tag_action) + + @parameterized.expand( + [ + ("add tags", TagAction.ADD), + ("remove tags", TagAction.REMOVE), + ] + ) + def test_room_account_data_incremental_sync_out_of_range_never( + self, test_description: str, tag_action: TagAction + ) -> None: + """Tests that we don't return account data for rooms that are out of + range, but then do send all account data once they're in range. 
+ + (initial/HaveSentRoomFlag.NEVER) + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Create a room and add some room account data + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id1, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + # Add a room tag to mark the room as a favourite self.get_success( self.account_data_handler.add_tag_to_room( user_id=user1_id, room_id=room_id1, - tag="m.server_notice", + tag="m.favourite", content={}, ) ) + + # Create another room with some room account data + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id2, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + # Add a room tag to mark the room as a favourite self.get_success( self.account_data_handler.add_tag_to_room( user_id=user1_id, room_id=room_id2, - tag="m.server_notice", + tag="m.favourite", content={}, ) ) + # Now send a message into room1 so that it is at the top of the list + self.helper.send(room_id1, body="new event", tok=user1_tok) + + # Make a SS request for only the top room. + sync_body = { + "lists": { + "main": { + "ranges": [[0, 0]], + "required_state": [], + "timeline_limit": 0, + } + }, + "extensions": { + "account_data": { + "enabled": True, + "lists": ["main"], + } + }, + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Only room1 should be in the response since it's the latest room with activity + # and our range only includes 1 room. 
+ self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + {room_id1}, + exact=True, + ) + + # Add some other room account data + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id1, + account_data_type="org.matrix.roorarraz2", + content={"roo": "rar"}, + ) + ) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id2, + account_data_type="org.matrix.roorarraz2", + content={"roo": "rar"}, + ) + ) + if tag_action == TagAction.ADD: + # Add another room tag + self.get_success( + self.account_data_handler.add_tag_to_room( + user_id=user1_id, + room_id=room_id1, + tag="m.server_notice", + content={}, + ) + ) + self.get_success( + self.account_data_handler.add_tag_to_room( + user_id=user1_id, + room_id=room_id2, + tag="m.server_notice", + content={}, + ) + ) + elif tag_action == TagAction.REMOVE: + # Remove the room tag + self.get_success( + self.account_data_handler.remove_tag_from_room( + user_id=user1_id, + room_id=room_id1, + tag="m.favourite", + ) + ) + self.get_success( + self.account_data_handler.remove_tag_from_room( + user_id=user1_id, + room_id=room_id2, + tag="m.favourite", + ) + ) + else: + assert_never(tag_action) + + # Move room2 into range. + self.helper.send(room_id2, body="new event", tok=user1_tok) + # Make an incremental Sliding Sync request with the account_data extension enabled response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) self.assertIsNotNone(response_body["extensions"]["account_data"].get("global")) - # Even though we requested room2, we only expect room1 to show up because that's - # the only room in the Sliding Sync response (room2 is not one of our room - # subscriptions or in a sliding window list). + # We expect to see the account data of room2, as that has the most + # recent update. 
+ self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + {room_id2}, + exact=True, + ) + # Since this is the first time we're seeing room2 down sync, we should see all + # room account data for it. + account_data_map = { + event["type"]: event["content"] + for event in response_body["extensions"]["account_data"] + .get("rooms") + .get(room_id2) + } + expected_account_data_keys = { + "org.matrix.roorarraz", + "org.matrix.roorarraz2", + } + if tag_action == TagAction.ADD: + expected_account_data_keys.add(AccountDataTypes.TAG) + self.assertIncludes( + account_data_map.keys(), + expected_account_data_keys, + exact=True, + ) + self.assertEqual(account_data_map["org.matrix.roorarraz"], {"roo": "rar"}) + self.assertEqual(account_data_map["org.matrix.roorarraz2"], {"roo": "rar"}) + if tag_action == TagAction.ADD: + self.assertEqual( + account_data_map[AccountDataTypes.TAG], + {"tags": {"m.favourite": {}, "m.server_notice": {}}}, + ) + elif tag_action == TagAction.REMOVE: + # Since we never told the client about the room tags, we don't need to say + # anything if there are no tags now (the client doesn't need an update). + self.assertIsNone( + account_data_map.get(AccountDataTypes.TAG), + account_data_map, + ) + else: + assert_never(tag_action) + + @parameterized.expand( + [ + ("add tags", TagAction.ADD), + ("remove tags", TagAction.REMOVE), + ] + ) + def test_room_account_data_incremental_sync_out_of_range_previously( + self, test_description: str, tag_action: TagAction + ) -> None: + """Tests that we don't return account data for rooms that fall out of + range, but then do send all account data that has changed once they're back in range. 
+ + (HaveSentRoomFlag.PREVIOUSLY) + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Create a room and add some room account data + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id1, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + # Add a room tag to mark the room as a favourite + self.get_success( + self.account_data_handler.add_tag_to_room( + user_id=user1_id, + room_id=room_id1, + tag="m.favourite", + content={}, + ) + ) + + # Create another room with some room account data + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id2, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + # Add a room tag to mark the room as a favourite + self.get_success( + self.account_data_handler.add_tag_to_room( + user_id=user1_id, + room_id=room_id2, + tag="m.favourite", + content={}, + ) + ) + + # Make an initial Sliding Sync request for only room1 and room2. + sync_body = { + "lists": {}, + "room_subscriptions": { + room_id1: { + "required_state": [], + "timeline_limit": 0, + }, + room_id2: { + "required_state": [], + "timeline_limit": 0, + }, + }, + "extensions": { + "account_data": { + "enabled": True, + "rooms": [room_id1, room_id2], + } + }, + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Both rooms show up because we have a room subscription for each and they're + # requested in the `account_data` extension. 
+ self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + {room_id1, room_id2}, + exact=True, + ) + + # Add some other room account data + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id1, + account_data_type="org.matrix.roorarraz2", + content={"roo": "rar"}, + ) + ) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id2, + account_data_type="org.matrix.roorarraz2", + content={"roo": "rar"}, + ) + ) + if tag_action == TagAction.ADD: + # Add another room tag + self.get_success( + self.account_data_handler.add_tag_to_room( + user_id=user1_id, + room_id=room_id1, + tag="m.server_notice", + content={}, + ) + ) + self.get_success( + self.account_data_handler.add_tag_to_room( + user_id=user1_id, + room_id=room_id2, + tag="m.server_notice", + content={}, + ) + ) + elif tag_action == TagAction.REMOVE: + # Remove the room tag + self.get_success( + self.account_data_handler.remove_tag_from_room( + user_id=user1_id, + room_id=room_id1, + tag="m.favourite", + ) + ) + self.get_success( + self.account_data_handler.remove_tag_from_room( + user_id=user1_id, + room_id=room_id2, + tag="m.favourite", + ) + ) + else: + assert_never(tag_action) + + # Make an incremental Sliding Sync request for just room1 + response_body, from_token = self.do_sync( + { + **sync_body, + "room_subscriptions": { + room_id1: { + "required_state": [], + "timeline_limit": 0, + }, + }, + }, + since=from_token, + tok=user1_tok, + ) + + # Only room1 shows up because we only have a room subscription for room1 now. 
self.assertIncludes( response_body["extensions"]["account_data"].get("rooms").keys(), {room_id1}, exact=True, ) - # We should only see the new room account data that happened after the `from_token` + + # Make an incremental Sliding Sync request for just room2 now + response_body, from_token = self.do_sync( + { + **sync_body, + "room_subscriptions": { + room_id2: { + "required_state": [], + "timeline_limit": 0, + }, + }, + }, + since=from_token, + tok=user1_tok, + ) + + # Only room2 shows up because we only have a room subscription for room2 now. + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + {room_id2}, + exact=True, + ) + + self.assertIsNotNone(response_body["extensions"]["account_data"].get("global")) + # Check for room account data for room2 + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + {room_id2}, + exact=True, + ) + # We should see any room account data updates for room2 since the last + # time we saw it down sync account_data_map = { event["type"]: event["content"] for event in response_body["extensions"]["account_data"] .get("rooms") - .get(room_id1) + .get(room_id2) } self.assertIncludes( account_data_map.keys(), @@ -475,10 +922,20 @@ def test_room_account_data_incremental_sync(self) -> None: exact=True, ) self.assertEqual(account_data_map["org.matrix.roorarraz2"], {"roo": "rar"}) - self.assertEqual( - account_data_map[AccountDataTypes.TAG], - {"tags": {"m.favourite": {}, "m.server_notice": {}}}, - ) + if tag_action == TagAction.ADD: + self.assertEqual( + account_data_map[AccountDataTypes.TAG], + {"tags": {"m.favourite": {}, "m.server_notice": {}}}, + ) + elif tag_action == TagAction.REMOVE: + # If we previously showed the client that the room has tags, when it no + # longer has tags, we need to show them an empty map. 
+ self.assertEqual( + account_data_map[AccountDataTypes.TAG], + {"tags": {}}, + ) + else: + assert_never(tag_action) def test_wait_for_new_data(self) -> None: """