Skip to content

Commit

Permalink
/sync: fix bug in calculating state response
Browse files Browse the repository at this point in the history
Fix an issue which could cause state to be omitted from the sync response if
the last event was filtered out.

Fixes: #16928
  • Loading branch information
richvdh committed Feb 17, 2024
1 parent 050b7bf commit 37e775d
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 41 deletions.
54 changes: 13 additions & 41 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1177,15 +1177,14 @@ async def _compute_state_delta_for_full_sync(
await_full_state = True
lazy_load_members = False

if batch:
state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[-1].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
await_full_state=await_full_state,
)

if batch:
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
Expand All @@ -1194,13 +1193,6 @@ async def _compute_state_delta_for_full_sync(
)
)
else:
state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
await_full_state=await_full_state,
)

state_at_timeline_start = state_at_timeline_end

state_ids = _calculate_state(
Expand Down Expand Up @@ -1295,23 +1287,12 @@ async def _compute_state_delta_for_incremental_sync(
await_full_state=await_full_state,
)

if batch:
state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[-1].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
await_full_state=await_full_state,
)

state_ids = _calculate_state(
timeline_contains=timeline_state,
Expand Down Expand Up @@ -2837,15 +2818,6 @@ def _calculate_state(
# timeline. We have no way to represent that in the /sync response, and we don't
# even try; it is ether omitted or plonked into `state` as if it were at the start
# of the timeline, depending on what else is in the timeline.)
#
# ----------
#
# Aside 2: it's worth noting that `timeline_end`, as provided to us, is actually
# the state *before* the final event in the timeline. In other words: if the final
# event in the timeline is a state event, it won't be included in `timeline_end`.
# However, that doesn't matter here, because the only difference can be in that
# one piece of state, and by definition that event is in the timeline, so we
# don't need to include it in the `state` section.

state_ids = (
(timeline_end_ids | timeline_start_ids)
Expand Down
80 changes: 80 additions & 0 deletions tests/handlers/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,86 @@ def test_state_includes_changes_on_forks(self) -> None:
[s2_event],
)

def test_state_includes_changes_on_forks_when_events_excluded(self) -> None:
"""A variation on the previous test, but where one event is filtered
The DAG is the same as the previous test, but E4 is excluded by the filter.
E1
↗ ↖
| S2
| ↑
--|------|----
| |
E3 |
↖ /
(E4)
"""

alice = self.register_user("alice", "password")
alice_tok = self.login(alice, "password")
alice_requester = create_requester(alice)
room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)

# Do an initial sync as Alice to get a known starting point.
initial_sync_result = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester, generate_sync_config(alice)
)
)
last_room_creation_event_id = (
initial_sync_result.joined[0].timeline.events[-1].event_id
)

# Send a state event, and a regular event, both using the same prev ID
with self._patch_get_latest_events([last_room_creation_event_id]):
s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[
"event_id"
]
e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"]

# Send a final event, joining the two branches of the dag
self.helper.send(room_id, "e4", type="not_a_normal_message", tok=alice_tok)[
"event_id"
]

# do an incremental sync, with a filter that will only return E3, excluding S2
# and E4.
incremental_sync = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(
alice,
filter_collection=FilterCollection(
self.hs,
{
"room": {
"timeline": {
"limit": 1,
"not_types": ["not_a_normal_message"],
}
}
},
),
),
since_token=initial_sync_result.next_batch,
)
)

# The state event should appear in the 'state' section of the response.
room_sync = incremental_sync.joined[0]
self.assertEqual(room_sync.room_id, room_id)
self.assertTrue(room_sync.timeline.limited)
self.assertEqual(
[e.event_id for e in room_sync.timeline.events],
[e3_event],
)
self.assertEqual(
[e.event_id for e in room_sync.state.values()],
[s2_event],
)

@parameterized.expand(
[
(False, False),
Expand Down

0 comments on commit 37e775d

Please sign in to comment.