From 3ca6b562b8a062ed2f235f7e63d8d1751b5ef756 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Apr 2020 17:58:18 +0100 Subject: [PATCH 01/18] Make location of events writer configurable --- synapse/config/workers.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index c80c338584cf..a4cd4d3066a0 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -27,6 +27,17 @@ class InstanceLocationConfig: port = attr.ib(type=int) +@attr.s +class WriterLocations: + """Specifies the instances that write various streams. + + Attributes: + events: The instance that writes to the event and backfill streams. + """ + + events = attr.ib(default="master", type=str) + + class WorkerConfig(Config): """The workers are processes run separately to the main synapse process. They have their own pid_file and listener configuration. They use the @@ -88,6 +99,10 @@ def read_config(self, config, **kwargs): name: InstanceLocationConfig(**c) for name, c in instance_map.items() } + # Map from type of streams to source, c.f. WriterLocations. + writers = config.get("writers", {}) or {} + self.writers = WriterLocations(**writers) + def read_arguments(self, args): # We support a bunch of command line arguments that override options in # the config. A lot of these options have a worker_* prefix when running From a447c2eb0c56eee8a49ca992cd0031f22984f755 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2020 16:24:59 +0100 Subject: [PATCH 02/18] Use new writers config --- synapse/handlers/federation.py | 10 +++++----- synapse/handlers/message.py | 10 ++++++---- synapse/storage/data_stores/main/events_worker.py | 2 +- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 81d859f807bb..236325567db0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -125,10 +125,9 @@ def __init__(self, hs): self._server_notices_mxid = hs.config.server_notices_mxid self.config = hs.config self.http_client = hs.get_simple_http_client() + self._instance_name = hs.get_instance_name() - self._send_events_to_master = ReplicationFederationSendEventsRestServlet.make_client( - hs - ) + self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs) self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client( hs ) @@ -2837,8 +2836,9 @@ async def persist_events_and_notify( backfilled: Whether these events are a result of backfilling or not """ - if self.config.worker_app: - await self._send_events_to_master( + if self.config.worker.writers.events != self._instance_name: + await self._send_events( + instance_name=self.config.worker.writers.events, store=self.store, event_and_contexts=event_and_contexts, backfilled=backfilled, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 0242521cc64c..e4c1b8cb00d7 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -365,10 +365,11 @@ def __init__(self, hs): self.notifier = hs.get_notifier() self.config = hs.config self.require_membership_for_aliases = hs.config.require_membership_for_aliases + self._instance_name = hs.get_instance_name() self.room_invite_state_types = self.hs.config.room_invite_state_types - self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs) + self.send_event = ReplicationSendEventRestServlet.make_client(hs) # This is only used to get at ratelimit function, and maybe_kick_guest_users self.base_handler = BaseHandler(hs) @@ -822,8 +823,9 @@ async def handle_new_client_event( success = False try: # If we're a worker we need to hit out to the master. - if self.config.worker_app: - await self.send_event_to_master( + if self.config.worker.writers.events != self._instance_name: + await self.send_event( + instance_name=self.config.worker.writers.events, event_id=event.event_id, store=self.store, requester=requester, @@ -888,7 +890,7 @@ async def persist_and_notify_client_event( This should only be run on master. """ - assert not self.config.worker_app + assert self.config.worker.writers.events == self._instance_name if ratelimit: # We check if this is a room admin redacting an event so that we diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 9130b74eb57c..6775689c61b0 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -76,7 +76,7 @@ class EventsWorkerStore(SQLBaseStore): def __init__(self, database: Database, db_conn, hs): super(EventsWorkerStore, self).__init__(database, db_conn, hs) - if hs.config.worker_app is None: + if hs.config.worker.writers.events == hs.get_instance_name(): # We are the process in charge of generating stream ids for events, # so instantiate ID generators based on the database self._stream_id_gen = StreamIdGenerator( From 1f6dbc3cd405839accac1a40e3237cf79ed05b49 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 11 May 2020 11:26:12 +0100 Subject: [PATCH 03/18] Enable moving event persistence off of master --- synapse/replication/tcp/handler.py | 8 ++++++++ synapse/storage/data_stores/__init__.py | 6 +++--- synapse/storage/data_stores/main/events.py | 6 +++--- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index acfa66a7a842..ee4b978a5a84 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -38,7 +38,9 @@ from synapse.replication.tcp.protocol import AbstractConnection from synapse.replication.tcp.streams import ( STREAMS_MAP, + BackfillStream, CachesStream, + EventsStream, FederationStream, Stream, ) @@ -87,6 +89,12 @@ def __init__(self, hs): self._streams_to_replicate.append(stream) continue + if ( + isinstance(stream, (EventsStream, BackfillStream)) + and hs.config.worker.writers.events == hs.get_instance_name() + ): + self._streams_to_replicate.append(stream) + # Only add any other streams if we're on master. if hs.config.worker_app is not None: continue diff --git a/synapse/storage/data_stores/__init__.py b/synapse/storage/data_stores/__init__.py index 791961b296dc..bf7519295fd1 100644 --- a/synapse/storage/data_stores/__init__.py +++ b/synapse/storage/data_stores/__init__.py @@ -66,9 +66,9 @@ def __init__(self, main_store_class, hs): self.main = main_store_class(database, db_conn, hs) - # If we're on a process that can persist events (currently - # master), also instantiate a `PersistEventsStore` - if hs.config.worker.worker_app is None: + # If we're on a process that can persist events also + # instansiate a `PersistEventsStore` + if hs.config.worker.writers.events == hs.get_instance_name(): self.persist_events = PersistEventsStore( hs, database, self.main ) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index a97f8b39344e..bd2a6132f284 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -138,10 +138,10 @@ def __init__(self, hs: "HomeServer", db: Database, main_data_store: "DataStore") self._backfill_id_gen = self.store._backfill_id_gen # type: StreamIdGenerator self._stream_id_gen = self.store._stream_id_gen # type: StreamIdGenerator - # This should only exist on master for now + # This should only exist on instances that are configured to write assert ( - hs.config.worker.worker_app is None - ), "Can only instantiate PersistEventsStore on master" + hs.config.worker.writers.events == hs.get_instance_name() + ), "Can only instantiate EventsStore on master" @_retry_on_integrity_error @defer.inlineCallbacks From 9541abd8c7864ca5b3e8afb6115c653ff176c3a8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2020 15:33:11 +0100 Subject: [PATCH 04/18] Add some debugging --- synapse/config/logger.py | 1 + synapse/replication/http/_base.py | 3 +++ 2 files changed, 4 insertions(+) diff --git a/synapse/config/logger.py b/synapse/config/logger.py index a25c70e928cf..49f6c32beb7f 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -257,5 +257,6 @@ def read_config(*args, callback=None): logging.warning("***** STARTING SERVER *****") logging.warning("Server %s version %s", sys.argv[0], get_version_string(synapse)) logging.info("Server hostname: %s", config.server_name) + logging.info("Instance name: %s", hs.get_instance_name()) return logger diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index c3136a4eb93f..793cef6c268c 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -142,6 +142,7 @@ def make_client(cls, hs): """ clock = hs.get_clock() client = hs.get_simple_http_client() + local_instance_name = hs.get_instance_name() master_host = hs.config.worker_replication_host master_port = hs.config.worker_replication_http_port @@ -151,6 +152,8 @@ def make_client(cls, hs): @trace(opname="outgoing_replication_request") @defer.inlineCallbacks def send_request(instance_name="master", **kwargs): + if instance_name == local_instance_name: + raise Exception("Trying to send HTTP request to self") if instance_name == "master": host = master_host port = master_port From 7db1f2282958dad8ff6ffab82e988cb22882f6f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2020 16:54:05 +0100 Subject: [PATCH 05/18] Add check that event writer has an entry in instance_map --- synapse/config/workers.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index a4cd4d3066a0..57665477cf74 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -15,7 +15,7 @@ import attr -from ._base import Config +from ._base import Config, ConfigError @attr.s @@ -103,6 +103,17 @@ def read_config(self, config, **kwargs): writers = config.get("writers", {}) or {} self.writers = WriterLocations(**writers) + # Check that the configured writer for events also appears in + # `instance_map`. + if ( + self.writers.events != "master" + and self.writers.events not in self.instance_map + ): + raise ConfigError( + "Instance %r is configured to write events but does not appear in `instance_map` config." + % (self.writers.events,) + ) + def read_arguments(self, args): # We support a bunch of command line arguments that override options in # the config. A lot of these options have a worker_* prefix when running From 86b1d753690bc4dbd9398979d9785499b21c3127 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2020 16:54:30 +0100 Subject: [PATCH 06/18] Newsfile --- changelog.d/7517.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/7517.feature diff --git a/changelog.d/7517.feature b/changelog.d/7517.feature new file mode 100644 index 000000000000..6062f0061c7b --- /dev/null +++ b/changelog.d/7517.feature @@ -0,0 +1 @@ +Add option to move event persistence off master. From 64949e78b10f1dc26c7c675d1c7c040b7e08d1b8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2020 17:23:16 +0100 Subject: [PATCH 07/18] Fix port script --- scripts/synapse_port_db | 3 +++ 1 file changed, 3 insertions(+) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index acd9ac4b7525..9a0fbc61d87e 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -196,6 +196,9 @@ class MockHomeserver: def get_reactor(self): return reactor + def get_instance_name(self): + return "master" + class Porter(object): def __init__(self, **kwargs): From 266755226ba733a1657288ace63aef1f2584e735 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2020 12:31:27 +0100 Subject: [PATCH 08/18] Fixup review comments --- synapse/config/workers.py | 4 ++-- synapse/handlers/message.py | 2 +- synapse/replication/tcp/handler.py | 2 ++ synapse/storage/data_stores/__init__.py | 2 +- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 57665477cf74..1c2fe31e6eeb 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -94,13 +94,13 @@ def read_config(self, config, **kwargs): bind_addresses.append("") # A map from instance name to host/port of their HTTP replication endpoint. - instance_map = config.get("instance_map", {}) or {} + instance_map = config.get("instance_map") or {} self.instance_map = { name: InstanceLocationConfig(**c) for name, c in instance_map.items() } # Map from type of streams to source, c.f. WriterLocations. - writers = config.get("writers", {}) or {} + writers = config.get("writers") or {} self.writers = WriterLocations(**writers) # Check that the configured writer for events also appears in diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e4c1b8cb00d7..aa0497c717ef 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -888,7 +888,7 @@ async def persist_and_notify_client_event( """Called when we have fully built the event, have already calculated the push actions for the event, and checked auth. - This should only be run on master. + This should only be run on instance in charge of persisting events. """ assert self.config.worker.writers.events == self._instance_name diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index ee4b978a5a84..335ee2b5c7ea 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -89,6 +89,8 @@ def __init__(self, hs): self._streams_to_replicate.append(stream) continue + # Only add EventStream and BackfillStream as a source on the + # instance in charge of event persistence. if ( isinstance(stream, (EventsStream, BackfillStream)) and hs.config.worker.writers.events == hs.get_instance_name() diff --git a/synapse/storage/data_stores/__init__.py b/synapse/storage/data_stores/__init__.py index bf7519295fd1..599ee470d423 100644 --- a/synapse/storage/data_stores/__init__.py +++ b/synapse/storage/data_stores/__init__.py @@ -67,7 +67,7 @@ def __init__(self, main_store_class, hs): self.main = main_store_class(database, db_conn, hs) # If we're on a process that can persist events also - # instansiate a `PersistEventsStore` + # instantiate a `PersistEventsStore` if hs.config.worker.writers.events == hs.get_instance_name(): self.persist_events = PersistEventsStore( hs, database, self.main From c42b180f4d4a020530875ebd5d691c4f84b50d48 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2020 12:46:04 +0100 Subject: [PATCH 09/18] Fix typo Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- synapse/handlers/message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index aa0497c717ef..68b84b1410bb 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -888,7 +888,7 @@ async def persist_and_notify_client_event( """Called when we have fully built the event, have already calculated the push actions for the event, and checked auth. - This should only be run on instance in charge of persisting events. + This should only be run on the instance in charge of persisting events. """ assert self.config.worker.writers.events == self._instance_name From d2012df31c8c5ec2a2f832f0edd2269e152ac194 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2020 13:22:41 +0100 Subject: [PATCH 10/18] Rename config var to stream_writers --- synapse/config/workers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 1c2fe31e6eeb..ed06b91a54a1 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -100,7 +100,7 @@ def read_config(self, config, **kwargs): } # Map from type of streams to source, c.f. WriterLocations. - writers = config.get("writers") or {} + writers = config.get("stream_writers") or {} self.writers = WriterLocations(**writers) # Check that the configured writer for events also appears in From 6a539b63785cbe19094d6cc72582fce0fb6e2625 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2020 15:42:59 +0100 Subject: [PATCH 11/18] fix missing continue --- synapse/replication/tcp/handler.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 335ee2b5c7ea..03300e533608 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -89,13 +89,13 @@ def __init__(self, hs): self._streams_to_replicate.append(stream) continue - # Only add EventStream and BackfillStream as a source on the - # instance in charge of event persistence. - if ( - isinstance(stream, (EventsStream, BackfillStream)) - and hs.config.worker.writers.events == hs.get_instance_name() - ): - self._streams_to_replicate.append(stream) + if isinstance(stream, (EventsStream, BackfillStream)): + # Only add EventStream and BackfillStream as a source on the + # instance in charge of event persistence. + if hs.config.worker.writers.events == hs.get_instance_name(): + self._streams_to_replicate.append(stream) + + continue # Only add any other streams if we're on master. if hs.config.worker_app is not None: From c0502f3700f550269a8429db5e559313743ebbd1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2020 18:07:07 +0100 Subject: [PATCH 12/18] Move mark_remote_user_device_list_as_unsubscribed to worker store --- synapse/storage/data_stores/main/devices.py | 30 +++++++++++++-------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index fe6d6ecfe0e0..0466c2327819 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -679,6 +679,25 @@ def mark_remote_user_device_cache_as_stale(self, user_id: str): desc="make_remote_user_device_cache_as_stale", ) + def mark_remote_user_device_list_as_unsubscribed(self, user_id): + """Mark that we no longer track device lists for remote user. + """ + + def _mark_remote_user_device_list_as_unsubscribed_txn(txn): + self.db.simple_delete_txn( + txn, + table="device_lists_remote_extremeties", + keyvalues={"user_id": user_id}, + ) + self._invalidate_cache_and_stream( + txn, self.get_device_list_last_stream_id_for_remote, (user_id,) + ) + + return self.db.runInteraction( + "mark_remote_user_device_list_as_unsubscribed", + _mark_remote_user_device_list_as_unsubscribed_txn, + ) + class DeviceBackgroundUpdateStore(SQLBaseStore): def __init__(self, database: Database, db_conn, hs): @@ -959,17 +978,6 @@ def update_device(self, user_id, device_id, new_display_name=None): desc="update_device", ) - @defer.inlineCallbacks - def mark_remote_user_device_list_as_unsubscribed(self, user_id): - """Mark that we no longer track device lists for remote user. - """ - yield self.db.simple_delete( - table="device_lists_remote_extremeties", - keyvalues={"user_id": user_id}, - desc="mark_remote_user_device_list_as_unsubscribed", - ) - self.get_device_list_last_stream_id_for_remote.invalidate((user_id,)) - def update_remote_device_list_cache_entry( self, user_id, device_id, content, stream_id ): From a9ec5f4db52a7412568ac51b1c9ff23dad96a146 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2020 15:41:41 +0100 Subject: [PATCH 13/18] Fix buping presence active time when using workers --- synapse/app/generic_worker.py | 53 ++++++++++-- synapse/replication/http/__init__.py | 2 + synapse/replication/http/presence.py | 116 +++++++++++++++++++++++++++ 3 files changed, 166 insertions(+), 5 deletions(-) create mode 100644 synapse/replication/http/presence.py diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 506b70443b5d..31b8c5dbf9c0 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -40,7 +40,11 @@ from synapse.config.logger import setup_logging from synapse.federation import send_queue from synapse.federation.transport.server import TransportLayerServer -from synapse.handlers.presence import BasePresenceHandler, get_interested_parties +from synapse.handlers.presence import ( + BasePresenceHandler, + PresenceState, + get_interested_parties, +) from synapse.http.server import JsonResource from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseSite @@ -48,6 +52,10 @@ from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource +from synapse.replication.http.presence import ( + ReplicationBumpPresenceActiveTime, + ReplicationPresenceSetState, +) from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore @@ -256,6 +264,9 @@ def __init__(self, hs): # but we haven't notified the master of that yet self.users_going_offline = {} + self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs) + self._set_state_client = ReplicationPresenceSetState.make_client(hs) + self._send_stop_syncing_loop = self.clock.looping_call( self.send_stop_syncing, UPDATE_SYNCING_USERS_MS ) @@ -313,10 +324,6 @@ def send_stop_syncing(self): self.users_going_offline.pop(user_id, None) self.send_user_sync(user_id, False, last_sync_ms) - def set_state(self, user, state, ignore_status_msg=False): - # TODO Hows this supposed to work? - return defer.succeed(None) - async def user_syncing( self, user_id: str, affect_presence: bool ) -> ContextManager[None]: @@ -395,6 +402,42 @@ def get_currently_syncing_users_for_replication(self) -> Iterable[str]: if count > 0 ] + async def set_state(self, target_user, state, ignore_status_msg=False): + """Set the presence state of the user. + """ + presence = state["presence"] + + valid_presence = ( + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + PresenceState.OFFLINE, + ) + if presence not in valid_presence: + raise SynapseError(400, "Invalid presence state") + + user_id = target_user.to_string() + + # If presence is disabled, no-op + if not self.hs.config.use_presence: + return + + # Proxy request to master + await self._set_state_client( + user_id=user_id, state=state, ignore_status_msg=ignore_status_msg + ) + + async def bump_presence_active_time(self, user): + """We've seen the user do something that indicates they're interacting + with the app. + """ + # If presence is disabled, no-op + if not self.hs.config.use_presence: + return + + # Proxy request to master + user_id = user.to_string() + await self._bump_active_client(user_id=user_id) + class GenericWorkerTyping(object): def __init__(self, hs): diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index a909744e9345..7232d6ea223a 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -19,6 +19,7 @@ federation, login, membership, + presence, register, send_event, streams, @@ -35,6 +36,7 @@ def __init__(self, hs): def register_servlets(self, hs): send_event.register_servlets(hs, self) federation.register_servlets(hs, self) + presence.register_servlets(hs, self) # The following can't currently be instantiated on workers. if hs.config.worker.worker_app is None: diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py new file mode 100644 index 000000000000..ea1b33331ba3 --- /dev/null +++ b/synapse/replication/http/presence.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import TYPE_CHECKING + +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint +from synapse.types import UserID + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class ReplicationBumpPresenceActiveTime(ReplicationEndpoint): + """We've seen the user do something that indicates they're interacting + with the app. + + The POST looks like: + + POST /_synapse/replication/bump_presence_active_time/ + + 200 OK + + {} + """ + + NAME = "bump_presence_active_time" + PATH_ARGS = ("user_id",) + METHOD = "POST" + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._presence_handler = hs.get_presence_handler() + + @staticmethod + def _serialize_payload(user_id): + return {} + + async def _handle_request(self, request, user_id): + await self._presence_handler.bump_presence_active_time( + UserID.from_string(user_id) + ) + + return ( + 200, + {}, + ) + + +class ReplicationPresenceSetState(ReplicationEndpoint): + """Set the presence state for a user. + + The POST looks like: + + POST /_synapse/replication/presence_set_state/ + + { + "state": { ... }, + "ignore_status_msg": false, + } + + 200 OK + + {} + """ + + NAME = "presence_set_state" + PATH_ARGS = ("user_id",) + METHOD = "POST" + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._presence_handler = hs.get_presence_handler() + + @staticmethod + def _serialize_payload(user_id, state, ignore_status_msg=False): + return { + "state": state, + "ignore_status_msg": ignore_status_msg, + } + + async def _handle_request(self, request, user_id): + content = parse_json_object_from_request(request) + + await self._presence_handler.set_state( + UserID.from_string(user_id), content["state"], content["ignore_status_msg"] + ) + + return ( + 200, + {}, + ) + + +def register_servlets(hs, http_server): + ReplicationBumpPresenceActiveTime(hs).register(http_server) + ReplicationPresenceSetState(hs).register(http_server) From fcbaf3fe7b81bdcfb7ae6055df32ceadfea0548a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 May 2020 10:10:03 +0100 Subject: [PATCH 14/18] Fix mypy --- synapse/handlers/presence.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9ea11c0754df..3594f3b00fe1 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -193,6 +193,12 @@ async def set_state( ) -> None: """Set the presence state of the user. """ + @abc.abstractmethod + async def bump_presence_active_time(self, user: UserID): + """We've seen the user do something that indicates they're interacting + with the app. + """ + class PresenceHandler(BasePresenceHandler): def __init__(self, hs: "synapse.server.HomeServer"): From 8b0218635d6688eb1adf06d28b58bf9ae02a310c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 May 2020 10:44:39 +0100 Subject: [PATCH 15/18] Move locally_reject_invite into event persistence. This also means adding a replication API to call it from other instances. --- synapse/handlers/room_member.py | 34 ++++++++++++++--- synapse/replication/http/__init__.py | 2 +- synapse/replication/http/membership.py | 38 ++++++++++++++++++- synapse/storage/data_stores/main/events.py | 26 +++++++++++++ .../storage/data_stores/main/roommember.py | 23 ----------- synapse/storage/persist_events.py | 6 +++ 6 files changed, 98 insertions(+), 31 deletions(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index e51e1c32fed0..7ede6c33e4f5 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -26,6 +26,9 @@ from synapse.api.errors import AuthError, Codes, SynapseError from synapse.events import EventBase from synapse.events.snapshot import EventContext +from synapse.replication.http.membership import ( + ReplicationLocallyRejectInviteRestServlet, +) from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room, user_left_room @@ -44,11 +47,6 @@ class RoomMemberHandler(object): __metaclass__ = abc.ABCMeta def __init__(self, hs): - """ - - Args: - hs (synapse.server.HomeServer): - """ self.hs = hs self.store = hs.get_datastore() self.auth = hs.get_auth() @@ -71,6 +69,17 @@ def __init__(self, hs): self._enable_lookup = hs.config.enable_3pid_lookup self.allow_per_room_profiles = self.config.allow_per_room_profiles + self._event_stream_writer_instance = hs.config.worker.writers.events + self._is_on_event_persistence_instance = ( + self._event_stream_writer_instance == hs.get_instance_name() + ) + if self._is_on_event_persistence_instance: + self.persist_event_storage = hs.get_storage().persistence + else: + self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client( + hs + ) + # This is only used to get at ratelimit function, and # maybe_kick_guest_users. It's fine there are multiple of these as # it doesn't store state. @@ -121,6 +130,19 @@ async def _remote_reject_invite( """ raise NotImplementedError() + async def locally_reject_invite(self, user_id: str, room_id: str): + """Mark the invite has having been rejected even though we failed to + create a leave event for it. + """ + if self._is_on_event_persistence_instance: + await self.persist_event_storage.locally_reject_invite(user_id, room_id) + else: + await self._locally_reject_client( + instance_name=self._event_stream_writer_instance, + user_id=user_id, + room_id=room_id, + ) + @abc.abstractmethod async def _user_joined_room(self, target: UserID, room_id: str) -> None: """Notifies distributor on master process that the user has joined the @@ -1000,7 +1022,7 @@ async def _remote_reject_invite( # logger.warning("Failed to reject invite: %s", e) - await self.store.locally_reject_invite(target.to_string(), room_id) + await self.locally_reject_invite(target.to_string(), room_id) return {} async def _user_joined_room(self, target: UserID, room_id: str) -> None: diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 7232d6ea223a..19b69e0e113b 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -37,10 +37,10 @@ def register_servlets(self, hs): send_event.register_servlets(hs, self) federation.register_servlets(hs, self) presence.register_servlets(hs, self) + membership.register_servlets(hs, self) # The following can't currently be instantiated on workers. if hs.config.worker.worker_app is None: - membership.register_servlets(hs, self) login.register_servlets(hs, self) register.register_servlets(hs, self) devices.register_servlets(hs, self) diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index 3577611fd791..ff80708e1e1c 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -14,12 +14,16 @@ # limitations under the License. import logging +from typing import TYPE_CHECKING from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint from synapse.types import Requester, UserID from synapse.util.distributor import user_joined_room, user_left_room +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -106,6 +110,7 @@ def __init__(self, hs): self.federation_handler = hs.get_handlers().federation_handler self.store = hs.get_datastore() self.clock = hs.get_clock() + self.member_handler = hs.get_room_member_handler() @staticmethod def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content): @@ -149,12 +154,42 @@ async def _handle_request(self, request, room_id, user_id): # logger.warning("Failed to reject invite: %s", e) - await self.store.locally_reject_invite(user_id, room_id) + await self.member_handler.locally_reject_invite(user_id, room_id) ret = {} return 200, ret +class ReplicationLocallyRejectInviteRestServlet(ReplicationEndpoint): + """Rejects the invite for the user and room locally. + + Request format: + + POST /_synapse/replication/locally_reject_invite/:room_id/:user_id + + {} + """ + + NAME = "locally_reject_invite" + PATH_ARGS = ("room_id", "user_id") + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self.member_handler = hs.get_room_member_handler() + + @staticmethod + def _serialize_payload(room_id, user_id): + return {} + + async def _handle_request(self, request, room_id, user_id): + logger.info("locally_reject_invite: %s out of room: %s", user_id, room_id) + + await self.member_handler.locally_reject_invite(user_id, room_id) + + return 200, {} + + class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): """Notifies that a user has joined or left the room @@ -208,3 +243,4 @@ def register_servlets(hs, http_server): ReplicationRemoteJoinRestServlet(hs).register(http_server) ReplicationRemoteRejectInviteRestServlet(hs).register(http_server) ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server) + ReplicationLocallyRejectInviteRestServlet(hs).register(http_server) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index bd2a6132f284..3c64cf15b776 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1590,3 +1590,29 @@ def _update_backward_extremeties(self, txn, events): if not ev.internal_metadata.is_outlier() ], ) + + async def locally_reject_invite(self, user_id: str, room_id: str): + """Mark the invite has having been rejected even though we failed to + create a leave event for it. + """ + + sql = ( + "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE" + " room_id = ? AND invitee = ? AND locally_rejected is NULL" + " AND replaced_by is NULL" + ) + + def f(txn, stream_ordering): + txn.execute(sql, (stream_ordering, True, room_id, user_id)) + + # We also clear this entry from `local_current_membership`. + # Ideally we'd point to a leave event, but we don't have one, so + # nevermind. + self.db.simple_delete_txn( + txn, + table="local_current_membership", + keyvalues={"room_id": room_id, "user_id": user_id}, + ) + + with self._stream_id_gen.get_next() as stream_ordering: + await self.db.runInteraction("locally_reject_invite", f, stream_ordering) diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py index 1e9c85015274..137ebac8339d 100644 --- a/synapse/storage/data_stores/main/roommember.py +++ b/synapse/storage/data_stores/main/roommember.py @@ -1046,29 +1046,6 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore): def __init__(self, database: Database, db_conn, hs): super(RoomMemberStore, self).__init__(database, db_conn, hs) - @defer.inlineCallbacks - def locally_reject_invite(self, user_id, room_id): - sql = ( - "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE" - " room_id = ? AND invitee = ? AND locally_rejected is NULL" - " AND replaced_by is NULL" - ) - - def f(txn, stream_ordering): - txn.execute(sql, (stream_ordering, True, room_id, user_id)) - - # We also clear this entry from `local_current_membership`. - # Ideally we'd point to a leave event, but we don't have one, so - # nevermind. - self.db.simple_delete_txn( - txn, - table="local_current_membership", - keyvalues={"room_id": room_id, "user_id": user_id}, - ) - - with self._stream_id_gen.get_next() as stream_ordering: - yield self.db.runInteraction("locally_reject_invite", f, stream_ordering) - def forget(self, user_id, room_id): """Indicate that user_id wishes to discard history for room_id.""" diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 41881ea20b25..b0fdf82b6ae4 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -786,3 +786,9 @@ async def _handle_potentially_left_users(self, user_ids: Set[str]): for user_id in left_users: await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id) + + async def locally_reject_invite(self, user_id: str, room_id: str): + """Mark the invite has having been rejected even though we failed to + create a leave event for it. + """ + await self.persist_events_store.locally_reject_invite(user_id, room_id) From 9468caa761f5cfe869f7a55f78aeeefa50e931f5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 May 2020 14:43:05 +0100 Subject: [PATCH 16/18] Add assertion for do_invite_join. --- synapse/handlers/federation.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 236325567db0..1aa017a6472c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1234,6 +1234,10 @@ async def do_invite_join( content: The event content to use for the join event. """ + # TODO: We should be able to call this on workers, but the upgrading of + # room stuff after join currently doesn't work on workers. + assert self.config.worker.worker_app is None + logger.debug("Joining %s to %s", joinee, room_id) origin, event, room_version_obj = await self._make_and_verify_event( From 0b2ac9ec9cd58342ccd624ae5ca8120f4b7c7af6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2020 15:10:03 +0100 Subject: [PATCH 17/18] Fixup --- synapse/handlers/room.py | 7 +++++++ synapse/handlers/room_member.py | 13 +++++++------ synapse/replication/http/membership.py | 8 +++++--- synapse/rest/admin/rooms.py | 7 ++++++- 4 files changed, 25 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 2698a129cac9..61db3ccc43ba 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -89,6 +89,8 @@ def __init__(self, hs): self.room_member_handler = hs.get_room_member_handler() self.config = hs.config + self._replication = hs.get_replication_data_handler() + # linearizer to stop two upgrades happening at once self._upgrade_linearizer = Linearizer("room_upgrade_linearizer") @@ -752,6 +754,11 @@ async def create_room( if room_alias: result["room_alias"] = room_alias.to_string() + # Always wait for room creation to progate before returning + await self._replication.wait_for_stream_position( + self.hs.config.worker.writers.events, "events", last_stream_id + ) + return result, last_stream_id async def _send_events_for_new_room( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 77d23163c6fe..0f7af982f068 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -130,18 +130,21 @@ async def _remote_reject_invite( """ raise NotImplementedError() - async def locally_reject_invite(self, user_id: str, room_id: str): + async def locally_reject_invite(self, user_id: str, room_id: str) -> int: """Mark the invite has having been rejected even though we failed to create a leave event for it. """ if self._is_on_event_persistence_instance: - await self.persist_event_storage.locally_reject_invite(user_id, room_id) + return await self.persist_event_storage.locally_reject_invite( + user_id, room_id + ) else: - await self._locally_reject_client( + result = await self._locally_reject_client( instance_name=self._event_stream_writer_instance, user_id=user_id, room_id=room_id, ) + return result["stream_id"] @abc.abstractmethod async def _user_joined_room(self, target: UserID, room_id: str) -> None: @@ -1037,9 +1040,7 @@ async def _remote_reject_invite( # logger.warning("Failed to reject invite: %s", e) - stream_id = await self.store.locally_reject_invite( - target.to_string(), room_id - ) + stream_id = await self.locally_reject_invite(target.to_string(), room_id) return None, stream_id async def _user_joined_room(self, target: UserID, room_id: str) -> None: diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index 489e8592a9ff..a7174c4a8fc8 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -154,7 +154,9 @@ async def _handle_request(self, request, room_id, user_id): # logger.warning("Failed to reject invite: %s", e) - stream_id = await self.store.locally_reject_invite(user_id, room_id) + stream_id = await self.member_handler.locally_reject_invite( + user_id, room_id + ) event_id = None return 200, {"event_id": event_id, "stream_id": stream_id} @@ -185,9 +187,9 @@ def _serialize_payload(room_id, user_id): async def _handle_request(self, request, room_id, user_id): logger.info("locally_reject_invite: %s out of room: %s", user_id, room_id) - await self.member_handler.locally_reject_invite(user_id, room_id) + stream_id = await self.member_handler.locally_reject_invite(user_id, room_id) - return 200, {} + return 200, {"stream_id": stream_id} class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 0a13e1ed348c..d57f543bfa7f 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -113,7 +113,7 @@ async def on_POST(self, request, room_id): try: target_requester = create_requester(user_id) - await self.room_member_handler.update_membership( + _, stream_id = await self.room_member_handler.update_membership( requester=target_requester, target=target_requester.user, room_id=room_id, @@ -123,6 +123,11 @@ async def on_POST(self, request, room_id): require_consent=False, ) + # Wait for leave to come in over replication before trying to forget. + await self._replication.wait_for_stream_position( + self.hs.config.worker.writers.events, "events", stream_id + ) + await self.room_member_handler.forget(target_requester.user, room_id) await self.room_member_handler.update_membership( From f4a18485b465cae100a58076e11c12de40a9b560 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2020 15:36:02 +0100 Subject: [PATCH 18/18] fix missing change from master -> event writers --- synapse/handlers/federation.py | 2 +- synapse/rest/admin/rooms.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index acd1e38ec9bf..75ec90d26730 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1317,7 +1317,7 @@ async def do_invite_join( # # TODO: Currently the events stream is written to from master await self._replication.wait_for_stream_position( - "master", "events", max_stream_id + self.config.worker.writers.events, "events", max_stream_id ) # Check whether this room is the result of an upgrade of a room we already know diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index d57f543bfa7f..8173baef8f2c 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -100,7 +100,9 @@ async def on_POST(self, request, room_id): # we try and auto join below. # # TODO: Currently the events stream is written to from master - await self._replication.wait_for_stream_position("master", "events", stream_id) + await self._replication.wait_for_stream_position( + self.hs.config.worker.writers.events, "events", stream_id + ) users = await self.state.get_current_users_in_room(room_id) kicked_users = []