From 3c135615d4e2024ff1bd9525eaf190b504a090cb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 31 Mar 2020 13:53:08 +0100 Subject: [PATCH] Allow ReplicationCommandHandler to have multiple connections This will allow the server replication component to use ReplicationCommandHandler. --- synapse/replication/tcp/handler.py | 39 ++++++++++++++++------------- synapse/replication/tcp/protocol.py | 5 ++-- 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index b3c33370a02b..19bd43bc6afb 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -70,8 +70,8 @@ def __init__(self, hs): # The factory used to create connections. self.factory = None # type: Optional[ReplicationClientFactory] - # The current connection. None if we are currently (re)connecting - self.connection = None + # The currently connected connections. + self.connections = [] def start_replication(self, hs): """Helper method to start a replication connection to the remote server @@ -175,29 +175,34 @@ def get_currently_syncing_users(self): """ return self.presence_handler.get_currently_syncing_users() - def update_connection(self, connection): - """Called when a connection has been established (or lost with None). - """ - self.connection = connection - - def finished_connecting(self): - """Called when we have successfully subscribed and caught up to all - streams we're interested in. - """ - logger.info("Finished connecting to server") + def new_connection(self, connection): + self.connections.append(connection) - # We don't reset the delay any earlier as otherwise if there is a - # problem during start up we'll end up tight looping connecting to the - # server. + # If we're using a ReplicationClientFactory then we reset the connection + # delay now. if self.factory: self.factory.resetDelay() + def lost_connection(self, connection): + try: + self.connections.remove(connection) + except ValueError: + pass + + def connected(self) -> bool: + """Do we have any replication connections open? + + Used to no-op if nothing is connected. + """ + return bool(self.connections) + def send_command(self, cmd: Command): """Send a command to master (when we get establish a connection if we don't have one already.) """ - if self.connection: - self.connection.send_command(cmd) + if self.connections: + for connection in self.connections: + connection.send_command(cmd) else: logger.warning("Dropping command as not connected: %r", cmd.NAME) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 7dc2828ad13d..ccf24a62ca33 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -586,8 +586,7 @@ def connectionMade(self): self.send_command(UserSyncCommand(self.instance_id, user_id, True, now)) # We've now finished connecting to so inform the client handler - self.handler.update_connection(self) - self.handler.finished_connecting() + self.handler.new_connection(self) async def handle_command(self, cmd: Command): """Handle a command we have received over the replication stream. @@ -629,7 +628,7 @@ def replicate(self): def on_connection_closed(self): BaseReplicationStreamProtocol.on_connection_closed(self) - self.handler.update_connection(None) + self.handler.lost_connection(self) # The following simply registers metrics for the replication connections