This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Faster joins: Fix incompatibility with restricted joins #14882

Merged
merged 6 commits on Jan 22, 2023
Changes from 1 commit
198 changes: 105 additions & 93 deletions synapse/handlers/federation.py
@@ -48,7 +48,6 @@
FederationError,
FederationPullAttemptBackoffError,
HttpResponseException,
LimitExceededError,
NotFoundError,
RequestSendFailed,
SynapseError,
@@ -182,6 +181,12 @@ def __init__(self, hs: "HomeServer"):
self._partial_state_syncs_maybe_needing_restart: Dict[
str, Tuple[Optional[str], Collection[str]]
] = {}
# A lock guarding the partial state flag for rooms.
# When the lock is held for a given room, no other concurrent code may
# partial state or un-partial state the room.
self._is_partial_state_room_linearizer = Linearizer(
name="_is_partial_state_room_linearizer"
)

# if this is the main process, fire off a background process to resume
# any partial-state-resync operations which were in flight when we
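
For context: a Linearizer serializes coroutines that queue on the same key, so only one of them runs at a time per key. The pattern relied on here, and used later in this diff, looks roughly like the following minimal sketch (assuming the Linearizer lives at its usual location in synapse.util.async_helpers; error handling omitted):

from synapse.util.async_helpers import Linearizer

linearizer = Linearizer(name="example_linearizer")

async def guarded(room_id: str) -> None:
    # Only one caller at a time may run this block for a given room_id;
    # callers queueing on other room IDs are not blocked.
    async with linearizer.queue(room_id):
        ...  # read or flip the room's partial-state flag here
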
@@ -627,107 +632,107 @@ async def do_invite_join(
except ValueError:
pass

ret = await self.federation_client.send_join(
host_list,
event,
room_version_obj,
# Perform a full join when we are already in the room and it is a full
# state room, since we are not allowed to persist a partial state join
# event in a full state room. In the future, we could optimize this by
# always performing a partial state join and computing the state
# ourselves or retrieving it from the remote homeserver if necessary.
#
# There's a race where we leave the room, then perform a full join
# anyway. This should end up being fast anyway, since we would already
# have the full room state and auth chain persisted.
partial_state=(
not is_host_joined
or await self.store.is_partial_state_room(room_id)
),
)

event = ret.event
origin = ret.origin
state = ret.state
auth_chain = ret.auth_chain
auth_chain.sort(key=lambda e: e.depth)
async with self._is_partial_state_room_linearizer.queue(room_id):
ret = await self.federation_client.send_join(
host_list,
event,
room_version_obj,
# Perform a full join when we are already in the room and it is a
# full state room, since we are not allowed to persist a partial
# state join event in a full state room. In the future, we could
# optimize this by always performing a partial state join and
# computing the state ourselves or retrieving it from the remote
# homeserver if necessary.
#
# There's a race where we leave the room, then perform a full join
# anyway. This should end up being fast anyway, since we would
# already have the full room state and auth chain persisted.
partial_state=(
not is_host_joined
or await self.store.is_partial_state_room(room_id)
),
)

logger.debug("do_invite_join auth_chain: %s", auth_chain)
logger.debug("do_invite_join state: %s", state)
event = ret.event
origin = ret.origin
state = ret.state
auth_chain = ret.auth_chain
auth_chain.sort(key=lambda e: e.depth)

logger.debug("do_invite_join event: %s", event)
logger.debug("do_invite_join auth_chain: %s", auth_chain)
logger.debug("do_invite_join state: %s", state)

# if this is the first time we've joined this room, it's time to add
# a row to `rooms` with the correct room version. If there's already a
# row there, we should override it, since it may have been populated
# based on an invite request which lied about the room version.
#
# federation_client.send_join has already checked that the room
# version in the received create event is the same as room_version_obj,
# so we can rely on it now.
#
await self.store.upsert_room_on_join(
room_id=room_id,
room_version=room_version_obj,
state_events=state,
)
logger.debug("do_invite_join event: %s", event)

if ret.partial_state:
# Mark the room as having partial state.
# The background process is responsible for unmarking this flag,
# even if the join fails.
await self.store.store_partial_state_room(
# if this is the first time we've joined this room, it's time to add
# a row to `rooms` with the correct room version. If there's already a
# row there, we should override it, since it may have been populated
# based on an invite request which lied about the room version.
#
# federation_client.send_join has already checked that the room
# version in the received create event is the same as room_version_obj,
# so we can rely on it now.
#
await self.store.upsert_room_on_join(
room_id=room_id,
servers=ret.servers_in_room,
device_lists_stream_id=self.store.get_device_stream_token(),
joined_via=origin,
room_version=room_version_obj,
state_events=state,
)

try:
max_stream_id = (
await self._federation_event_handler.process_remote_join(
origin,
room_id,
auth_chain,
state,
event,
room_version_obj,
partial_state=ret.partial_state,
)
)
except PartialStateConflictError as e:
# The homeserver was already in the room and it is no longer partial
# stated. We ought to be doing a local join instead. Turn the error into
# a 429, as a hint to the client to try again.
# TODO(faster_joins): `_should_perform_remote_join` suggests that we may
# do a remote join for restricted rooms even if we have full state.
logger.error(
"Room %s was un-partial stated while processing remote join.",
room_id,
)
raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
else:
# Record the join event id for future use (when we finish the full
# join). We have to do this after persisting the event to keep foreign
# key constraints intact.
if ret.partial_state:
await self.store.write_partial_state_rooms_join_event_id(
room_id, event.event_id
)
finally:
# Always kick off the background process that asynchronously fetches
# state for the room.
# If the join failed, the background process is responsible for
# cleaning up — including unmarking the room as a partial state room.
if ret.partial_state:
# Kick off the process of asynchronously fetching the state for this
# room.
self._start_partial_state_room_sync(
initial_destination=origin,
other_destinations=ret.servers_in_room,
# Mark the room as having partial state.
# The background process is responsible for unmarking this flag,
# even if the join fails.
await self.store.store_partial_state_room(
room_id=room_id,
servers=ret.servers_in_room,
device_lists_stream_id=self.store.get_device_stream_token(),
joined_via=origin,
)

try:
max_stream_id = (
await self._federation_event_handler.process_remote_join(
origin,
room_id,
auth_chain,
state,
event,
room_version_obj,
partial_state=ret.partial_state,
)
)
except PartialStateConflictError:
# This should be impossible, since we hold the lock on the room's
# partial statedness.
logger.error(
"Room %s was un-partial stated while processing remote join.",
room_id,
)
raise
else:
# Record the join event id for future use (when we finish the full
# join). We have to do this after persisting the event to keep
# foreign key constraints intact.
if ret.partial_state:
await self.store.write_partial_state_rooms_join_event_id(
room_id, event.event_id
)
finally:
# Always kick off the background process that asynchronously fetches
# state for the room.
# If the join failed, the background process is responsible for
# cleaning up — including unmarking the room as a partial state
# room.
if ret.partial_state:
# Kick off the process of asynchronously fetching the state for
# this room.
self._start_partial_state_room_sync(
initial_destination=origin,
other_destinations=ret.servers_in_room,
room_id=room_id,
)

# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches.
await self._replication.wait_for_stream_position(
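
Restated on its own, the partial_state argument computed near the top of this hunk reduces to the following predicate (a hypothetical standalone helper, for illustration only; it is not part of the change):

def should_request_partial_state_join(
    is_host_joined: bool, room_has_partial_state: bool
) -> bool:
    # Ask the resident server for a partial-state join unless this homeserver
    # is already joined to the room AND already holds its full state: a
    # partial-state join event may not be persisted into a full-state room.
    return not is_host_joined or room_has_partial_state
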
@@ -1809,6 +1814,11 @@ async def _sync_partial_state_room(
`initial_destination` is unavailable
room_id: room to be resynced
"""
# Assume that we run on the main process for now.
# When moving the sync to workers, we need to ensure that
# * `_start_partial_state_room_sync` still prevents duplicate resyncs
# * `_is_partial_state_room_linearizer` correctly guards partial state flags
# for rooms between the workers doing remote joins and resync.
assert not self.config.worker.worker_app

# TODO(faster_joins): do we need to lock to avoid races? What happens if other
@@ -1846,8 +1856,10 @@ async def _sync_partial_state_room(
logger.info("Handling any pending device list updates")
await self._device_handler.handle_room_un_partial_stated(room_id)

logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)
async with self._is_partial_state_room_linearizer.queue(room_id):
logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)

if success:
logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated(
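
To see why both code paths must hold the lock, here is a runnable toy reproduction of the race this change closes, using an asyncio per-room lock as a stand-in for the Linearizer (all names below are hypothetical, not Synapse APIs). Without the lock, the join path could read the partial-state flag, yield at an await, and then persist its partial-state join event after the resync had already cleared the flag; with the lock, the two blocks are serialized per room.

import asyncio
from collections import defaultdict
from typing import DefaultDict, Dict

room_locks: DefaultDict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
partial_state: Dict[str, bool] = {"!room:example.org": True}

async def remote_join(room_id: str) -> None:
    # Mirrors do_invite_join: the flag is read and acted upon under the lock,
    # so the resync cannot clear it halfway through.
    async with room_locks[room_id]:
        is_partial = partial_state[room_id]
        await asyncio.sleep(0)  # stand-in for the awaits around send_join / persistence
        if is_partial and not partial_state[room_id]:
            raise RuntimeError("partial-state join persisted into a full-state room")
        print(f"joined {room_id} with partial_state={is_partial}")

async def finish_resync(room_id: str) -> None:
    # Mirrors _sync_partial_state_room: clearing the flag is serialized
    # against any in-flight join on the same room.
    async with room_locks[room_id]:
        partial_state[room_id] = False
        print(f"cleared partial-state flag for {room_id}")

async def main() -> None:
    room_id = "!room:example.org"
    await asyncio.gather(remote_join(room_id), finish_resync(room_id))

asyncio.run(main())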