Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #7239 from matrix-org/rav/replication_cleanup
Browse files Browse the repository at this point in the history
Miscellaneous cleanups to replication code
  • Loading branch information
richvdh authored Apr 7, 2020
2 parents 1722b8a + bd2ea34 commit aedeedc
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 32 deletions.
1 change: 1 addition & 0 deletions changelog.d/7329.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move catchup of replication streams logic to worker.
49 changes: 24 additions & 25 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
allowed to be sent by which side.
"""

import abc
import logging
import platform
from typing import Tuple, Type
Expand All @@ -34,34 +34,29 @@
logger = logging.getLogger(__name__)


class Command(object):
class Command(metaclass=abc.ABCMeta):
"""The base command class.
All subclasses must set the NAME variable which equates to the name of the
command on the wire.
A full command line on the wire is constructed from `NAME + " " + to_line()`
The default implementation creates a command of form `<NAME> <data>`
"""

NAME = None # type: str

def __init__(self, data):
self.data = data

@classmethod
@abc.abstractmethod
def from_line(cls, line):
"""Deserialises a line from the wire into this command. `line` does not
include the command.
"""
return cls(line)

def to_line(self):
@abc.abstractmethod
def to_line(self) -> str:
"""Serialises the comamnd for the wire. Does not include the command
prefix.
"""
return self.data

def get_logcontext_id(self):
"""Get a suitable string for the logcontext when processing this command"""
Expand All @@ -70,7 +65,21 @@ def get_logcontext_id(self):
return self.NAME


class ServerCommand(Command):
class _SimpleCommand(Command):
"""An implementation of Command whose argument is just a 'data' string."""

def __init__(self, data):
self.data = data

@classmethod
def from_line(cls, line):
return cls(line)

def to_line(self) -> str:
return self.data


class ServerCommand(_SimpleCommand):
"""Sent by the server on new connection and includes the server_name.
Format::
Expand Down Expand Up @@ -155,22 +164,22 @@ def to_line(self):
return " ".join((self.stream_name, str(self.token)))


class ErrorCommand(Command):
class ErrorCommand(_SimpleCommand):
"""Sent by either side if there was an ERROR. The data is a string describing
the error.
"""

NAME = "ERROR"


class PingCommand(Command):
class PingCommand(_SimpleCommand):
"""Sent by either side as a keep alive. The data is arbitary (often timestamp)
"""

NAME = "PING"


class NameCommand(Command):
class NameCommand(_SimpleCommand):
"""Sent by client to inform the server of the client's identity. The data
is the name
"""
Expand Down Expand Up @@ -289,14 +298,6 @@ def to_line(self):
return str(self.token)


class SyncCommand(Command):
"""Used for testing. The client protocol implementation allows waiting
on a SYNC command with a specified data.
"""

NAME = "SYNC"


class RemovePusherCommand(Command):
"""Sent by the client to request the master remove the given pusher.
Expand Down Expand Up @@ -395,7 +396,7 @@ def to_line(self):
)


class RemoteServerUpCommand(Command):
class RemoteServerUpCommand(_SimpleCommand):
"""Sent when a worker has detected that a remote server is no longer
"down" and retry timings should be reset.
Expand All @@ -419,7 +420,6 @@ class RemoteServerUpCommand(Command):
ReplicateCommand,
UserSyncCommand,
FederationAckCommand,
SyncCommand,
RemovePusherCommand,
InvalidateCacheCommand,
UserIpCommand,
Expand All @@ -437,7 +437,6 @@ class RemoteServerUpCommand(Command):
PositionCommand.NAME,
ErrorCommand.NAME,
PingCommand.NAME,
SyncCommand.NAME,
RemoteServerUpCommand.NAME,
)

Expand Down
4 changes: 0 additions & 4 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
RemoteServerUpCommand,
RemovePusherCommand,
ReplicateCommand,
SyncCommand,
UserIpCommand,
UserSyncCommand,
)
Expand Down Expand Up @@ -281,9 +280,6 @@ async def on_POSITION(self, cmd: PositionCommand):

self._streams_connected.add(cmd.stream_name)

async def on_SYNC(self, cmd: SyncCommand):
pass

async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
""""Called when get a new REMOTE_SERVER_UP command."""
self._replication_data_handler.on_remote_server_up(cmd.data)
Expand Down
14 changes: 11 additions & 3 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,23 @@ def send_ping(self):
)
self.send_error("ping timeout")

def lineReceived(self, line):
def lineReceived(self, line: bytes):
"""Called when we've received a line
"""
if line.strip() == "":
# Ignore blank lines
return

line = line.decode("utf-8")
cmd_name, rest_of_line = line.split(" ", 1)
linestr = line.decode("utf-8")

# split at the first " ", handling one-word commands
idx = linestr.index(" ")
if idx >= 0:
cmd_name = linestr[:idx]
rest_of_line = linestr[idx + 1 :]
else:
cmd_name = linestr
rest_of_line = ""

if cmd_name not in self.VALID_INBOUND_COMMANDS:
logger.error("[%s] invalid command %s", self.id(), cmd_name)
Expand Down

0 comments on commit aedeedc

Please sign in to comment.