Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Enable Faster Remote Room Joins against worker-mode Synapse. #14752

Merged
merged 29 commits into from
Jan 22, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9f43880
Enable Complement tests for Faster Remote Room Joins on worker-mode
reivilibre Dec 6, 2022
58d4f93
(dangerous) Add an override to allow Complement to use FRRJ under wor…
reivilibre Dec 29, 2022
29c331d
Newsfile
reivilibre Dec 29, 2022
91faaf6
Fix race where we didn't send out replication notification
erikjohnston Jan 6, 2023
50776af
MORE HACKS
erikjohnston Jan 10, 2023
4d1a4f0
Fix get_un_partial_stated_rooms_token to take instance_name
erikjohnston Jan 19, 2023
c135941
Merge remote-tracking branch 'origin/develop' into rei/frrj_workers_c…
erikjohnston Jan 19, 2023
5b97e4e
Fix bad merge
erikjohnston Jan 19, 2023
d24b947
Remove warning
erikjohnston Jan 19, 2023
aa97b35
Correctly advance un_partial_stated_room_stream
erikjohnston Jan 19, 2023
a3cddbf
Fix merge
erikjohnston Jan 19, 2023
eb65ad3
Add another notify_replication
erikjohnston Jan 19, 2023
1c77611
Fixups
erikjohnston Jan 19, 2023
af2815a
Create a separate ReplicationNotifier
erikjohnston Jan 20, 2023
539e147
Merge remote-tracking branch 'origin/develop' into rei/frrj_workers_c…
erikjohnston Jan 20, 2023
d650011
Fix test
erikjohnston Jan 20, 2023
8ae6c31
Fix portdb
erikjohnston Jan 20, 2023
71472ba
Create a separate ReplicationNotifier
erikjohnston Jan 20, 2023
b72b698
Fix test
erikjohnston Jan 20, 2023
5a4355d
Fix portdb
erikjohnston Jan 20, 2023
f2a097c
Fix presence test
erikjohnston Jan 20, 2023
56e6dda
Newsfile
erikjohnston Jan 20, 2023
36433ed
Merge branch 'erikj/repl_notifieri' into rei/frrj_workers_complement
erikjohnston Jan 20, 2023
9a85309
Merge branch 'develop' into rei/frrj_workers_complement
erikjohnston Jan 20, 2023
207c0a3
Apply suggestions from code review
erikjohnston Jan 20, 2023
f76879a
Merge branch 'develop' into rei/frrj_workers_complement
erikjohnston Jan 20, 2023
b7df499
Update changelog.d/14752.misc
Jan 22, 2023
df83750
Merge branch 'develop' into rei/frrj_workers_complement
Jan 22, 2023
4d62667
lint
Jan 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14752.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Enable Complement tests for Faster Remote Room Joins against worker-mode Synapse. Not suitable for production use.
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 0 additions & 2 deletions docker/complement/conf/workers-shared-extra.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,8 @@ allow_device_name_lookup_over_federation: true
experimental_features:
# Enable history backfilling support
msc2716_enabled: true
{% if not workers_in_use %}
# client-side support for partial state in /send_join responses
faster_joins: true
{% endif %}
# Enable support for polls
msc3381_polls_enabled: true
# Enable deleting device-specific notification settings stored in account data
Expand Down
11 changes: 4 additions & 7 deletions scripts-dev/complement.sh
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ fi

extra_test_args=()

test_tags="synapse_blacklist,msc3787,msc3874,msc3890,msc3391,msc3930"
test_tags="synapse_blacklist,msc3787,msc3874,msc3890,msc3391,msc3930,faster_joins"

# All environment variables starting with PASS_ will be shared.
# (The prefix is stripped off before reaching the container.)
Expand Down Expand Up @@ -223,12 +223,9 @@ else
export PASS_SYNAPSE_COMPLEMENT_DATABASE=sqlite
fi

# We only test faster room joins on monoliths, because they are purposefully
# being developed without worker support to start with.
#
# The tests for importing historical messages (MSC2716) also only pass with monoliths,
# currently.
test_tags="$test_tags,faster_joins,msc2716"
# The tests for importing historical messages (MSC2716)
# only pass with monoliths, currently.
test_tags="$test_tags,msc2716"
fi


Expand Down
7 changes: 0 additions & 7 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,6 @@ def start(config_options: List[str]) -> None:
"synapse.app.user_dir",
)

if config.experimental.faster_joins_enabled:
raise ConfigError(
"You have enabled the experimental `faster_joins` config option, but it is "
"not compatible with worker deployments yet. Please disable `faster_joins` "
"or run Synapse as a single process deployment instead."
)

synapse.events.USE_FROZEN_DICTS = config.server.use_frozen_dicts
synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage

Expand Down
2 changes: 2 additions & 0 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,7 @@ def __init__(self, hs: "HomeServer", device_handler: DeviceHandler):
self.federation = hs.get_federation_client()
self.clock = hs.get_clock()
self.device_handler = device_handler
self._notifier = hs.get_notifier()

self._remote_edu_linearizer = Linearizer(name="remote_device_list")

Expand Down Expand Up @@ -1054,6 +1055,7 @@ async def incoming_device_list_update(
user_id,
device_id,
)
self._notifier.notify_replication()

