Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

No more floating MSC2716 historical batches #13971

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 13 additions & 63 deletions synapse/handlers/room_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async def persist_state_events_at_start(
self,
state_events_at_start: List[JsonDict],
room_id: str,
initial_state_event_ids: List[str],
initial_prev_event_ids: List[str],
app_service_requester: Requester,
) -> List[str]:
"""Takes all `state_events_at_start` event dictionaries and creates/persists
Expand All @@ -164,10 +164,8 @@ async def persist_state_events_at_start(
Args:
state_events_at_start:
room_id: Room where you want the events persisted in.
initial_state_event_ids:
The base set of state for the historical batch which the floating
state chain will derive from. This should probably be the state
from the `prev_event` defined by `/batch_send?prev_event_id=$abc`.
initial_prev_event_ids: TODO: HERE
This should probably be the prev_events defined by `/batch_send?prev_event_id=$abc`.
app_service_requester: The requester of an application service.

Returns:
Expand All @@ -176,12 +174,10 @@ async def persist_state_events_at_start(
assert app_service_requester.app_service

state_event_ids_at_start = []
state_event_ids = initial_state_event_ids.copy()

# Make the state events float off on their own by specifying no
# prev_events for the first one in the chain so we don't have a bunch of
# `@mxid joined the room` noise between each batch.
prev_event_ids_for_state_chain: List[str] = []
# Connect the state chain to the prev_events we're insertin next to
# so that they are valid events and don't get rejected.
prev_event_ids_for_state_chain: List[str] = initial_prev_event_ids

for index, state_event in enumerate(state_events_at_start):
assert_params_in_dict(
Expand Down Expand Up @@ -216,20 +212,7 @@ async def persist_state_events_at_start(
action=membership,
content=event_dict["content"],
historical=True,
# Only the first event in the state chain should be floating.
# The rest should hang off each other in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=prev_event_ids_for_state_chain,
# The first event in the state chain is floating with no
# `prev_events` which means it can't derive state from
# anywhere automatically. So we need to set some state
# explicitly.
#
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append
# later.
state_event_ids=state_event_ids.copy(),
)
else:
(
Expand All @@ -241,24 +224,11 @@ async def persist_state_events_at_start(
),
event_dict,
historical=True,
# Only the first event in the state chain should be floating.
# The rest should hang off each other in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=prev_event_ids_for_state_chain,
# The first event in the state chain is floating with no
# `prev_events` which means it can't derive state from
# anywhere automatically. So we need to set some state
# explicitly.
#
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append later.
state_event_ids=state_event_ids.copy(),
)
event_id = event.event_id

state_event_ids_at_start.append(event_id)
state_event_ids.append(event_id)
# Connect all the state in a floating chain
prev_event_ids_for_state_chain = [event_id]

Expand All @@ -269,7 +239,7 @@ async def persist_historical_events(
events_to_create: List[JsonDict],
room_id: str,
inherited_depth: int,
initial_state_event_ids: List[str],
state_chain_event_id_to_connect_to: str,
app_service_requester: Requester,
) -> List[str]:
"""Create and persists all events provided sequentially. Handles the
Expand All @@ -285,10 +255,7 @@ async def persist_historical_events(
room_id: Room where you want the events persisted in.
inherited_depth: The depth to create the events at (you will
probably by calling inherit_depth_from_prev_ids(...)).
initial_state_event_ids:
This is used to set explicit state for the insertion event at
the start of the historical batch since it's floating with no
prev_events to derive state from automatically.
state_chain_event_id_to_connect_to: TODO: HERE
app_service_requester: The requester of an application service.

Returns:
Expand All @@ -301,10 +268,8 @@ async def persist_historical_events(
# We expect the last event in a historical batch to be an batch event
assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH

# Make the historical event chain float off on its own by specifying no
# prev_events for the first event in the chain which causes the HS to
# ask for the state at the start of the batch later.
prev_event_ids: List[str] = []
# Connect the historical event chain to the state chain
prev_event_ids: List[str] = [state_chain_event_id_to_connect_to]

event_ids = []
events_to_persist = []
Expand Down Expand Up @@ -332,16 +297,7 @@ async def persist_historical_events(
ev["sender"], app_service_requester.app_service
),
event_dict,
# Only the first event (which is the insertion event) in the
# chain should be floating. The rest should hang off each other
# in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=event_dict.get("prev_events"),
# Since the first event (which is the insertion event) in the
# chain is floating with no `prev_events`, it can't derive state
# from anywhere automatically. So we need to set some state
# explicitly.
state_event_ids=initial_state_event_ids if index == 0 else None,
historical=True,
depth=inherited_depth,
)
Expand Down Expand Up @@ -390,7 +346,7 @@ async def handle_batch_of_events(
room_id: str,
batch_id_to_connect_to: str,
inherited_depth: int,
initial_state_event_ids: List[str],
state_chain_event_id_to_connect_to: str,
app_service_requester: Requester,
) -> Tuple[List[str], str]:
"""
Expand All @@ -405,13 +361,7 @@ async def handle_batch_of_events(
want this batch to connect to.
inherited_depth: The depth to create the events at (you will
probably by calling inherit_depth_from_prev_ids(...)).
initial_state_event_ids:
This is used to set explicit state for the insertion event at
the start of the historical batch since it's floating with no
prev_events to derive state from automatically. This should
probably be the state from the `prev_event` defined by
`/batch_send?prev_event_id=$abc` plus the outcome of
`persist_state_events_at_start`
state_chain_event_id_to_connect_to: TODO: HERE
app_service_requester: The requester of an application service.

Returns:
Expand Down Expand Up @@ -457,7 +407,7 @@ async def handle_batch_of_events(
events_to_create=events_to_create,
room_id=room_id,
inherited_depth=inherited_depth,
initial_state_event_ids=initial_state_event_ids,
state_chain_event_id_to_connect_to=state_chain_event_id_to_connect_to,
app_service_requester=app_service_requester,
)

Expand Down
22 changes: 11 additions & 11 deletions synapse/rest/client/room_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,17 +145,16 @@ async def on_POST(
)

state_event_ids_at_start = []
# Create and persist all of the state events that float off on their own
# before the batch. These will most likely be all of the invite/member
# state events used to auth the upcoming historical messages.
# Create and persist all of the state events in a chain before the
# batch. These will most likely be all of the invite/member state events
# used to auth the upcoming historical messages.
if body["state_events_at_start"]:
state_event_ids_at_start = (
await self.room_batch_handler.persist_state_events_at_start(
state_events_at_start=body["state_events_at_start"],
room_id=room_id,
initial_state_event_ids=state_event_ids,
app_service_requester=requester,
)
state_event_ids_at_start = await self.room_batch_handler.persist_state_events_at_start(
state_events_at_start=body["state_events_at_start"],
room_id=room_id,
# Connect the state chain to prev_event we're inserting next to
initial_prev_event_ids=prev_event_ids_from_query,
app_service_requester=requester,
)
# Update our ongoing auth event ID list with all of the new state we
# just created
Expand Down Expand Up @@ -222,7 +221,8 @@ async def on_POST(
room_id=room_id,
batch_id_to_connect_to=batch_id_to_connect_to,
inherited_depth=inherited_depth,
initial_state_event_ids=state_event_ids,
# Connect the historical batch to the state chain
state_chain_event_id_to_connect_to=state_event_ids_at_start[-1],
app_service_requester=requester,
)

Expand Down