From e6c25e085862a1ff45151546a3c82a595d3aa915 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 31 Mar 2020 13:53:08 +0100 Subject: [PATCH 01/16] Allow ReplicationCommandHandler to have multiple connections This will allow the server replication component to use ReplicationCommandHandler. --- synapse/replication/tcp/handler.py | 43 ++++++++++++++++++----------- synapse/replication/tcp/protocol.py | 5 ++-- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 12a1cfd6d1c4..3fe736bb772e 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -69,8 +69,8 @@ def __init__(self, hs): # The factory used to create connections. self._factory = None # type: Optional[ReplicationClientFactory] - # The current connection. None if we are currently (re)connecting - self._connection = None + # The currently connected connections. + self._connections = [] def start_replication(self, hs): """Helper method to start a replication connection to the remote server @@ -181,29 +181,40 @@ def get_currently_syncing_users(self): """ return self._presence_handler.get_currently_syncing_users() - def update_connection(self, connection): - """Called when a connection has been established (or lost with None). + def new_connection(self, connection): + """Called when we have a new connection. """ - self._connection = connection + self._connections.append(connection) - def finished_connecting(self): - """Called when we have successfully subscribed and caught up to all - streams we're interested in. - """ - logger.info("Finished connecting to server") - - # We don't reset the delay any earlier as otherwise if there is a - # problem during start up we'll end up tight looping connecting to the - # server. + # If we're using a ReplicationClientFactory then we reset the connection + # delay now. We don't reset the delay any earlier as otherwise if there + # is a problem during start up we'll end up tight looping connecting to + # the server. if self._factory: self._factory.resetDelay() + def lost_connection(self, connection): + """Called when a connection is closed/lost. + """ + try: + self._connections.remove(connection) + except ValueError: + pass + + def connected(self) -> bool: + """Do we have any replication connections open? + + Used to no-op if nothing is connected. + """ + return bool(self._connections) + def send_command(self, cmd: Command): """Send a command to master (when we get establish a connection if we don't have one already.) """ - if self._connection: - self._connection.send_command(cmd) + if self._connections: + for connection in self._connections: + connection.send_command(cmd) else: logger.warning("Dropping command as not connected: %r", cmd.NAME) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index f2a37f568e30..a412ae376fd9 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -508,8 +508,7 @@ def connectionMade(self): self.send_command(UserSyncCommand(self.instance_id, user_id, True, now)) # We've now finished connecting to so inform the client handler - self.handler.update_connection(self) - self.handler.finished_connecting() + self.handler.new_connection(self) async def handle_command(self, cmd: Command): """Handle a command we have received over the replication stream. 
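For reference, the shape this patch moves to — a handler that keeps a list of whatever connections are currently established and fans each command out over all of them — can be sketched in isolation as below. This is an illustrative sketch only, not code from the patch; `FakeConnection` and `CommandFanout` are hypothetical stand-ins for the real protocol and handler classes.

import logging
from typing import List

logger = logging.getLogger(__name__)


class FakeConnection:
    """Hypothetical stand-in for a replication protocol instance."""

    def __init__(self, name: str):
        self.name = name

    def send_command(self, cmd: str) -> None:
        print("%s <- %s" % (self.name, cmd))


class CommandFanout:
    """Tracks live connections and broadcasts each command to all of them."""

    def __init__(self) -> None:
        self._connections = []  # type: List[FakeConnection]

    def new_connection(self, conn: FakeConnection) -> None:
        self._connections.append(conn)

    def lost_connection(self, conn: FakeConnection) -> None:
        # The connection may be lost before it was ever registered, so
        # tolerate it already being absent.
        try:
            self._connections.remove(conn)
        except ValueError:
            pass

    def connected(self) -> bool:
        return bool(self._connections)

    def send_command(self, cmd: str) -> None:
        if not self._connections:
            logger.warning("Dropping command as not connected: %r", cmd)
            return
        for conn in self._connections:
            conn.send_command(cmd)

Because nothing in this shape assumes exactly one peer, the same handler can later be shared by both the client- and server-side protocols, which is what the rest of the series builds on.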
@@ -552,7 +551,7 @@ def replicate(self): def on_connection_closed(self): BaseReplicationStreamProtocol.on_connection_closed(self) - self.handler.update_connection(None) + self.handler.lost_connection(self) # The following simply registers metrics for the replication connections From cadb3f57dd8294bf3602a8ecfd580559b0f57339 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 31 Mar 2020 14:25:09 +0100 Subject: [PATCH 02/16] Merge server command processing into ReplicationCommandHandler --- synapse/replication/tcp/handler.py | 109 ++++++++++++++++++- synapse/replication/tcp/protocol.py | 126 +++++----------------- synapse/replication/tcp/resource.py | 156 ++++------------------------ 3 files changed, 153 insertions(+), 238 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 3fe736bb772e..23cdc9917435 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -19,8 +19,10 @@ from prometheus_client import Counter +from synapse.metrics import LaterGauge from synapse.replication.tcp.client import ReplicationClientFactory from synapse.replication.tcp.commands import ( + ClearUserSyncsCommand, Command, FederationAckCommand, InvalidateCacheCommand, @@ -28,6 +30,7 @@ RdataCommand, RemoteServerUpCommand, RemovePusherCommand, + ReplicateCommand, SyncCommand, UserIpCommand, UserSyncCommand, @@ -42,6 +45,13 @@ inbound_rdata_count = Counter( "synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"] ) +user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "") +federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "") +remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "") +invalidate_cache_counter = Counter( + "synapse_replication_tcp_resource_invalidate_cache", "" +) +user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "") class ReplicationCommandHandler: @@ -52,6 +62,8 @@ class ReplicationCommandHandler: def __init__(self, hs): self._replication_data_handler = hs.get_replication_data_handler() self._presence_handler = hs.get_presence_handler() + self._store = hs.get_datastore() + self._notifier = hs.get_notifier() # Set of streams that we've caught up with. self._streams_connected = set() # type: Set[str] @@ -70,7 +82,25 @@ def __init__(self, hs): self._factory = None # type: Optional[ReplicationClientFactory] # The currently connected connections. - self._connections = [] + self._connections = [] # type: List[Any] + + LaterGauge( + "synapse_replication_tcp_resource_total_connections", + "", + [], + lambda: len(self._connections), + ) + + self._is_master = hs.config.worker_app is None + + self._federation_sender = None + if self._is_master and not hs.config.send_federation: + self._federation_sender = hs.get_federation_sender() + + self._server_notices_sender = None + if self._is_master: + self._server_notices_sender = hs.get_server_notices_sender() + self._notifier.add_remote_server_up_callback(self.send_remote_server_up) def start_replication(self, hs): """Helper method to start a replication connection to the remote server @@ -82,6 +112,73 @@ def start_replication(self, hs): port = hs.config.worker_replication_port hs.get_reactor().connectTCP(host, port, self._factory) + async def on_REPLICATE(self, cmd: ReplicateCommand): + # We only want to announce positions by the writer of the streams. + # Currently this is just the master process. 
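As an aside on the metrics being consolidated into this file: the per-command counters are ordinary `prometheus_client` counters, and `LaterGauge` (as used above) wraps a callable that supplies the gauge's value when metrics are collected. A rough, self-contained sketch of the same two ideas using only `prometheus_client` — the metric names below are invented for illustration:

from prometheus_client import Counter, Gauge, generate_latest

# A counter is simply bumped each time the event of interest happens.
example_user_sync_counter = Counter("example_user_sync", "Example counter")

# A gauge can be backed by a callable that is evaluated at scrape time,
# much like LaterGauge wrapping `lambda: len(self._connections)` above.
connections = []
example_connections_gauge = Gauge("example_total_connections", "Example gauge")
example_connections_gauge.set_function(lambda: len(connections))

example_user_sync_counter.inc()
connections.append(object())

# Render the current values in the Prometheus text exposition format.
print(generate_latest().decode("ascii"))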
+ if not self._is_master: + return + + if not self._connections: + raise Exception("Not connected") + + for stream_name, stream in self._streams.items(): + current_token = stream.current_token() + self.send_command(PositionCommand(stream_name, current_token)) + + async def on_USER_SYNC(self, cmd: UserSyncCommand): + user_sync_counter.inc() + + if self._is_master: + await self._presence_handler.update_external_syncs_row( + cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms + ) + + async def on_CLEAR_USER_SYNC(self, cmd: ClearUserSyncsCommand): + if self._is_master: + await self._presence_handler.update_external_syncs_clear(cmd.instance_id) + + async def on_FEDERATION_ACK(self, cmd: FederationAckCommand): + federation_ack_counter.inc() + + if self._federation_sender: + self._federation_sender.federation_ack(cmd.token) + + async def on_REMOVE_PUSHER(self, cmd: RemovePusherCommand): + remove_pusher_counter.inc() + + if self._is_master: + await self._store.delete_pusher_by_app_id_pushkey_user_id( + app_id=cmd.app_id, pushkey=cmd.push_key, user_id=cmd.user_id + ) + + self._notifier.on_new_replication_data() + + async def on_INVALIDATE_CACHE(self, cmd: InvalidateCacheCommand): + invalidate_cache_counter.inc() + + if self._is_master: + # We invalidate the cache locally, but then also stream that to other + # workers. + await self._store.invalidate_cache_and_stream( + cmd.cache_func, tuple(cmd.keys) + ) + + async def on_USER_IP(self, cmd: UserIpCommand): + user_ip_cache_counter.inc() + + if self._is_master: + await self._store.insert_client_ip( + cmd.user_id, + cmd.access_token, + cmd.ip, + cmd.user_agent, + cmd.device_id, + cmd.last_seen, + ) + + if self._server_notices_sender: + await self._server_notices_sender.on_user_ip(cmd.user_id) + async def on_RDATA(self, cmd: RdataCommand): stream_name = cmd.stream_name inbound_rdata_count.labels(stream_name).inc() @@ -174,6 +271,9 @@ async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand): """"Called when get a new REMOTE_SERVER_UP command.""" self._replication_data_handler.on_remote_server_up(cmd.data) + if self._is_master: + self._notifier.notify_remote_server_up(cmd.data) + def get_currently_syncing_users(self): """Get the list of currently syncing users (if any). This is called when a connection has been established and we need to send the @@ -261,3 +361,10 @@ def send_user_ip( def send_remote_server_up(self, server: str): self.send_command(RemoteServerUpCommand(server)) + + def stream_update(self, stream_name: str, token: str, data: Any): + """Called when a new update is available to stream to clients. + + We need to check if the client is interested in the stream or not + """ + self.send_command(RdataCommand(stream_name, token, data)) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index a412ae376fd9..995b47a1eddf 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -69,12 +69,8 @@ ErrorCommand, NameCommand, PingCommand, - PositionCommand, - RdataCommand, - RemoteServerUpCommand, ReplicateCommand, ServerCommand, - SyncCommand, UserSyncCommand, ) from synapse.types import Collection @@ -134,8 +130,9 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): max_line_buffer = 10000 - def __init__(self, clock): + def __init__(self, clock, handler): self.clock = clock + self.handler = handler self.last_received_command = self.clock.time_msec() self.last_sent_command = 0 @@ -175,6 +172,8 @@ def connectionMade(self): # can time us out. 
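The ping/timeout machinery mentioned just above is untouched by this patch, but for orientation the underlying idea is small: record when the last command arrived, ping on a timer, and drop the connection if the peer has been silent for too long. Below is a standalone sketch of that bookkeeping; the interval and timeout values here are invented, not Synapse's.

import time


def _now_ms() -> int:
    return int(time.time() * 1000)


class KeepAlive:
    """Minimal last-seen/ping bookkeeping for a single connection."""

    PING_INTERVAL_MS = 5000
    TIMEOUT_MS = 30000

    def __init__(self) -> None:
        self.last_received_command = _now_ms()
        self.last_sent_command = 0

    def on_command_received(self) -> None:
        self.last_received_command = _now_ms()

    def tick(self) -> str:
        """Called periodically; says what the connection should do next."""
        if _now_ms() - self.last_received_command > self.TIMEOUT_MS:
            # The peer has gone quiet: give up on the connection.
            return "close"
        if _now_ms() - self.last_sent_command > self.PING_INTERVAL_MS:
            # Keep the line warm so the other side does not time us out.
            self.last_sent_command = _now_ms()
            return "send_ping"
        return "noop"


ka = KeepAlive()
print(ka.tick())  # "send_ping" immediately after connecting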
self.send_command(PingCommand(self.clock.time_msec())) + self.handler.new_connection(self) + def send_ping(self): """Periodically sends a ping and checks if we should close the connection due to the other side timing out. @@ -248,8 +247,23 @@ async def handle_command(self, cmd: Command): Args: cmd: received command """ - handler = getattr(self, "on_%s" % (cmd.NAME,)) - await handler(cmd) + handled = False + + # First call any command handlers on this instance. These are for TCP + # specific handling. + cmd_func = getattr(self, "on_%s" % (cmd.NAME,), None) + if cmd_func: + await cmd_func(cmd) + handled = True + + # Then call out to the handler. + cmd_func = getattr(self.handler, "on_%s" % (cmd.NAME,), None) + if cmd_func: + await cmd_func(cmd) + handled = True + + if not handled: + logger.warning("Unhandled command: %r", cmd) def close(self): logger.warning("[%s] Closing connection", self.id()) @@ -378,6 +392,8 @@ def on_connection_closed(self): self.state = ConnectionStates.CLOSED self.pending_commands = [] + self.handler.lost_connection(self) + if self.transport: self.transport.unregisterProducer() @@ -404,74 +420,19 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS VALID_OUTBOUND_COMMANDS = VALID_SERVER_COMMANDS - def __init__(self, server_name, clock, streamer): - BaseReplicationStreamProtocol.__init__(self, clock) # Old style class + def __init__(self, server_name, clock, handler): + BaseReplicationStreamProtocol.__init__(self, clock, handler) # Old style class self.server_name = server_name - self.streamer = streamer def connectionMade(self): self.send_command(ServerCommand(self.server_name)) BaseReplicationStreamProtocol.connectionMade(self) - self.streamer.new_connection(self) async def on_NAME(self, cmd): logger.info("[%s] Renamed to %r", self.id(), cmd.data) self.name = cmd.data - async def on_USER_SYNC(self, cmd): - await self.streamer.on_user_sync( - cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms - ) - - async def on_CLEAR_USER_SYNC(self, cmd): - await self.streamer.on_clear_user_syncs(cmd.instance_id) - - async def on_REPLICATE(self, cmd): - # Subscribe to all streams we're publishing to. - for stream_name in self.streamer.streams_by_name: - current_token = self.streamer.get_stream_token(stream_name) - self.send_command(PositionCommand(stream_name, current_token)) - - async def on_FEDERATION_ACK(self, cmd): - self.streamer.federation_ack(cmd.token) - - async def on_REMOVE_PUSHER(self, cmd): - await self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id) - - async def on_INVALIDATE_CACHE(self, cmd): - await self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) - - async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand): - self.streamer.on_remote_server_up(cmd.data) - - async def on_USER_IP(self, cmd): - self.streamer.on_user_ip( - cmd.user_id, - cmd.access_token, - cmd.ip, - cmd.user_agent, - cmd.device_id, - cmd.last_seen, - ) - - def stream_update(self, stream_name, token, data): - """Called when a new update is available to stream to clients. 
- - We need to check if the client is interested in the stream or not - """ - self.send_command(RdataCommand(stream_name, token, data)) - - def send_sync(self, data): - self.send_command(SyncCommand(data)) - - def send_remote_server_up(self, server: str): - self.send_command(RemoteServerUpCommand(server)) - - def on_connection_closed(self): - BaseReplicationStreamProtocol.on_connection_closed(self) - self.streamer.lost_connection(self) - class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): VALID_INBOUND_COMMANDS = VALID_SERVER_COMMANDS @@ -485,13 +446,12 @@ def __init__( clock: Clock, command_handler: "ReplicationCommandHandler", ): - BaseReplicationStreamProtocol.__init__(self, clock) + BaseReplicationStreamProtocol.__init__(self, clock, command_handler) self.instance_id = hs.get_instance_id() self.client_name = client_name self.server_name = server_name - self.handler = command_handler def connectionMade(self): self.send_command(NameCommand(self.client_name)) @@ -507,36 +467,6 @@ def connectionMade(self): for user_id in currently_syncing: self.send_command(UserSyncCommand(self.instance_id, user_id, True, now)) - # We've now finished connecting to so inform the client handler - self.handler.new_connection(self) - - async def handle_command(self, cmd: Command): - """Handle a command we have received over the replication stream. - - Delegates to `command_handler.on_`, which must return an - awaitable. - - Args: - cmd: received command - """ - handled = False - - # First call any command handlers on this instance. These are for TCP - # specific handling. - cmd_func = getattr(self, "on_%s" % (cmd.NAME,), None) - if cmd_func: - await cmd_func(cmd) - handled = True - - # Then call out to the handler. - cmd_func = getattr(self.handler, "on_%s" % (cmd.NAME,), None) - if cmd_func: - await cmd_func(cmd) - handled = True - - if not handled: - logger.warning("Unhandled command: %r", cmd) - async def on_SERVER(self, cmd): if cmd.data != self.server_name: logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data) @@ -549,10 +479,6 @@ def replicate(self): self.send_command(ReplicateCommand()) - def on_connection_closed(self): - BaseReplicationStreamProtocol.on_connection_closed(self) - self.handler.lost_connection(self) - # The following simply registers metrics for the replication connections diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 30021ee309df..cc0322258748 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -17,7 +17,7 @@ import logging import random -from typing import Any, Dict, List +from typing import Dict from six import itervalues @@ -25,24 +25,14 @@ from twisted.internet.protocol import Factory -from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.util.metrics import Measure, measure_func - -from .protocol import ServerReplicationStreamProtocol -from .streams import STREAMS_MAP, Stream -from .streams.federation import FederationStream +from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol +from synapse.replication.tcp.streams import STREAMS_MAP, FederationStream, Stream +from synapse.util.metrics import Measure stream_updates_counter = Counter( "synapse_replication_tcp_resource_stream_updates", "", ["stream_name"] ) -user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "") -federation_ack_counter = 
Counter("synapse_replication_tcp_resource_federation_ack", "") -remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "") -invalidate_cache_counter = Counter( - "synapse_replication_tcp_resource_invalidate_cache", "" -) -user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "") logger = logging.getLogger(__name__) @@ -52,13 +42,18 @@ class ReplicationStreamProtocolFactory(Factory): """ def __init__(self, hs): - self.streamer = hs.get_replication_streamer() + self.handler = hs.get_tcp_replication() self.clock = hs.get_clock() self.server_name = hs.config.server_name + self.hs = hs + + # Ensure the replication streamer is started if we register a + # replication server endpoint. + hs.get_replication_streamer() def buildProtocol(self, addr): return ServerReplicationStreamProtocol( - self.server_name, self.clock, self.streamer + self.server_name, self.clock, self.handler ) @@ -78,16 +73,6 @@ def __init__(self, hs): self._replication_torture_level = hs.config.replication_torture_level - # Current connections. - self.connections = [] # type: List[ServerReplicationStreamProtocol] - - LaterGauge( - "synapse_replication_tcp_resource_total_connections", - "", - [], - lambda: len(self.connections), - ) - # List of streams that clients can subscribe to. # We only support federation stream if federation sending hase been # disabled on the master. @@ -104,18 +89,12 @@ def __init__(self, hs): self.federation_sender = hs.get_federation_sender() self.notifier.add_replication_callback(self.on_notifier_poke) - self.notifier.add_remote_server_up_callback(self.send_remote_server_up) # Keeps track of whether we are currently checking for updates self.is_looping = False self.pending_updates = False - hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.on_shutdown) - - def on_shutdown(self): - # close all connections on shutdown - for conn in self.connections: - conn.send_error("server shutting down") + self.client = hs.get_tcp_replication() def get_streams(self) -> Dict[str, Stream]: """Get a mapp from stream name to stream instance. @@ -129,7 +108,7 @@ def on_notifier_poke(self): This should get called each time new data is available, even if it is currently being executed, so that nothing gets missed """ - if not self.connections: + if not self.client.connected(): # Don't bother if nothing is listening. We still need to advance # the stream tokens otherwise they'll fall beihind forever for stream in self.streams: @@ -186,9 +165,7 @@ async def _run_notifier_loop(self): raise logger.debug( - "Sending %d updates to %d connections", - len(updates), - len(self.connections), + "Sending %d updates", len(updates), ) if updates: @@ -204,112 +181,17 @@ async def _run_notifier_loop(self): # token. See RdataCommand for more details. batched_updates = _batch_updates(updates) - for conn in self.connections: - for token, row in batched_updates: - try: - conn.stream_update(stream.NAME, token, row) - except Exception: - logger.exception("Failed to replicate") + for token, row in batched_updates: + try: + self.client.stream_update(stream.NAME, token, row) + except Exception: + logger.exception("Failed to replicate") logger.debug("No more pending updates, breaking poke loop") finally: self.pending_updates = False self.is_looping = False - def get_stream_token(self, stream_name): - """For a given stream get all updates since token. This is called when - a client first subscribes to a stream. 
- """ - stream = self.streams_by_name.get(stream_name, None) - if not stream: - raise Exception("unknown stream %s", stream_name) - - return stream.current_token() - - @measure_func("repl.federation_ack") - def federation_ack(self, token): - """We've received an ack for federation stream from a client. - """ - federation_ack_counter.inc() - if self.federation_sender: - self.federation_sender.federation_ack(token) - - @measure_func("repl.on_user_sync") - async def on_user_sync(self, instance_id, user_id, is_syncing, last_sync_ms): - """A client has started/stopped syncing on a worker. - """ - user_sync_counter.inc() - await self.presence_handler.update_external_syncs_row( - instance_id, user_id, is_syncing, last_sync_ms - ) - - async def on_clear_user_syncs(self, instance_id): - """A replication client wants us to drop all their UserSync data. - """ - await self.presence_handler.update_external_syncs_clear(instance_id) - - @measure_func("repl.on_remove_pusher") - async def on_remove_pusher(self, app_id, push_key, user_id): - """A client has asked us to remove a pusher - """ - remove_pusher_counter.inc() - await self.store.delete_pusher_by_app_id_pushkey_user_id( - app_id=app_id, pushkey=push_key, user_id=user_id - ) - - self.notifier.on_new_replication_data() - - @measure_func("repl.on_invalidate_cache") - async def on_invalidate_cache(self, cache_func: str, keys: List[Any]): - """The client has asked us to invalidate a cache - """ - invalidate_cache_counter.inc() - - # We invalidate the cache locally, but then also stream that to other - # workers. - await self.store.invalidate_cache_and_stream(cache_func, tuple(keys)) - - @measure_func("repl.on_user_ip") - async def on_user_ip( - self, user_id, access_token, ip, user_agent, device_id, last_seen - ): - """The client saw a user request - """ - user_ip_cache_counter.inc() - await self.store.insert_client_ip( - user_id, access_token, ip, user_agent, device_id, last_seen - ) - await self._server_notices_sender.on_user_ip(user_id) - - @measure_func("repl.on_remote_server_up") - def on_remote_server_up(self, server: str): - self.notifier.notify_remote_server_up(server) - - def send_remote_server_up(self, server: str): - for conn in self.connections: - conn.send_remote_server_up(server) - - def send_sync_to_all_connections(self, data): - """Sends a SYNC command to all clients. - - Used in tests. 
- """ - for conn in self.connections: - conn.send_sync(data) - - def new_connection(self, connection): - """A new client connection has been established - """ - self.connections.append(connection) - - def lost_connection(self, connection): - """A client connection has been lost - """ - try: - self.connections.remove(connection) - except ValueError: - pass - def _batch_updates(updates): """Takes a list of updates of form [(token, row)] and sets the token to From dd93e91b243d0f1d3188f845db23f2484f756411 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 10:07:27 +0100 Subject: [PATCH 03/16] Add typing --- synapse/replication/tcp/protocol.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 995b47a1eddf..db78b4f4bb04 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -130,7 +130,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): max_line_buffer = 10000 - def __init__(self, clock, handler): + def __init__(self, clock: Clock, handler: "ReplicationCommandHandler"): self.clock = clock self.handler = handler @@ -420,7 +420,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS VALID_OUTBOUND_COMMANDS = VALID_SERVER_COMMANDS - def __init__(self, server_name, clock, handler): + def __init__( + self, server_name: str, clock: Clock, handler: "ReplicationCommandHandler" + ): BaseReplicationStreamProtocol.__init__(self, clock, handler) # Old style class self.server_name = server_name From 1d00a749b742a8a9f12d903595ce48dd569a42a9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 12:57:51 +0100 Subject: [PATCH 04/16] Newsfile --- changelog.d/7187.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/7187.misc diff --git a/changelog.d/7187.misc b/changelog.d/7187.misc new file mode 100644 index 000000000000..60d68ae87704 --- /dev/null +++ b/changelog.d/7187.misc @@ -0,0 +1 @@ +Move server command handling out of TCP protocol. From 525a47dc2847f041d68f812bdbdc1e43e8f4188c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 14:34:33 +0100 Subject: [PATCH 05/16] Improve docstrings in handler --- synapse/replication/tcp/handler.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 23cdc9917435..66237f5e528e 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -286,10 +286,15 @@ def new_connection(self, connection): """ self._connections.append(connection) - # If we're using a ReplicationClientFactory then we reset the connection - # delay now. We don't reset the delay any earlier as otherwise if there - # is a problem during start up we'll end up tight looping connecting to - # the server. + # If we are connected to replication as a client (rather than a server) + # we need to reset the reconnection delay on the client factory (which + # is used to do exponential back off when the connection drops). + # + # Ideally we would reset the delay when we've "fully established" the + # connection (for some definition thereof) to stop us from tightlooping + # on reconnection if something fails after this point and we drop the + # connection. Unfortunately, we don't really have a better definition of + # "fully established" than the connection being established. 
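For context on the delay being reset here: `ReplicationClientFactory` builds on Twisted's `ReconnectingClientFactory`, whose retry delay grows after every failed connection attempt until `resetDelay()` is called. The sketch below shows that mechanism on its own; it is illustrative only, and the protocol, delays, host and port are made up.

from twisted.internet.protocol import Protocol, ReconnectingClientFactory


class DemoProtocol(Protocol):
    def connectionMade(self):
        # Only reset the backoff once a connection has actually been made;
        # resetting any earlier risks a tight reconnect loop if the far end
        # keeps failing during start up.
        self.factory.resetDelay()


class BackoffFactory(ReconnectingClientFactory):
    protocol = DemoProtocol
    initialDelay = 1.0  # first retry after roughly a second
    factor = 2.0        # then ~2s, ~4s, ~8s, ...
    maxDelay = 60.0     # capped at a minute

    def clientConnectionFailed(self, connector, reason):
        # Schedule a retry using the current (growing) delay.
        self.retry(connector)

    def clientConnectionLost(self, connector, reason):
        self.retry(connector)


# Hypothetical usage:
#   from twisted.internet import reactor
#   reactor.connectTCP("replication.example.com", 9092, BackoffFactory())
#   reactor.run()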
if self._factory: self._factory.resetDelay() @@ -304,13 +309,12 @@ def lost_connection(self, connection): def connected(self) -> bool: """Do we have any replication connections open? - Used to no-op if nothing is connected. + Is used by e.g. `ReplicationStreamer` to no-op if nothing is connected. """ return bool(self._connections) def send_command(self, cmd: Command): - """Send a command to master (when we get establish a connection if we - don't have one already.) + """Send a command to all connected connections. """ if self._connections: for connection in self._connections: From 8a6a6802c0e47afdedb3610e677814baa50742ea Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 14:35:13 +0100 Subject: [PATCH 06/16] Remove unnecessary self.hs --- synapse/replication/tcp/resource.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index cc0322258748..44fb3551c105 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -45,7 +45,6 @@ def __init__(self, hs): self.handler = hs.get_tcp_replication() self.clock = hs.get_clock() self.server_name = hs.config.server_name - self.hs = hs # Ensure the replication streamer is started if we register a # replication server endpoint. From ad2a80a742d131a085e92f49dd6e7017773b75a6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 14:41:10 +0100 Subject: [PATCH 07/16] Don't error if we get a REPLICATE command without a connection It's probably just a race. --- synapse/replication/tcp/handler.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 66237f5e528e..d895626802ac 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -118,9 +118,6 @@ async def on_REPLICATE(self, cmd: ReplicateCommand): if not self._is_master: return - if not self._connections: - raise Exception("Not connected") - for stream_name, stream in self._streams.items(): current_token = stream.current_token() self.send_command(PositionCommand(stream_name, current_token)) From 8d81fd25c482227c952164ee7042e63644be38e1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 14:41:49 +0100 Subject: [PATCH 08/16] Handle errors when sending commands to connections --- synapse/replication/tcp/handler.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index d895626802ac..494c5ddba6c1 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -315,7 +315,17 @@ def send_command(self, cmd: Command): """ if self._connections: for connection in self._connections: - connection.send_command(cmd) + try: + connection.send_command(cmd) + except Exception: + # We probably want to catch some types of exceptions here + # and log them as warnings (e.g. connection gone), but I + # can't find what those exception types they would be. 
+ logger.exception( + "Failed to write command %s to connection %s", + cmd.NAME, + connection, + ) else: logger.warning("Dropping command as not connected: %r", cmd.NAME) From 984c959f38d3aa70fb642bde3b22f93a73e52713 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 14:43:23 +0100 Subject: [PATCH 09/16] Fixup name 'handler' --- synapse/replication/tcp/resource.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 44fb3551c105..4764f3b12133 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -42,7 +42,7 @@ class ReplicationStreamProtocolFactory(Factory): """ def __init__(self, hs): - self.handler = hs.get_tcp_replication() + self.command_handler = hs.get_tcp_replication() self.clock = hs.get_clock() self.server_name = hs.config.server_name @@ -52,7 +52,7 @@ def __init__(self, hs): def buildProtocol(self, addr): return ServerReplicationStreamProtocol( - self.server_name, self.clock, self.handler + self.server_name, self.clock, self.command_handler ) From 9cc569857d1b94e6244c69e5e55a166f5899801f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 14:44:01 +0100 Subject: [PATCH 10/16] Fixup old style class constructors --- synapse/replication/tcp/protocol.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index db78b4f4bb04..d0b381aef332 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -423,13 +423,13 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): def __init__( self, server_name: str, clock: Clock, handler: "ReplicationCommandHandler" ): - BaseReplicationStreamProtocol.__init__(self, clock, handler) # Old style class + super().__init__(clock, handler) self.server_name = server_name def connectionMade(self): self.send_command(ServerCommand(self.server_name)) - BaseReplicationStreamProtocol.connectionMade(self) + super().connectionMade() async def on_NAME(self, cmd): logger.info("[%s] Renamed to %r", self.id(), cmd.data) @@ -448,7 +448,7 @@ def __init__( clock: Clock, command_handler: "ReplicationCommandHandler", ): - BaseReplicationStreamProtocol.__init__(self, clock, command_handler) + super().__init__(clock, command_handler) self.instance_id = hs.get_instance_id() @@ -457,7 +457,7 @@ def __init__( def connectionMade(self): self.send_command(NameCommand(self.client_name)) - BaseReplicationStreamProtocol.connectionMade(self) + super().connectionMade() # Once we've connected subscribe to the necessary streams self.replicate() From 3653d0308c86bac7df1f95e7841b45639679a940 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 14:48:10 +0100 Subject: [PATCH 11/16] Update handle_command docstring --- synapse/replication/tcp/protocol.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index d0b381aef332..e661982d23cf 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -242,7 +242,10 @@ def lineReceived(self, line): async def handle_command(self, cmd: Command): """Handle a command we have received over the replication stream. - By default delegates to on_, which should return an awaitable. + First calls `self.on_` if it exists, then calls + `self.command_handler.on_` if it exists. 
This allows for + protocol level handling of commands (e.g. PINGs), before delegating to + the handler. Args: cmd: received command From ad2beda0de678a7420ad87b41f55211335f29eb5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 14:48:46 +0100 Subject: [PATCH 12/16] Rename handler -> command_handler --- synapse/replication/tcp/protocol.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index e661982d23cf..d74fde7e0bb3 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -132,7 +132,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): def __init__(self, clock: Clock, handler: "ReplicationCommandHandler"): self.clock = clock - self.handler = handler + self.command_handler = handler self.last_received_command = self.clock.time_msec() self.last_sent_command = 0 @@ -172,7 +172,7 @@ def connectionMade(self): # can time us out. self.send_command(PingCommand(self.clock.time_msec())) - self.handler.new_connection(self) + self.command_handler.new_connection(self) def send_ping(self): """Periodically sends a ping and checks if we should close the connection @@ -260,7 +260,7 @@ async def handle_command(self, cmd: Command): handled = True # Then call out to the handler. - cmd_func = getattr(self.handler, "on_%s" % (cmd.NAME,), None) + cmd_func = getattr(self.command_handler, "on_%s" % (cmd.NAME,), None) if cmd_func: await cmd_func(cmd) handled = True @@ -395,7 +395,7 @@ def on_connection_closed(self): self.state = ConnectionStates.CLOSED self.pending_commands = [] - self.handler.lost_connection(self) + self.command_handler.lost_connection(self) if self.transport: self.transport.unregisterProducer() @@ -467,7 +467,7 @@ def connectionMade(self): # Tell the server if we have any users currently syncing (should only # happen on synchrotrons) - currently_syncing = self.handler.get_currently_syncing_users() + currently_syncing = self.command_handler.get_currently_syncing_users() now = self.clock.time_msec() for user_id in currently_syncing: self.send_command(UserSyncCommand(self.instance_id, user_id, True, now)) From 27e4d2bdd580436de893140443fa4b2382036cfb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 14:55:20 +0100 Subject: [PATCH 13/16] Add an AbstractConnection interface class --- synapse/replication/tcp/handler.py | 7 ++++--- synapse/replication/tcp/protocol.py | 17 +++++++++++++++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 494c5ddba6c1..ce14c6f2c4a2 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -35,6 +35,7 @@ UserIpCommand, UserSyncCommand, ) +from synapse.replication.tcp.protocol import AbstractConnection from synapse.replication.tcp.streams import STREAMS_MAP, Stream from synapse.util.async_helpers import Linearizer @@ -82,7 +83,7 @@ def __init__(self, hs): self._factory = None # type: Optional[ReplicationClientFactory] # The currently connected connections. 
- self._connections = [] # type: List[Any] + self._connections = [] # type: List[AbstractConnection] LaterGauge( "synapse_replication_tcp_resource_total_connections", @@ -278,7 +279,7 @@ def get_currently_syncing_users(self): """ return self._presence_handler.get_currently_syncing_users() - def new_connection(self, connection): + def new_connection(self, connection: AbstractConnection): """Called when we have a new connection. """ self._connections.append(connection) @@ -295,7 +296,7 @@ def new_connection(self, connection): if self._factory: self._factory.resetDelay() - def lost_connection(self, connection): + def lost_connection(self, connection: AbstractConnection): """Called when a connection is closed/lost. """ try: diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index d74fde7e0bb3..bb12d6a14b6f 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -46,6 +46,7 @@ > ERROR server stopping * connection closed by server * """ +import abc import fcntl import logging import struct @@ -485,6 +486,22 @@ def replicate(self): self.send_command(ReplicateCommand()) +class AbstractConnection(abc.ABC): + """An interface for replication connections. + """ + + @abc.abstractmethod + def send_command(self, cmd: Command): + """Send the command down the connection + """ + pass + + +# This tells python that `BaseReplicationStreamProtocol` implements the +# interface. +AbstractConnection.register(BaseReplicationStreamProtocol) + + # The following simply registers metrics for the replication connections pending_commands = LaterGauge( From 2190027cf99c44530258bb6f7d96243e4691caed Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 15:37:25 +0100 Subject: [PATCH 14/16] Move UserSyncCommand handling to CommandHandler --- synapse/replication/tcp/handler.py | 11 +++++++++++ synapse/replication/tcp/protocol.py | 10 ---------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index ce14c6f2c4a2..8ec011969737 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -65,6 +65,8 @@ def __init__(self, hs): self._presence_handler = hs.get_presence_handler() self._store = hs.get_datastore() self._notifier = hs.get_notifier() + self._clock = hs.get_clock() + self._instance_id = hs.get_instance_id() # Set of streams that we've caught up with. self._streams_connected = set() # type: Set[str] @@ -296,6 +298,15 @@ def new_connection(self, connection: AbstractConnection): if self._factory: self._factory.resetDelay() + # Tell the server if we have any users currently syncing (should only + # happen on synchrotrons) + currently_syncing = self.get_currently_syncing_users() + now = self._clock.time_msec() + for user_id in currently_syncing: + connection.send_command( + UserSyncCommand(self._instance_id, user_id, True, now) + ) + def lost_connection(self, connection: AbstractConnection): """Called when a connection is closed/lost. 
""" diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index bb12d6a14b6f..eab796634e04 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -72,7 +72,6 @@ PingCommand, ReplicateCommand, ServerCommand, - UserSyncCommand, ) from synapse.types import Collection from synapse.util import Clock @@ -454,8 +453,6 @@ def __init__( ): super().__init__(clock, command_handler) - self.instance_id = hs.get_instance_id() - self.client_name = client_name self.server_name = server_name @@ -466,13 +463,6 @@ def connectionMade(self): # Once we've connected subscribe to the necessary streams self.replicate() - # Tell the server if we have any users currently syncing (should only - # happen on synchrotrons) - currently_syncing = self.command_handler.get_currently_syncing_users() - now = self.clock.time_msec() - for user_id in currently_syncing: - self.send_command(UserSyncCommand(self.instance_id, user_id, True, now)) - async def on_SERVER(self, cmd): if cmd.data != self.server_name: logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data) From 7b2af21b93042d09ab59ec365aee8e9eeef2791f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 15:38:18 +0100 Subject: [PATCH 15/16] s/self.client/self.command_handler/ --- synapse/replication/tcp/resource.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 4764f3b12133..5b0eaf4d2b70 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -93,7 +93,7 @@ def __init__(self, hs): self.is_looping = False self.pending_updates = False - self.client = hs.get_tcp_replication() + self.command_handler = hs.get_tcp_replication() def get_streams(self) -> Dict[str, Stream]: """Get a mapp from stream name to stream instance. @@ -107,7 +107,7 @@ def on_notifier_poke(self): This should get called each time new data is available, even if it is currently being executed, so that nothing gets missed """ - if not self.client.connected(): + if not self.command_handler.connected(): # Don't bother if nothing is listening. We still need to advance # the stream tokens otherwise they'll fall beihind forever for stream in self.streams: @@ -182,7 +182,9 @@ async def _run_notifier_loop(self): for token, row in batched_updates: try: - self.client.stream_update(stream.NAME, token, row) + self.command_handler.stream_update( + stream.NAME, token, row + ) except Exception: logger.exception("Failed to replicate") From ffdf38c3e8fef1a2f804be16ffdbdbf337950c17 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 18:36:32 +0100 Subject: [PATCH 16/16] Moar comments --- synapse/replication/tcp/protocol.py | 2 +- synapse/replication/tcp/resource.py | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index eab796634e04..9aabb9c586c9 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -114,7 +114,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): are only sent by the server. On receiving a new command it calls `on_` with the parsed - command. + command before delegating to `ReplicationCommandHandler.on_`. 
It also sends `PING` periodically, and correctly times out remote connections (if they send a `PING` command) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 5b0eaf4d2b70..b2d6baa2a231 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -46,8 +46,14 @@ def __init__(self, hs): self.clock = hs.get_clock() self.server_name = hs.config.server_name - # Ensure the replication streamer is started if we register a - # replication server endpoint. + # If we've created a `ReplicationStreamProtocolFactory` then we're + # almost certainly registering a replication listener, so let's ensure + # that we've started a `ReplicationStreamer` instance to actually push + # data. + # + # (This is a bit of a weird place to do this, but the alternatives such + # as putting this in `HomeServer.setup()`, requires either passing the + # listener config again or always starting a `ReplicationStreamer`.) hs.get_replication_streamer() def buildProtocol(self, addr):
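Taken together, the series leaves command dispatch working the way the docstring above describes: a protocol first looks for an `on_<NAME>` method on itself (protocol-level concerns such as PING), then on the shared `ReplicationCommandHandler`, and warns if neither handles the command. A condensed, runnable sketch of that double dispatch — the command and handler classes below are stand-ins invented for illustration, not Synapse's:

import asyncio
import logging

logger = logging.getLogger(__name__)


class PingCommand:
    NAME = "PING"


class RdataCommand:
    NAME = "RDATA"


class CommandHandler:
    """Stand-in for ReplicationCommandHandler: business-level handling."""

    async def on_RDATA(self, cmd):
        print("handler got RDATA")


class Protocol:
    """Stand-in for BaseReplicationStreamProtocol: transport-level handling."""

    def __init__(self, command_handler):
        self.command_handler = command_handler

    async def on_PING(self, cmd):
        print("protocol got PING")

    async def handle_command(self, cmd):
        handled = False

        # Protocol-specific handling first (e.g. keep-alives) ...
        cmd_func = getattr(self, "on_%s" % (cmd.NAME,), None)
        if cmd_func:
            await cmd_func(cmd)
            handled = True

        # ... then delegate anything else to the shared command handler.
        cmd_func = getattr(self.command_handler, "on_%s" % (cmd.NAME,), None)
        if cmd_func:
            await cmd_func(cmd)
            handled = True

        if not handled:
            logger.warning("Unhandled command: %r", cmd.NAME)


async def main():
    proto = Protocol(CommandHandler())
    await proto.handle_command(PingCommand())   # handled by the protocol
    await proto.handle_command(RdataCommand())  # handled by the handler


asyncio.run(main())

Keeping the business logic behind the second lookup is what lets the client and server protocols share a single `ReplicationCommandHandler`, which is the point of the series.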