room_ids = await self.store.get_rooms_for_user(user_id)
if not room_ids:
Expand Down
8 changes: 5 additions & 3 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1727,14 +1727,16 @@ async def _sync_partial_state_room(

logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)

# Poke the notifier so that other workers see the write to
# the un-partial-stated rooms stream.
self._notifier.notify_replication()

DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
if success:
logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated(
room_id
)
# Poke the notifier so that other workers see the write to
# the un-partial-stated rooms stream.
self._notifier.notify_replication()

# TODO(faster_joins) update room stats and user directory?
# https://github.com/matrix-org/synapse/issues/12814
Expand Down
5 changes: 2 additions & 3 deletions synapse/replication/tcp/streams/partial_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import attr

from synapse.replication.tcp.streams import Stream
from synapse.replication.tcp.streams._base import current_token_without_instance

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand All @@ -43,7 +42,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__(
hs.get_instance_name(),
# TODO(faster_joins, multiple writers): we need to account for instance names
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
current_token_without_instance(store.get_un_partial_stated_rooms_token),
store.get_un_partial_stated_rooms_token,
store.get_un_partial_stated_rooms_from_stream,
)

Expand Down Expand Up @@ -71,6 +70,6 @@ def __init__(self, hs: "HomeServer"):
super().__init__(
hs.get_instance_name(),
# TODO(faster_joins, multiple writers): we need to account for instance names
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
current_token_without_instance(store.get_un_partial_stated_events_token),
store.get_un_partial_stated_events_token,
store.get_un_partial_stated_events_from_stream,
)
16 changes: 11 additions & 5 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,12 @@ def get_chain_id_txn(txn: Cursor) -> int:
db_conn, "un_partial_stated_event_stream", "stream_id"
)

def get_un_partial_stated_events_token(self) -> int:
# TODO(faster_joins, multiple writers): This is inappropriate if there are multiple
# writers because workers that don't write often will hold all
# readers up.
return self._un_partial_stated_events_stream_id_gen.get_current_token()
def get_un_partial_stated_events_token(self, instance_name: str) -> int:
return (
self._un_partial_stated_events_stream_id_gen.get_current_token_for_writer(
instance_name
)
)

async def get_un_partial_stated_events_from_stream(
self, instance_name: str, last_id: int, current_id: int, limit: int
Expand Down Expand Up @@ -408,6 +409,11 @@ def process_replication_position(
self._stream_id_gen.advance(instance_name, token)
elif stream_name == BackfillStream.NAME:
self._backfill_id_gen.advance(instance_name, -token)
elif stream_name == UnPartialStatedEventStream.NAME:
logger.info(
"Advancing %s token to %s", UnPartialStatedEventStream.NAME, token
)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
self._un_partial_stated_events_stream_id_gen.advance(instance_name, token)
super().process_replication_position(stream_name, instance_name, token)

async def have_censored_event(self, event_id: str) -> bool:
Expand Down
20 changes: 13 additions & 7 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from synapse.api.room_versions import RoomVersion, RoomVersions
from synapse.config.homeserver import HomeServerConfig
from synapse.events import EventBase
from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
Expand Down Expand Up @@ -140,6 +141,13 @@ def __init__(
db_conn, "un_partial_stated_room_stream", "stream_id"
)

def process_replication_position(
self, stream_name: str, instance_name: str, token: int
) -> None:
if stream_name == UnPartialStatedRoomStream.NAME:
self._un_partial_stated_rooms_stream_id_gen.advance(instance_name, token)
return super().process_replication_position(stream_name, instance_name, token)

async def store_room(
self,
room_id: str,
Expand Down Expand Up @@ -1277,13 +1285,10 @@ async def get_join_event_id_and_device_lists_stream_id_for_partial_state(
)
return result["join_event_id"], result["device_lists_stream_id"]

def get_un_partial_stated_rooms_token(self) -> int:
# TODO(faster_joins, multiple writers): This is inappropriate if there
# are multiple writers because workers that don't write often will
# hold all readers up.
# (See `MultiWriterIdGenerator.get_persisted_upto_position` for an
# explanation.)
return self._un_partial_stated_rooms_stream_id_gen.get_current_token()
def get_un_partial_stated_rooms_token(self, instance_name: str) -> int:
return self._un_partial_stated_rooms_stream_id_gen.get_current_token_for_writer(
instance_name
)

async def get_un_partial_stated_rooms_from_stream(
self, instance_name: str, last_id: int, current_id: int, limit: int
Expand Down Expand Up @@ -2375,3 +2380,4 @@ def _clear_partial_state_room_txn(
WHERE stream_id <= ?
"""
txn.execute(sql, (device_lists_stream_id,))
txn.execute(sql, (device_lists_stream_id,))
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def process_replication_rows(
for row in rows:
assert isinstance(row, UnPartialStatedEventStreamRow)
self._get_state_group_for_event.invalidate((row.event_id,))
self.is_partial_state_event.invalidate((row.event_id,))

super().process_replication_rows(stream_name, instance_name, token, rows)

Expand Down Expand Up @@ -485,6 +486,7 @@ def _update_state_for_partial_state_event_txn(
"rejection_status_changed": rejection_status_changed,
},
)
txn.call_after(self.hs.get_notifier().on_new_replication_data)
squahtx marked this conversation as resolved.
Show resolved Hide resolved


class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
Expand Down