Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge server command processing into ReplicationCommandHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Mar 31, 2020
1 parent 3c13561 commit 42af522
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 261 deletions.
124 changes: 123 additions & 1 deletion synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@

from prometheus_client import Counter

from synapse.metrics import LaterGauge
from synapse.replication.tcp.client import ReplicationClientFactory
from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
Command,
FederationAckCommand,
InvalidateCacheCommand,
PositionCommand,
RdataCommand,
RemoteServerUpCommand,
RemovePusherCommand,
ReplicateCommand,
SyncCommand,
UserIpCommand,
UserSyncCommand,
Expand All @@ -44,6 +47,13 @@
# Prometheus counters for replication commands handled by this module.
# All of them are registered with an empty description string.

# Incremented once per RDATA command in on_RDATA, labelled with the name of
# the stream the row belongs to.
inbound_rdata_count = Counter(
    "synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
)
# Incremented for every USER_SYNC command, before the master check.
user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
# Incremented for every FEDERATION_ACK command, whether or not a federation
# sender is configured on this process.
federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
# Incremented for every REMOVE_PUSHER command, before the master check.
remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
# Incremented for every INVALIDATE_CACHE command, before the master check.
invalidate_cache_counter = Counter(
    "synapse_replication_tcp_resource_invalidate_cache", ""
)
# Incremented for every USER_IP command, before the master check.
user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")


class ReplicationCommandHandler:
Expand All @@ -52,6 +62,8 @@ class ReplicationCommandHandler:

def __init__(self, hs):
self.replication_data_handler = hs.get_replication_data_handler()
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
self.presence_handler = hs.get_presence_handler()

# Set of streams that we're currently catching up with.
Expand All @@ -71,7 +83,41 @@ def __init__(self, hs):
self.factory = None # type: Optional[ReplicationClientFactory]

# The currently connected connections.
self.connections = []
self.connections = [] # type: List[Any]

LaterGauge(
"synapse_replication_tcp_resource_total_connections",
"",
[],
lambda: len(self.connections),
)

LaterGauge(
"synapse_replication_tcp_resource_connections_per_stream",
"",
["stream_name"],
lambda: {
(stream_name,): len(
[
conn
for conn in self.connections
if stream_name in conn.replication_streams
]
)
for stream_name in self.streams
},
)

self.is_master = hs.config.worker_app is None

self.federation_sender = None
if self.is_master and not hs.config.send_federation:
self.federation_sender = hs.get_federation_sender()

self._server_notices_sender = None
if self.is_master:
self._server_notices_sender = hs.get_server_notices_sender()
self.notifier.add_remote_server_up_callback(self.send_remote_server_up)

