Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Allow ReplicationCommandHandler to have multiple connections
Browse files Browse the repository at this point in the history
This will allow the server replication component to use
ReplicationCommandHandler.
  • Loading branch information
erikjohnston committed Mar 31, 2020
1 parent 5104d16 commit 3c13561
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 20 deletions.
39 changes: 22 additions & 17 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ def __init__(self, hs):
# The factory used to create connections.
self.factory = None # type: Optional[ReplicationClientFactory]

# The current connection. None if we are currently (re)connecting
self.connection = None
# The currently connected connections.
self.connections = []

def start_replication(self, hs):
"""Helper method to start a replication connection to the remote server
Expand Down Expand Up @@ -175,29 +175,34 @@ def get_currently_syncing_users(self):
"""
return self.presence_handler.get_currently_syncing_users()

def update_connection(self, connection):
"""Called when a connection has been established (or lost with None).
"""
self.connection = connection

def finished_connecting(self):
"""Called when we have successfully subscribed and caught up to all
streams we're interested in.
"""
logger.info("Finished connecting to server")
def new_connection(self, connection):
self.connections.append(connection)

# We don't reset the delay any earlier as otherwise if there is a
# problem during start up we'll end up tight looping connecting to the
# server.
# If we're using a ReplicationClientFactory then we reset the connection
# delay now.
if self.factory:
self.factory.resetDelay()

def lost_connection(self, connection):
try:
self.connections.remove(connection)
except ValueError:
pass

def connected(self) -> bool:
"""Do we have any replication connections open?
Used to no-op if nothing is connected.
"""
return bool(self.connections)

def send_command(self, cmd: Command):
"""Send a command to master (when we get establish a connection if we
don't have one already.)
"""
if self.connection:
self.connection.send_command(cmd)
if self.connections:
for connection in self.connections:
connection.send_command(cmd)
else:
logger.warning("Dropping command as not connected: %r", cmd.NAME)

Expand Down
5 changes: 2 additions & 3 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,7 @@ def connectionMade(self):
self.send_command(UserSyncCommand(self.instance_id, user_id, True, now))

# We've now finished connecting to so inform the client handler
self.handler.update_connection(self)
self.handler.finished_connecting()
self.handler.new_connection(self)

async def handle_command(self, cmd: Command):
"""Handle a command we have received over the replication stream.
Expand Down Expand Up @@ -629,7 +628,7 @@ def replicate(self):

def on_connection_closed(self):
BaseReplicationStreamProtocol.on_connection_closed(self)
self.handler.update_connection(None)
self.handler.lost_connection(self)


# The following simply registers metrics for the replication connections
Expand Down

0 comments on commit 3c13561

Please sign in to comment.