Fix historical messages backfilling in random order on remote homeservers (MSC2716) (#11114)

Fix matrix-org/synapse#11091
Fix matrix-org/synapse#10764 (side-stepping the issue because we no longer have to deal with `fake_prev_event_id`)

 1. Made the `/backfill` response return messages in `(depth, stream_ordering)` order (previously only sorted by `depth`)
    - Technically, it shouldn't really matter how `/backfill` returns things, but I'm trying to make the `stream_ordering` a little more consistent from the origin to the remote homeservers so that the order of messages from `/messages` stays consistent ([sorted by `(topological_ordering, stream_ordering)`](https://github.com/matrix-org/synapse/blob/develop/docs/development/room-dag-concepts.md#depth-and-stream-ordering)).
    - Even now that we return backfilled messages in order, it still doesn't guarantee the same `stream_ordering` (and more importantly the [`/messages` order](https://github.com/matrix-org/synapse/blob/develop/docs/development/room-dag-concepts.md#depth-and-stream-ordering)) on the other server. For example, if a room has a bunch of history imported and someone visits a permalink to a historical message back in time, their homeserver will skip over the historical messages in between, insert the permalinked message as the next message in the `stream_ordering`, and totally throw off the sort.
       - This will be even more the case when we add the [MSC3030 jump to date API endpoint](matrix-org/matrix-spec-proposals#3030) so the static archives can navigate and jump to a certain date.
       - We're solving this in the future by switching to [online topological ordering](matrix-org/gomatrixserverlib#187) and [chunking](matrix-org/synapse#3785), which by their nature will apply retroactively to fix any inconsistencies introduced by people permalinking.
 2. As we're navigating `prev_events` to return in `/backfill`, we order by `depth` first (newest -> oldest) and now also tie-break based on the `stream_ordering` (newest -> oldest); see the ordering sketch just after this list. This is technically important because MSC2716 inserts a bunch of historical messages at the same `depth`, so it's best to be prescriptive about which ones we should process first. In reality, I think the code already looped over the historical messages as expected because the database is already in order.
 3. Made the historical state chain and historical event chain float on their own by having no `prev_events` instead of a fake `prev_event`, which caused backfill to get clogged with an unresolvable event (see the floating-chain sketch below the comparison images). Fixes matrix-org/synapse#11091 and matrix-org/synapse#10764
 4. We no longer find connected insertion events by looking for a potential `prev_event` connection to the current event we're iterating over. We now rely solely on marker events which, when processed, add the insertion event as an extremity that the federating homeserver can ask about when the time comes (see the marker-event sketch at the end of this description).
    - Related discussion, matrix-org/synapse#11114 (comment)
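
For illustration, here's a minimal sketch of the tie-broken ordering from points 1 and 2. The `Event` stand-in and `backfill_order` helper are hypothetical, not Synapse's actual `EventBase` or storage code (the real sort happens in SQL in the storage layer):

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Event:
    """Hypothetical stand-in for a persisted event (not Synapse's EventBase)."""

    event_id: str
    depth: int
    stream_ordering: int


def backfill_order(events: List[Event]) -> List[Event]:
    # Newest -> oldest: sort by depth, tie-breaking on stream_ordering.
    # MSC2716 inserts a whole batch of historical messages at a single
    # depth, so without the tie-break the relative order within a batch
    # is unspecified.
    return sorted(events, key=lambda e: (e.depth, e.stream_ordering), reverse=True)


# Two historical events at the same depth now keep a deterministic order:
batch = [
    Event("$live", depth=10, stream_ordering=5),
    Event("$historical1", depth=3, stream_ordering=7),
    Event("$historical2", depth=3, stream_ordering=6),
]
assert [e.event_id for e in backfill_order(batch)] == [
    "$live",
    "$historical1",
    "$historical2",
]
```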


Before | After
--- | ---
![](https://user-images.githubusercontent.com/558581/139218681-b465c862-5c49-4702-a59e-466733b0cf45.png) | ![](https://user-images.githubusercontent.com/558581/146453159-a1609e0a-8324-439d-ae44-e4bce43ac6d1.png)
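
To make point 3 concrete, here's a loose sketch of the floating-chain construction using plain dicts and a hypothetical helper. The real logic lives in `synapse/handlers/room_batch.py` and computes event IDs at creation time, so assume each input dict already carries an `event_id`:

```python
from typing import Any, Dict, List


def chain_historical_events(
    events_to_create: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Let the first historical event float with no prev_events (instead of
    pointing at a fake "$fake_..." event, which clogged backfill with an
    unresolvable event), and hang each later event off the one before it."""
    prev_event_ids: List[str] = []  # empty => the first event floats
    chained: List[Dict[str, Any]] = []
    for index, ev in enumerate(events_to_create):
        ev = dict(ev)  # copy so we don't mutate the caller's dicts
        # Mirrors allow_no_prev_events=(index == 0) in the real handler:
        # only the first event in the chain may have no prev_events.
        assert prev_event_ids or index == 0
        ev["prev_events"] = list(prev_event_ids)
        chained.append(ev)
        prev_event_ids = [ev["event_id"]]
    return chained


# The first event floats; the rest form a chain behind it.
chain = chain_historical_events(
    [{"event_id": "$a"}, {"event_id": "$b"}, {"event_id": "$c"}]
)
assert chain[0]["prev_events"] == []
assert chain[1]["prev_events"] == ["$a"]
assert chain[2]["prev_events"] == ["$b"]
```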



#### Why aren't we sorting topologically when receiving backfill events?

> The main reason we're going to opt to not sort topologically when receiving backfill events is because it's probably best to do whatever is easiest to make it just work. People will probably have opinions once they look at [MSC2716](matrix-org/matrix-spec-proposals#2716) which could change whatever implementation anyway.
> 
> As mentioned, ideally we would do this, but the code necessary to make the fake edges gets confusing and gives an impression of “just whyyyy” (feels icky). This problem also dissolves with online topological ordering.
>
> -- matrix-org/synapse#11114 (comment)

See matrix-org/synapse#11114 (comment) for the technical difficulties.
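
Finally, a loose sketch of the marker-event flow from point 4. The `org.matrix.msc2716.*` identifiers follow the MSC2716 draft as of this commit, and the in-memory set is a stand-in for Synapse's insertion-event backward-extremities storage; the real processing happens in `_handle_marker_event` in `synapse/handlers/federation_event.py`:

```python
from typing import Any, Dict, Set

# Stand-in for the insertion-event backward-extremities storage.
insertion_event_extremities: Set[str] = set()


def handle_marker_event(marker_event: Dict[str, Any]) -> None:
    """When a marker event is processed, record the insertion event it
    points at as a backward extremity. A later backfill pass can then ask
    the origin server about the history hanging off that insertion event,
    instead of us trying to find insertion events via prev_event links."""
    if marker_event.get("type") != "org.matrix.msc2716.marker":
        return
    insertion_event_id = marker_event.get("content", {}).get(
        "org.matrix.msc2716.marker.insertion"
    )
    if insertion_event_id:
        insertion_event_extremities.add(insertion_event_id)


handle_marker_event(
    {
        "type": "org.matrix.msc2716.marker",
        "content": {"org.matrix.msc2716.marker.insertion": "$insertion_abc"},
    }
)
assert "$insertion_abc" in insertion_event_extremities
```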
MadLittleMods authored Feb 7, 2022
1 parent cf06783 commit fef2e79
Showing 9 changed files with 342 additions and 149 deletions.
1 change: 1 addition & 0 deletions changelog.d/11114.bugfix
@@ -0,0 +1 @@
Fix [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical messages backfilling in random order on remote homeservers.
27 changes: 23 additions & 4 deletions synapse/handlers/federation.py
@@ -166,9 +166,14 @@ async def _maybe_backfill_inner(
oldest_events_with_depth = (
await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
)
insertion_events_to_be_backfilled = (
await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
)

insertion_events_to_be_backfilled: Dict[str, int] = {}
if self.hs.config.experimental.msc2716_enabled:
insertion_events_to_be_backfilled = (
await self.store.get_insertion_event_backward_extremities_in_room(
room_id
)
)
logger.debug(
"_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
oldest_events_with_depth,
@@ -271,11 +276,12 @@ async def _maybe_backfill_inner(
]

logger.debug(
"room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems: %s filtered_sorted_extremeties_tuple: %s",
"room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems (%d): %s filtered_sorted_extremeties_tuple: %s",
room_id,
current_depth,
limit,
max_depth,
len(sorted_extremeties_tuple),
sorted_extremeties_tuple,
filtered_sorted_extremeties_tuple,
)
@@ -1047,6 +1053,19 @@ async def on_backfill_request(
limit = min(limit, 100)

events = await self.store.get_backfill_events(room_id, pdu_list, limit)
logger.debug(
"on_backfill_request: backfill events=%s",
[
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
% (
event.event_id,
event.depth,
event.content.get("body", event.type),
event.prev_event_ids(),
)
for event in events
],
)

events = await filter_events_for_server(self.storage, origin, events)

34 changes: 29 additions & 5 deletions synapse/handlers/federation_event.py
@@ -508,7 +508,11 @@ async def backfill(
f"room {ev.room_id}, when we were backfilling in {room_id}"
)

await self._process_pulled_events(dest, events, backfilled=True)
await self._process_pulled_events(
dest,
events,
backfilled=True,
)

async def _get_missing_events_for_pdu(
self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
@@ -626,11 +630,24 @@ async def _process_pulled_events(
backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)
"""
logger.debug(
"processing pulled backfilled=%s events=%s",
backfilled,
[
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
% (
event.event_id,
event.depth,
event.content.get("body", event.type),
event.prev_event_ids(),
)
for event in events
],
)

# We want to sort these by depth so we process them and
# tell clients about them in order.
sorted_events = sorted(events, key=lambda x: x.depth)

for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
@@ -992,6 +1009,8 @@ async def _process_received_pdu(

await self._run_push_actions_and_persist_event(event, context, backfilled)

await self._handle_marker_event(origin, event)

if backfilled or context.rejected:
return

@@ -1071,8 +1090,6 @@ async def _process_received_pdu(
event.sender,
)

await self._handle_marker_event(origin, event)

async def _resync_device(self, sender: str) -> None:
"""We have detected that the device list for the given user may be out
of sync, so we try and resync them.
@@ -1323,7 +1340,14 @@ def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]:
return event, context

events_to_persist = (x for x in (prep(event) for event in fetched_events) if x)
await self.persist_events_and_notify(room_id, tuple(events_to_persist))
await self.persist_events_and_notify(
room_id,
tuple(events_to_persist),
# Mark these events backfilled as they're historic events that will
# eventually be backfilled. For example, missing events we fetch
# during backfill should be marked as backfilled as well.
backfilled=True,
)

async def _check_event_auth(
self,
20 changes: 16 additions & 4 deletions synapse/handlers/message.py
@@ -490,12 +490,12 @@ async def create_event(
requester: Requester,
event_dict: dict,
txn_id: Optional[str] = None,
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
allow_no_prev_events: bool = False,
depth: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""
@@ -510,6 +510,10 @@ async def create_event(
requester
event_dict: An entire event
txn_id
allow_no_prev_events: Whether to allow this event to be created with an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
@@ -604,10 +608,10 @@ async def create_event(
event, context = await self.create_new_client_event(
builder=builder,
requester=requester,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
depth=depth,
allow_no_prev_events=allow_no_prev_events,
)

# In an ideal world we wouldn't need the second part of this condition. However,
@@ -764,6 +768,7 @@ async def create_and_send_nonmember_event(
self,
requester: Requester,
event_dict: dict,
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
ratelimit: bool = True,
@@ -781,6 +786,10 @@
Args:
requester: The requester sending the event.
event_dict: An entire event.
allow_no_prev_events: Whether to allow this event to be created with an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
prev_event_ids:
The event IDs to use as the prev events.
Should normally be left as None to automatically request them
@@ -880,16 +889,20 @@ async def create_new_client_event(
self,
builder: EventBuilder,
requester: Optional[Requester] = None,
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
depth: Optional[int] = None,
allow_no_prev_events: bool = False,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client
Args:
builder:
requester:
allow_no_prev_events: Whether to allow this event to be created with an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
@@ -908,7 +921,6 @@ async def create_new_client_event(
Returns:
Tuple of created event, context
"""

# Strip down the auth_event_ids to only what we need to auth the event.
# For example, we don't need extra m.room.member that don't match event.sender
full_state_ids_at_event = None
44 changes: 22 additions & 22 deletions synapse/handlers/room_batch.py
@@ -13,10 +13,6 @@
logger = logging.getLogger(__name__)


def generate_fake_event_id() -> str:
return "$fake_" + random_string(43)


class RoomBatchHandler:
def __init__(self, hs: "HomeServer"):
self.hs = hs
@@ -182,11 +178,12 @@ async def persist_state_events_at_start(
state_event_ids_at_start = []
auth_event_ids = initial_auth_event_ids.copy()

# Make the state events float off on their own so we don't have a
# bunch of `@mxid joined the room` noise between each batch
prev_event_id_for_state_chain = generate_fake_event_id()
# Make the state events float off on their own by specifying no
# prev_events for the first one in the chain so we don't have a bunch of
# `@mxid joined the room` noise between each batch.
prev_event_ids_for_state_chain: List[str] = []

for state_event in state_events_at_start:
for index, state_event in enumerate(state_events_at_start):
assert_params_in_dict(
state_event, ["type", "origin_server_ts", "content", "sender"]
)
@@ -222,7 +219,10 @@ async def persist_state_events_at_start(
content=event_dict["content"],
outlier=True,
historical=True,
prev_event_ids=[prev_event_id_for_state_chain],
# Only the first event in the chain should be floating.
# The rest should hang off each other in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=prev_event_ids_for_state_chain,
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append later.
@@ -242,7 +242,10 @@ async def persist_state_events_at_start(
event_dict,
outlier=True,
historical=True,
prev_event_ids=[prev_event_id_for_state_chain],
# Only the first event in the chain should be floating.
# The rest should hang off each other in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=prev_event_ids_for_state_chain,
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append later.
@@ -253,15 +256,14 @@ async def persist_state_events_at_start(
state_event_ids_at_start.append(event_id)
auth_event_ids.append(event_id)
# Connect all the state in a floating chain
prev_event_id_for_state_chain = event_id
prev_event_ids_for_state_chain = [event_id]

return state_event_ids_at_start

async def persist_historical_events(
self,
events_to_create: List[JsonDict],
room_id: str,
initial_prev_event_ids: List[str],
inherited_depth: int,
auth_event_ids: List[str],
app_service_requester: Requester,
@@ -277,9 +279,6 @@ async def persist_historical_events(
events_to_create: List of historical events to create in JSON
dictionary format.
room_id: Room where you want the events persisted in.
initial_prev_event_ids: These will be the prev_events for the first
event created. Each event created afterwards will point to the
previous event created.
inherited_depth: The depth to create the events at (you will
probably get this by calling inherit_depth_from_prev_ids(...)).
auth_event_ids: Define which events allow you to create the given
@@ -291,11 +290,14 @@ async def persist_historical_events(
"""
assert app_service_requester.app_service

prev_event_ids = initial_prev_event_ids.copy()
# Make the historical event chain float off on its own by specifying no
# prev_events for the first event in the chain which causes the HS to
# ask for the state at the start of the batch later.
prev_event_ids: List[str] = []

event_ids = []
events_to_persist = []
for ev in events_to_create:
for index, ev in enumerate(events_to_create):
assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])

assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % (
@@ -319,6 +321,9 @@ async def persist_historical_events(
ev["sender"], app_service_requester.app_service
),
event_dict,
# Only the first event in the chain should be floating.
# The rest should hang off each other in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=event_dict.get("prev_events"),
auth_event_ids=auth_event_ids,
historical=True,
@@ -370,7 +375,6 @@ async def handle_batch_of_events(
events_to_create: List[JsonDict],
room_id: str,
batch_id_to_connect_to: str,
initial_prev_event_ids: List[str],
inherited_depth: int,
auth_event_ids: List[str],
app_service_requester: Requester,
@@ -385,9 +389,6 @@ async def handle_batch_of_events(
room_id: Room where you want the events created in.
batch_id_to_connect_to: The batch_id from the insertion event you
want this batch to connect to.
initial_prev_event_ids: These will be the prev_events for the first
event created. Each event created afterwards will point to the
previous event created.
inherited_depth: The depth to create the events at (you will
probably get this by calling inherit_depth_from_prev_ids(...)).
auth_event_ids: Define which events allow you to create the given
@@ -436,7 +437,6 @@ async def handle_batch_of_events(
event_ids = await self.persist_historical_events(
events_to_create=events_to_create,
room_id=room_id,
initial_prev_event_ids=initial_prev_event_ids,
inherited_depth=inherited_depth,
auth_event_ids=auth_event_ids,
app_service_requester=app_service_requester,