def start_replication(self, hs):
"""Helper method to start a replication connection to the remote server
Expand All @@ -83,6 +129,73 @@ def start_replication(self, hs):
port = hs.config.worker_replication_port
hs.get_reactor().connectTCP(host, port, self.factory)

async def on_REPLICATE(self, cmd: ReplicateCommand):
    """Respond to a REPLICATE command by announcing every stream position.

    Only the master process responds; on any other process the command is
    ignored. For each entry in ``self.streams`` a ``PositionCommand``
    carrying the stream's current token is sent.

    Args:
        cmd: the received REPLICATE command (its contents are not read).

    Raises:
        Exception: if this is the master and there are no connections.
    """
    # Positions are only announced by the writer of the streams, which is
    # currently just the master process.
    if self.is_master:
        if not self.connections:
            raise Exception("Not connected")

        for name, stream in self.streams.items():
            token = stream.current_token()
            self.send_command(PositionCommand(name, token))

async def on_USER_SYNC(self, cmd: UserSyncCommand):
    """Handle a USER_SYNC command.

    The command is always counted in ``user_sync_counter``. Only the master
    process then forwards the sync state to the presence handler.

    Args:
        cmd: the received command; its ``instance_id``, ``user_id``,
            ``is_syncing`` and ``last_sync_ms`` fields are passed through.
    """
    user_sync_counter.inc()

    # Non-master processes only record the metric.
    if not self.is_master:
        return

    await self.presence_handler.update_external_syncs_row(
        cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
    )

async def on_CLEAR_USER_SYNC(self, cmd: ClearUserSyncsCommand):
    """Handle a CLEAR_USER_SYNC command.

    On the master process, asks the presence handler to clear the external
    syncs recorded for ``cmd.instance_id``. Other processes ignore it.

    Args:
        cmd: the received command; only ``instance_id`` is read.
    """
    if not self.is_master:
        return

    await self.presence_handler.update_external_syncs_clear(cmd.instance_id)

async def on_FEDERATION_ACK(self, cmd: FederationAckCommand):
    """Handle a FEDERATION_ACK command.

    The command is always counted in ``federation_ack_counter``. If this
    process holds a federation sender, ``cmd.token`` is passed on to it.

    Args:
        cmd: the received command; only ``token`` is read.
    """
    federation_ack_counter.inc()

    sender = self.federation_sender
    if not sender:
        # No federation sender on this process, so nothing to acknowledge.
        return

    sender.federation_ack(cmd.token)

async def on_REMOVE_PUSHER(self, cmd: RemovePusherCommand):
    """Handle a REMOVE_PUSHER command.

    The command is always counted in ``remove_pusher_counter``. On the
    master process the pusher identified by ``cmd.app_id``,
    ``cmd.push_key`` and ``cmd.user_id`` is deleted from the store.

    Args:
        cmd: the received command describing which pusher to remove.
    """
    remove_pusher_counter.inc()

    if self.is_master:
        await self.store.delete_pusher_by_app_id_pushkey_user_id(
            app_id=cmd.app_id, pushkey=cmd.push_key, user_id=cmd.user_id
        )

        # NOTE(review): the indentation of this call was lost in the text
        # this block was recovered from; it is assumed to belong to the
        # master-only branch, so the notifier is poked only after a pusher
        # was actually deleted - confirm against the real file.
        self.notifier.on_new_replication_data()

async def on_INVALIDATE_CACHE(self, cmd: InvalidateCacheCommand):
    """Handle an INVALIDATE_CACHE command.

    The command is always counted in ``invalidate_cache_counter``. Only the
    master process acts on it, invalidating via the store.

    Args:
        cmd: the received command; ``cache_func`` names the cache and
            ``keys`` is converted to a tuple before being passed on.
    """
    invalidate_cache_counter.inc()

    if not self.is_master:
        return

    # The cache is invalidated locally, and that invalidation is then also
    # streamed to the other workers.
    await self.store.invalidate_cache_and_stream(cmd.cache_func, tuple(cmd.keys))

async def on_USER_IP(self, cmd: UserIpCommand):
    """Handle a USER_IP command.

    The command is always counted in ``user_ip_cache_counter``. On the
    master process the client IP details are written to the store, and if a
    server notices sender is configured it is told about the user.

    Args:
        cmd: the received command carrying the user, access token, IP,
            user agent, device and last-seen values.
    """
    user_ip_cache_counter.inc()

    if self.is_master:
        client_ip_row = (
            cmd.user_id,
            cmd.access_token,
            cmd.ip,
            cmd.user_agent,
            cmd.device_id,
            cmd.last_seen,
        )
        await self.store.insert_client_ip(*client_ip_row)

    notices_sender = self._server_notices_sender
    if notices_sender:
        await notices_sender.on_user_ip(cmd.user_id)

async def on_RDATA(self, cmd: RdataCommand):
stream_name = cmd.stream_name
inbound_rdata_count.labels(stream_name).inc()
Expand Down Expand Up @@ -167,6 +280,8 @@ async def on_SYNC(self, cmd: SyncCommand):

async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
    """Called when we get a new REMOTE_SERVER_UP command.

    On the master process the server named in ``cmd.data`` is passed to the
    notifier; other processes ignore the command.

    Args:
        cmd: the received command; only ``data`` is read.
    """
    if not self.is_master:
        return

    self.notifier.notify_remote_server_up(cmd.data)

def get_currently_syncing_users(self):
"""Get the list of currently syncing users (if any). This is called
Expand Down Expand Up @@ -249,3 +364,10 @@ def send_user_ip(

def send_remote_server_up(self, server: str):
    """Send a REMOTE_SERVER_UP command for the given server.

    Args:
        server: value wrapped in the ``RemoteServerUpCommand`` that is sent.
    """
    command = RemoteServerUpCommand(server)
    self.send_command(command)

def stream_update(self, stream_name: str, token: str, data: Any):
    """Called when a new update is available to stream to clients.

    The update is wrapped in an ``RdataCommand`` and handed to
    ``send_command``; no per-client filtering is done in this method.

    Args:
        stream_name: name of the stream the update belongs to.
        token: stream token to send with the update.
        data: the update payload.
    """
    rdata = RdataCommand(stream_name, token, data)
    self.send_command(rdata)
133 changes: 26 additions & 107 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,8 @@
ErrorCommand,
NameCommand,
PingCommand,
PositionCommand,
RdataCommand,
RemoteServerUpCommand,
ReplicateCommand,
ServerCommand,
SyncCommand,
UserSyncCommand,
)
from synapse.replication.tcp.streams import STREAMS_MAP, Stream
from synapse.types import Collection
Expand Down Expand Up @@ -136,8 +131,9 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):

max_line_buffer = 10000

def __init__(self, clock):
def __init__(self, clock, handler):
self.clock = clock
self.handler = handler

self.last_received_command = self.clock.time_msec()
self.last_sent_command = 0
Expand Down Expand Up @@ -177,6 +173,8 @@ def connectionMade(self):
# can time us out.
self.send_command(PingCommand(self.clock.time_msec()))

self.handler.new_connection(self)

def send_ping(self):
"""Periodically sends a ping and checks if we should close the connection
due to the other side timing out.
Expand Down Expand Up @@ -250,8 +248,23 @@ async def handle_command(self, cmd: Command):
Args:
cmd: received command
"""
handler = getattr(self, "on_%s" % (cmd.NAME,))
await handler(cmd)
handled = False

