From f98c6910118d1140e4fb325ee11a411fae910cee Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Jul 2022 17:22:17 +0100 Subject: [PATCH 01/13] Fix bug where the state deltas were incorrect --- synapse/state/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 9f0a36652c25..91c41d8ec4b9 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -769,6 +769,11 @@ def _make_state_cache_entry( delta_ids: Optional[StateMap[str]] = None for old_group, old_state in state_groups_ids.items(): + if old_state.keys() - new_state.keys(): + # Currently we don't support deltas that remove keys from the state + # map. + continue + n_delta_ids = {k: v for k, v in new_state.items() if old_state.get(k) != v} if not delta_ids or len(n_delta_ids) < len(delta_ids): prev_group = old_group From efd5c3213af04a3811feb68ed84dc59f7365ff16 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Jul 2022 17:27:20 +0100 Subject: [PATCH 02/13] Fix bug where we didn't calculate the state correctly for new events This was introduced in #13267 --- synapse/storage/controllers/persist_events.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index af65e5913b4b..07c75e86868f 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -934,6 +934,8 @@ async def _get_new_state_after_events( state_res_store=StateResolutionStore(self.main_store), ) + full_state = await res.get_state(self._state_controller) + state_resolutions_during_persistence.inc() # If the returned state matches the state group of one of the new @@ -948,7 +950,7 @@ async def _get_new_state_after_events( events_context, ) - return res.state, None, new_latest_event_ids + return full_state, None, new_latest_event_ids async def _prune_extremities( self, From b92429fbe0432293155fa6b61f209f0a2fe175f8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Jul 2022 17:35:19 +0100 Subject: [PATCH 03/13] Add test --- tests/test_state.py | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/tests/test_state.py b/tests/test_state.py index 6ca8d8f21d66..b2f861a3f6df 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -21,7 +21,7 @@ from synapse.api.room_versions import RoomVersions from synapse.events import make_event_from_dict from synapse.events.snapshot import EventContext -from synapse.state import StateHandler, StateResolutionHandler +from synapse.state import StateHandler, StateResolutionHandler, _make_state_cache_entry from synapse.util import Clock from synapse.util.macaroons import MacaroonGenerator @@ -760,3 +760,32 @@ def _get_context( result = yield defer.ensureDeferred(self.state.compute_event_context(event)) return result + + def test_make_state_cache_entry(self): + "Test that calculating a prev_group and delta is correct" + + new_state = { + ("a", ""): "E", + ("b", ""): "E", + ("c", ""): "E", + } + + # Old state has fewer changes from new state, but the delta involves + # deleting a key, which isn't allowed in the deltas. + old_state_1 = { + ("a", ""): "F", + ("b", ""): "E", + ("c", ""): "E", + ("d", ""): "E", + } + + old_state_2 = { + ("a", ""): "F", + ("b", ""): "F", + ("c", ""): "F", + } + + entry = _make_state_cache_entry(new_state, {1: old_state_1, 2: old_state_2}) + + self.assertEqual(entry.prev_group, 2) + self.assertEqual(entry.delta_ids, new_state) From eb6e0aaac76e0b4629e51595580fd0824ce92d9f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Jul 2022 17:37:56 +0100 Subject: [PATCH 04/13] Newsfile --- changelog.d/13278.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13278.bugfix diff --git a/changelog.d/13278.bugfix b/changelog.d/13278.bugfix new file mode 100644 index 000000000000..7a797d425799 --- /dev/null +++ b/changelog.d/13278.bugfix @@ -0,0 +1 @@ +Fix long-standing bug where in rare instance Synapse could store the incorrect state. From 055c2a73e4656f1acd9b8faa785ce6b824b29d43 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Jul 2022 11:10:36 +0100 Subject: [PATCH 05/13] Don't pull out the full state group when storing state --- synapse/state/__init__.py | 32 ++--- synapse/storage/controllers/state.py | 2 +- synapse/storage/databases/state/store.py | 145 +++++++++++++++-------- 3 files changed, 112 insertions(+), 67 deletions(-) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 91c41d8ec4b9..0a563c4ff4ee 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -294,9 +294,12 @@ async def compute_event_context( # We make sure that we have a state group assigned to the state. if entry.state_group is None: - state_ids_before_event = await entry.get_state( - self._state_storage_controller, StateFilter.all() - ) + state_ids_before_event = None + if state_group_before_event_prev_group is None: + state_ids_before_event = await entry.get_state( + self._state_storage_controller, StateFilter.all() + ) + state_group_before_event = ( await self._state_storage_controller.store_state_group( event.event_id, @@ -329,19 +332,20 @@ async def compute_event_context( # # otherwise, we'll need to create a new state group for after the event # - if state_ids_before_event is None: - state_ids_before_event = await entry.get_state( - self._state_storage_controller, StateFilter.all() - ) key = (event.type, event.state_key) - if key in state_ids_before_event: - replaces = state_ids_before_event[key] - if replaces != event.event_id: - event.unsigned["replaces_state"] = replaces - state_ids_after_event = dict(state_ids_before_event) - state_ids_after_event[key] = event.event_id + if state_ids_before_event is not None: + replaces = state_ids_before_event.get(key) + else: + replaces_state_map = await entry.get_state( + self._state_storage_controller, StateFilter.from_types([key]) + ) + replaces = replaces_state_map.get(key) + + if replaces and replaces != event.event_id: + event.unsigned["replaces_state"] = replaces + delta_ids = {key: event.event_id} state_group_after_event = ( @@ -350,7 +354,7 @@ async def compute_event_context( event.room_id, prev_group=state_group_before_event, delta_ids=delta_ids, - current_state_ids=state_ids_after_event, + current_state_ids=None, ) ) diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index d3a44bc876b9..e08f956e6ef7 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -346,7 +346,7 @@ async def store_state_group( room_id: str, prev_group: Optional[int], delta_ids: Optional[StateMap[str]], - current_state_ids: StateMap[str], + current_state_ids: Optional[StateMap[str]], ) -> int: """Store a new set of state, returning a newly assigned state group. diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index 609a2b88bfbf..7132217510c7 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -400,7 +400,7 @@ async def store_state_group( room_id: str, prev_group: Optional[int], delta_ids: Optional[StateMap[str]], - current_state_ids: StateMap[str], + current_state_ids: Optional[StateMap[str]], ) -> int: """Store a new set of state, returning a newly assigned state group. @@ -418,10 +418,38 @@ async def store_state_group( The state group ID """ - def _store_state_group_txn(txn: LoggingTransaction) -> int: - if current_state_ids is None: - # AFAIK, this can never happen - raise Exception("current_state_ids cannot be None") + if prev_group is None and current_state_ids is None: + raise Exception("current_state_ids and prev_group can't both be None") + + if prev_group is not None and delta_ids is None: + raise Exception("delta_ids is None when prev_group is not None") + + def insert_delta_group_txn( + txn: LoggingTransaction, prev_group: int, delta_ids: StateMap[str] + ) -> Optional[int]: + """If we have a delta we try and persist the new group as a delta. + + Returns: + The state group if successfully created, or None if the state + needs to be persisted as a full state. + """ + is_in_db = self.db_pool.simple_select_one_onecol_txn( + txn, + table="state_groups", + keyvalues={"id": prev_group}, + retcol="id", + allow_none=True, + ) + if not is_in_db: + raise Exception( + "Trying to persist state with unpersisted prev_group: %r" + % (prev_group,) + ) + + potential_hops = self._count_state_group_hops_txn(txn, prev_group) + + if potential_hops >= MAX_STATE_DELTA_HOPS: + return None state_group = self._state_group_seq_gen.get_next_id_txn(txn) @@ -431,51 +459,45 @@ def _store_state_group_txn(txn: LoggingTransaction) -> int: values={"id": state_group, "room_id": room_id, "event_id": event_id}, ) - # We persist as a delta if we can, while also ensuring the chain - # of deltas isn't tooo long, as otherwise read performance degrades. - if prev_group: - is_in_db = self.db_pool.simple_select_one_onecol_txn( - txn, - table="state_groups", - keyvalues={"id": prev_group}, - retcol="id", - allow_none=True, - ) - if not is_in_db: - raise Exception( - "Trying to persist state with unpersisted prev_group: %r" - % (prev_group,) - ) - - potential_hops = self._count_state_group_hops_txn(txn, prev_group) - if prev_group and potential_hops < MAX_STATE_DELTA_HOPS: - assert delta_ids is not None - - self.db_pool.simple_insert_txn( - txn, - table="state_group_edges", - values={"state_group": state_group, "prev_state_group": prev_group}, - ) + self.db_pool.simple_insert_txn( + txn, + table="state_group_edges", + values={"state_group": state_group, "prev_state_group": prev_group}, + ) - self.db_pool.simple_insert_many_txn( - txn, - table="state_groups_state", - keys=("state_group", "room_id", "type", "state_key", "event_id"), - values=[ - (state_group, room_id, key[0], key[1], state_id) - for key, state_id in delta_ids.items() - ], - ) - else: - self.db_pool.simple_insert_many_txn( - txn, - table="state_groups_state", - keys=("state_group", "room_id", "type", "state_key", "event_id"), - values=[ - (state_group, room_id, key[0], key[1], state_id) - for key, state_id in current_state_ids.items() - ], - ) + self.db_pool.simple_insert_many_txn( + txn, + table="state_groups_state", + keys=("state_group", "room_id", "type", "state_key", "event_id"), + values=[ + (state_group, room_id, key[0], key[1], state_id) + for key, state_id in delta_ids.items() + ], + ) + + return state_group + + def insert_full_state_txn( + txn: LoggingTransaction, current_state_ids: StateMap[str] + ) -> int: + """Persist the full state, returning the new state group.""" + state_group = self._state_group_seq_gen.get_next_id_txn(txn) + + self.db_pool.simple_insert_txn( + txn, + table="state_groups", + values={"id": state_group, "room_id": room_id, "event_id": event_id}, + ) + + self.db_pool.simple_insert_many_txn( + txn, + table="state_groups_state", + keys=("state_group", "room_id", "type", "state_key", "event_id"), + values=[ + (state_group, room_id, key[0], key[1], state_id) + for key, state_id in current_state_ids.items() + ], + ) # Prefill the state group caches with this group. # It's fine to use the sequence like this as the state group map @@ -491,7 +513,7 @@ def _store_state_group_txn(txn: LoggingTransaction) -> int: self._state_group_members_cache.update, self._state_group_members_cache.sequence, key=state_group, - value=dict(current_member_state_ids), + value=current_member_state_ids, ) current_non_member_state_ids = { @@ -503,13 +525,32 @@ def _store_state_group_txn(txn: LoggingTransaction) -> int: self._state_group_cache.update, self._state_group_cache.sequence, key=state_group, - value=dict(current_non_member_state_ids), + value=current_non_member_state_ids, ) return state_group + if prev_group is not None: + state_group = await self.db_pool.runInteraction( + "store_state_group.insert_delta_group", + insert_delta_group_txn, + prev_group, + delta_ids, + ) + if state_group is not None: + return state_group + + if current_state_ids is None: + assert prev_group is not None + assert delta_ids is not None + groups = await self._get_state_for_groups([prev_group]) + current_state_ids = dict(groups[prev_group]) + current_state_ids.update(delta_ids) + return await self.db_pool.runInteraction( - "store_state_group", _store_state_group_txn + "store_state_group.insert_full_state", + insert_full_state_txn, + current_state_ids, ) async def purge_unreferenced_state_groups( From d761cb88bdd6fb0ade0283675d3375ac7828bc0e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Jul 2022 11:12:45 +0100 Subject: [PATCH 06/13] Newsfile --- changelog.d/13274.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13274.misc diff --git a/changelog.d/13274.misc b/changelog.d/13274.misc new file mode 100644 index 000000000000..a3344143203e --- /dev/null +++ b/changelog.d/13274.misc @@ -0,0 +1 @@ +Don't pull out state in `compute_event_context` for unconflicted state. From f4866ce531e16e64dac9f1e1828c045254680910 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Jul 2022 11:45:34 +0100 Subject: [PATCH 07/13] Fix test --- tests/rest/client/test_rooms.py | 4 ++-- tests/test_state.py | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index 8ed5272b1667..06221b806ab8 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -709,7 +709,7 @@ def test_post_room_no_keys(self) -> None: self.assertEqual(200, channel.code, channel.result) self.assertTrue("room_id" in channel.json_body) assert channel.resource_usage is not None - self.assertEqual(32, channel.resource_usage.db_txn_count) + self.assertEqual(36, channel.resource_usage.db_txn_count) def test_post_room_initial_state(self) -> None: # POST with initial_state config key, expect new room id @@ -722,7 +722,7 @@ def test_post_room_initial_state(self) -> None: self.assertEqual(200, channel.code, channel.result) self.assertTrue("room_id" in channel.json_body) assert channel.resource_usage is not None - self.assertEqual(35, channel.resource_usage.db_txn_count) + self.assertEqual(40, channel.resource_usage.db_txn_count) def test_post_room_visibility_key(self) -> None: # POST with visibility config key, expect new room id diff --git a/tests/test_state.py b/tests/test_state.py index b2f861a3f6df..f71998342b2d 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -99,6 +99,10 @@ async def store_state_group( state_group = self._next_group self._next_group += 1 + if current_state_ids is None: + current_state_ids = dict(self._group_to_state[prev_group]) + current_state_ids.update(delta_ids) + self._group_to_state[state_group] = dict(current_state_ids) return state_group From 3be91b5cf1cc5a8fd3ddcf0e2f10c8d774565bf3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Jul 2022 11:06:25 +0100 Subject: [PATCH 08/13] Apply suggestions from code review Woo comments Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/state/__init__.py | 3 +++ synapse/storage/databases/state/store.py | 10 ++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 0a563c4ff4ee..ef0281f02fce 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -295,6 +295,9 @@ async def compute_event_context( # We make sure that we have a state group assigned to the state. if entry.state_group is None: state_ids_before_event = None + # store_state_group requires us to have either a previous state group + # (with deltas) or the complete state map. So, if we don't have a + # previous state group, load the complete state map now. if state_group_before_event_prev_group is None: state_ids_before_event = await entry.get_state( self._state_storage_controller, StateFilter.all() diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index 7132217510c7..884cc073cbf0 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -427,7 +427,9 @@ async def store_state_group( def insert_delta_group_txn( txn: LoggingTransaction, prev_group: int, delta_ids: StateMap[str] ) -> Optional[int]: - """If we have a delta we try and persist the new group as a delta. + """Try and persist the new group as a delta. + + Requires that we have the state as a delta from a previous state group. Returns: The state group if successfully created, or None if the state @@ -446,8 +448,9 @@ def insert_delta_group_txn( % (prev_group,) ) + # if the chain of state group deltas is going too long, we fall back to + # persisting a complete state group. potential_hops = self._count_state_group_hops_txn(txn, prev_group) - if potential_hops >= MAX_STATE_DELTA_HOPS: return None @@ -540,6 +543,9 @@ def insert_full_state_txn( if state_group is not None: return state_group + # We're going to persist the state as a complete group rather than + # a delta, so first we need to ensure we have loaded the state map + # from the database. if current_state_ids is None: assert prev_group is not None assert delta_ids is not None From 12fd634cc1dc8cceca0f2cd71a6cf1a1bfc674dc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Jul 2022 11:05:43 +0100 Subject: [PATCH 09/13] Hoist --- synapse/state/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index ef0281f02fce..3dd24e9c09bd 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -293,8 +293,8 @@ async def compute_event_context( deltas_to_state_group_before_event = entry.delta_ids # We make sure that we have a state group assigned to the state. + state_ids_before_event = None if entry.state_group is None: - state_ids_before_event = None # store_state_group requires us to have either a previous state group # (with deltas) or the complete state map. So, if we don't have a # previous state group, load the complete state map now. @@ -315,7 +315,6 @@ async def compute_event_context( entry.state_group = state_group_before_event else: state_group_before_event = entry.state_group - state_ids_before_event = None # # now if it's not a state event, we're done From 6a850781d45899a9f8010c2bf2e00eb8048cb99c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Jul 2022 11:08:11 +0100 Subject: [PATCH 10/13] Comments --- synapse/storage/databases/state/store.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index 884cc073cbf0..8a4ac804e142 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -404,6 +404,9 @@ async def store_state_group( ) -> int: """Store a new set of state, returning a newly assigned state group. + If `current_state_ids` is None then `prev_group` and `delta_ids` must + not be None. + Args: event_id: The event ID for which the state was calculated room_id @@ -428,7 +431,7 @@ def insert_delta_group_txn( txn: LoggingTransaction, prev_group: int, delta_ids: StateMap[str] ) -> Optional[int]: """Try and persist the new group as a delta. - + Requires that we have the state as a delta from a previous state group. Returns: @@ -543,9 +546,9 @@ def insert_full_state_txn( if state_group is not None: return state_group - # We're going to persist the state as a complete group rather than - # a delta, so first we need to ensure we have loaded the state map - # from the database. + # We're going to persist the state as a complete group rather than + # a delta, so first we need to ensure we have loaded the state map + # from the database. if current_state_ids is None: assert prev_group is not None assert delta_ids is not None From ff5bad6a50a02cc20ceff3f6bae580bdddb91640 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Jul 2022 11:41:23 +0100 Subject: [PATCH 11/13] Update synapse/storage/databases/state/store.py Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/storage/databases/state/store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index 8a4ac804e142..cb4629022e0a 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -404,8 +404,8 @@ async def store_state_group( ) -> int: """Store a new set of state, returning a newly assigned state group. - If `current_state_ids` is None then `prev_group` and `delta_ids` must - not be None. + At least one of `current_state_ids` and `prev_group` must be provided. Whenever + `prev_group` is not None, `delta_ids` must also not be None. Args: event_id: The event ID for which the state was calculated From bc1adf39dc08426d445be46a406d89f535c7f58f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Jul 2022 11:41:57 +0100 Subject: [PATCH 12/13] Remove optional part --- synapse/storage/databases/state/store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index cb4629022e0a..afbc85ad0c3b 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -410,7 +410,7 @@ async def store_state_group( Args: event_id: The event ID for which the state was calculated room_id - prev_group: A previous state group for the room, optional. + prev_group: A previous state group for the room. delta_ids: The delta between state at `prev_group` and `current_state_ids`, if `prev_group` was given. Same format as `current_state_ids`. From 061ce1d4ab95ee7d1e687f2ce8de8849e9c34179 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Jul 2022 11:42:21 +0100 Subject: [PATCH 13/13] Before comment --- synapse/state/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 3dd24e9c09bd..57df12b68cd2 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -291,9 +291,9 @@ async def compute_event_context( state_group_before_event_prev_group = entry.prev_group deltas_to_state_group_before_event = entry.delta_ids + state_ids_before_event = None # We make sure that we have a state group assigned to the state. - state_ids_before_event = None if entry.state_group is None: # store_state_group requires us to have either a previous state group # (with deltas) or the complete state map. So, if we don't have a