Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move client command handling out of TCP protocol #7185

Merged
merged 21 commits into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.client import ReplicationDataHandler
from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.replication.tcp.streams import (
AccountDataStream,
Expand Down Expand Up @@ -603,7 +603,7 @@ def start_listening(self, listeners):
def remove_pusher(self, app_id, push_key, user_id):
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)

def build_tcp_replication(self):
def build_replication_data_handler(self):
return GenericWorkerReplicationHandler(self)

def build_presence_handler(self):
Expand All @@ -613,7 +613,7 @@ def build_typing_handler(self):
return GenericWorkerTyping(self)


class GenericWorkerReplicationHandler(ReplicationClientHandler):
class GenericWorkerReplicationHandler(ReplicationDataHandler):
def __init__(self, hs):
super(GenericWorkerReplicationHandler, self).__init__(hs.get_datastore())

Expand Down Expand Up @@ -644,9 +644,6 @@ def get_streams_to_replicate(self):
args.update(self.send_handler.stream_positions())
return args

def get_currently_syncing_users(self):
return self.presence_handler.get_currently_syncing_users()

async def process_and_notify(self, stream_name, token, rows):
try:
if self.send_handler:
Expand Down
67 changes: 53 additions & 14 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

from twisted.internet import defer
from twisted.internet.protocol import ReconnectingClientFactory

from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.tcp.protocol import (
AbstractReplicationClientHandler,
Expand All @@ -37,6 +36,10 @@
UserSyncCommand,
)

MYPY = False
if MYPY:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
from synapse.server import HomeServer

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -84,8 +87,9 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
By default proxies incoming replication data to the SlaveStore.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"""

def __init__(self, store: BaseSlavedStore):
self.store = store
def __init__(self, hs: "HomeServer"):
self.presence_handler = hs.get_presence_handler()
self.data_handler = hs.get_replication_data_handler()

# The current connection. None if we are currently (re)connecting
self.connection = None
Expand Down Expand Up @@ -125,15 +129,15 @@ async def on_rdata(self, stream_name, token, rows):
Stream.parse_row.
"""
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
logger.debug("Received rdata %s -> %s", stream_name, token)
self.store.process_replication_rows(stream_name, token, rows)
await self.data_handler.on_rdata(stream_name, token, rows)

async def on_position(self, stream_name, token):
"""Called when we get new position data. By default this just pokes
the slave store.

Can be overriden in subclasses to handle more.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"""
self.store.process_replication_rows(stream_name, token, [])
await self.data_handler.on_position(stream_name, token)

def on_sync(self, data):
"""When we received a SYNC we wake up any deferreds that were waiting
Expand All @@ -156,22 +160,15 @@ def get_streams_to_replicate(self) -> Dict[str, int]:
map from stream name to the most recent update we have for
that stream (ie, the point we want to start replicating from)
"""
args = self.store.stream_positions()
user_account_data = args.pop("user_account_data", None)
room_account_data = args.pop("room_account_data", None)
if user_account_data:
args["account_data"] = user_account_data
elif room_account_data:
args["account_data"] = room_account_data

return args
return self.data_handler.get_streams_to_replicate()

def get_currently_syncing_users(self):
"""Get the list of currently syncing users (if any). This is called
when a connection has been established and we need to send the
currently syncing users. (Overriden by the synchrotron's only)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"""
return []
return self.presence_handler.get_currently_syncing_users()

def send_command(self, cmd):
"""Send a command to master (when we get establish a connection if we
Expand Down Expand Up @@ -245,3 +242,45 @@ def finished_connecting(self):
# server.
if self.factory:
self.factory.resetDelay()


class ReplicationDataHandler:
"""A replication data handler that calls slave data stores.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you try to describe what a replication data handler is supposed to do? I assume it receives data from some other class and does stuff with it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW currently this is subclassed in generic_worker module but the intention is to fold all of the handling there into this class to remove the need to subclass.

"""

def __init__(self, store: BaseSlavedStore):
self.store = store

async def on_rdata(self, stream_name: str, token: int, rows: list):
"""Called to handle a batch of replication data with a given stream token.

By default this just pokes the slave store. Can be overridden in subclasses to
handle more.

Args:
stream_name (str): name of the replication stream for this batch of rows
token (int): stream token for this batch of rows
rows (list): a list of Stream.ROW_TYPE objects as returned by
Stream.parse_row.
"""
self.store.process_replication_rows(stream_name, token, rows)

def get_streams_to_replicate(self) -> Dict[str, int]:
"""Called when a new connection has been established and we need to
subscribe to streams.

Returns:
map from stream name to the most recent update we have for
that stream (ie, the point we want to start replicating from)
"""
args = self.store.stream_positions()
user_account_data = args.pop("user_account_data", None)
room_account_data = args.pop("room_account_data", None)
if user_account_data:
args["account_data"] = user_account_data
elif room_account_data:
args["account_data"] = room_account_data
return args

async def on_position(self, stream_name: str, token: int):
self.store.process_replication_rows(stream_name, token, [])
10 changes: 9 additions & 1 deletion synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@
from synapse.notifier import Notifier
from synapse.push.action_generator import ActionGenerator
from synapse.push.pusherpool import PusherPool
from synapse.replication.tcp.client import (
ReplicationClientHandler,
ReplicationDataHandler,
)
from synapse.replication.tcp.resource import ReplicationStreamer
from synapse.rest.media.v1.media_repository import (
MediaRepository,
Expand Down Expand Up @@ -206,6 +210,7 @@ def build_DEPENDENCY(self)
"password_policy_handler",
"storage",
"replication_streamer",
"replication_data_handler",
]

REQUIRED_ON_MASTER_STARTUP = ["user_directory_handler", "stats_handler"]
Expand Down Expand Up @@ -468,7 +473,7 @@ def build_read_marker_handler(self):
return ReadMarkerHandler(self)

def build_tcp_replication(self):
raise NotImplementedError()
return ReplicationClientHandler(self)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

def build_action_generator(self):
return ActionGenerator(self)
Expand Down Expand Up @@ -562,6 +567,9 @@ def build_storage(self) -> Storage:
def build_replication_streamer(self) -> ReplicationStreamer:
return ReplicationStreamer(self)

def build_replication_data_handler(self):
return ReplicationDataHandler(self.get_datastore())

def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)

Expand Down
4 changes: 4 additions & 0 deletions synapse/server.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ class HomeServer(object):
self,
) -> synapse.replication.tcp.client.ReplicationClientHandler:
pass
def get_replication_data_handler(
self,
) -> synapse.replication.tcp.client.ReplicationDataHandler:
pass
def get_federation_registry(
self,
) -> synapse.federation.federation_server.FederationHandlerRegistry:
Expand Down