# First call any command handlers on this instance. These are for TCP
# specific handling.
cmd_func = getattr(self, "on_%s" % (cmd.NAME,), None)
if cmd_func:
await cmd_func(cmd)
handled = True

# Then call out to the handler.
cmd_func = getattr(self.handler, "on_%s" % (cmd.NAME,), None)
if cmd_func:
await cmd_func(cmd)
handled = True

if not handled:
logger.warning("Unhandled command: %r", cmd)

def close(self):
logger.warning("[%s] Closing connection", self.id())
Expand Down Expand Up @@ -380,6 +393,8 @@ def on_connection_closed(self):
self.state = ConnectionStates.CLOSED
self.pending_commands = []

self.handler.lost_connection(self)

if self.transport:
self.transport.unregisterProducer()

Expand All @@ -406,74 +421,19 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS
VALID_OUTBOUND_COMMANDS = VALID_SERVER_COMMANDS

def __init__(self, server_name, clock, streamer):
BaseReplicationStreamProtocol.__init__(self, clock) # Old style class
def __init__(self, server_name, clock, handler):
BaseReplicationStreamProtocol.__init__(self, clock, handler) # Old style class

self.server_name = server_name
self.streamer = streamer

def connectionMade(self):
self.send_command(ServerCommand(self.server_name))
BaseReplicationStreamProtocol.connectionMade(self)
self.streamer.new_connection(self)

async def on_NAME(self, cmd):
logger.info("[%s] Renamed to %r", self.id(), cmd.data)
self.name = cmd.data

async def on_USER_SYNC(self, cmd):
await self.streamer.on_user_sync(
cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
)

async def on_CLEAR_USER_SYNC(self, cmd):
await self.streamer.on_clear_user_syncs(cmd.instance_id)

async def on_REPLICATE(self, cmd):
# Subscribe to all streams we're publishing to.
for stream_name in self.streamer.streams_by_name:
current_token = self.streamer.get_stream_token(stream_name)
self.send_command(PositionCommand(stream_name, current_token))

async def on_FEDERATION_ACK(self, cmd):
self.streamer.federation_ack(cmd.token)

async def on_REMOVE_PUSHER(self, cmd):
await self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)

async def on_INVALIDATE_CACHE(self, cmd):
await self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)

async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
self.streamer.on_remote_server_up(cmd.data)

async def on_USER_IP(self, cmd):
self.streamer.on_user_ip(
cmd.user_id,
cmd.access_token,
cmd.ip,
cmd.user_agent,
cmd.device_id,
cmd.last_seen,
)

def stream_update(self, stream_name, token, data):
"""Called when a new update is available to stream to clients.
We need to check if the client is interested in the stream or not
"""
self.send_command(RdataCommand(stream_name, token, data))

def send_sync(self, data):
self.send_command(SyncCommand(data))

def send_remote_server_up(self, server: str):
self.send_command(RemoteServerUpCommand(server))

def on_connection_closed(self):
BaseReplicationStreamProtocol.on_connection_closed(self)
self.streamer.lost_connection(self)


class AbstractReplicationClientHandler(metaclass=abc.ABCMeta):
"""
Expand Down Expand Up @@ -552,13 +512,12 @@ def __init__(
clock: Clock,
command_handler,
):
BaseReplicationStreamProtocol.__init__(self, clock)
BaseReplicationStreamProtocol.__init__(self, clock, command_handler)

self.instance_id = hs.get_instance_id()

self.client_name = client_name
self.server_name = server_name
self.handler = command_handler

self.streams = {
stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
Expand All @@ -578,42 +537,6 @@ def connectionMade(self):
# Once we've connected subscribe to the necessary streams
self.replicate()

# Tell the server if we have any users currently syncing (should only
# happen on synchrotrons)
currently_syncing = self.handler.get_currently_syncing_users()
now = self.clock.time_msec()
for user_id in currently_syncing:
self.send_command(UserSyncCommand(self.instance_id, user_id, True, now))

# We've now finished connecting to so inform the client handler
self.handler.new_connection(self)

async def handle_command(self, cmd: Command):
"""Handle a command we have received over the replication stream.
By default delegates to on_<COMMAND>, which should return an awaitable.
Args:
cmd: received command
"""
handled = False

# First call any command handlers on this instance. These are for TCP
# specific handling.
cmd_func = getattr(self, "on_%s" % (cmd.NAME,), None)
if cmd_func:
await cmd_func(cmd)
handled = True

# Then call out to the handler.
cmd_func = getattr(self.handler, "on_%s" % (cmd.NAME,), None)
if cmd_func:
await cmd_func(cmd)
handled = True

if not handled:
logger.warning("Unhandled command: %r", cmd)

async def on_SERVER(self, cmd):
if cmd.data != self.server_name:
logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
Expand All @@ -626,10 +549,6 @@ def replicate(self):

self.send_command(ReplicateCommand())

def on_connection_closed(self):
BaseReplicationStreamProtocol.on_connection_closed(self)
self.handler.lost_connection(self)


# The following simply registers metrics for the replication connections

Expand Down
Loading

0 comments on commit 42af522

Please sign in to comment.