Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add option to move event persistence off master (#7517)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston authored May 22, 2020
1 parent 4429764 commit e5c67d0
Show file tree
Hide file tree
Showing 22 changed files with 382 additions and 73 deletions.
1 change: 1 addition & 0 deletions changelog.d/7517.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add option to move event persistence off master.
3 changes: 3 additions & 0 deletions scripts/synapse_port_db
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ class MockHomeserver:
def get_reactor(self):
    """Return the module-level ``reactor``.

    NOTE(review): presumably the Twisted reactor imported at the top of
    this script — confirm against the file's imports.
    """
    return reactor

def get_instance_name(self):
    """Return the name of this (mock) homeserver instance.

    The port script runs as a single process, so it always reports the
    conventional main-process name, "master".
    """
    return "master"


class Porter(object):
def __init__(self, **kwargs):
Expand Down
53 changes: 48 additions & 5 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,22 @@
from synapse.config.logger import setup_logging
from synapse.federation import send_queue
from synapse.federation.transport.server import TransportLayerServer
from synapse.handlers.presence import BasePresenceHandler, get_interested_parties
from synapse.handlers.presence import (
BasePresenceHandler,
PresenceState,
get_interested_parties,
)
from synapse.http.server import JsonResource, OptionsResource
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.replication.http.presence import (
ReplicationBumpPresenceActiveTime,
ReplicationPresenceSetState,
)
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
Expand Down Expand Up @@ -247,6 +255,9 @@ def __init__(self, hs):
# but we haven't notified the master of that yet
self.users_going_offline = {}

self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
self._set_state_client = ReplicationPresenceSetState.make_client(hs)

self._send_stop_syncing_loop = self.clock.looping_call(
self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
)
Expand Down Expand Up @@ -304,10 +315,6 @@ def send_stop_syncing(self):
self.users_going_offline.pop(user_id, None)
self.send_user_sync(user_id, False, last_sync_ms)

def set_state(self, user, state, ignore_status_msg=False):
# TODO Hows this supposed to work?
return defer.succeed(None)

async def user_syncing(
self, user_id: str, affect_presence: bool
) -> ContextManager[None]:
Expand Down Expand Up @@ -386,6 +393,42 @@ def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
if count > 0
]

async def set_state(self, target_user, state, ignore_status_msg=False):
    """Set the presence state of a user from this worker.

    Validates the requested state, then proxies the update to the master
    process, since workers do not write presence themselves. A no-op when
    presence is disabled in the homeserver config.
    """
    if state["presence"] not in (
        PresenceState.ONLINE,
        PresenceState.UNAVAILABLE,
        PresenceState.OFFLINE,
    ):
        # Reject unknown states even when presence is disabled.
        raise SynapseError(400, "Invalid presence state")

    if not self.hs.config.use_presence:
        # Presence tracking is turned off for this homeserver.
        return

    # Forward the update to the master over the replication HTTP client.
    await self._set_state_client(
        user_id=target_user.to_string(),
        state=state,
        ignore_status_msg=ignore_status_msg,
    )

async def bump_presence_active_time(self, user):
    """Record that `user` just interacted with the app.

    Workers cannot update presence locally, so the bump is forwarded to
    the master process. A no-op when presence is disabled.
    """
    if not self.hs.config.use_presence:
        # Presence tracking is turned off for this homeserver.
        return

    # Forward to the master over the replication HTTP client.
    await self._bump_active_client(user_id=user.to_string())


class GenericWorkerTyping(object):
def __init__(self, hs):
Expand Down
1 change: 1 addition & 0 deletions synapse/config/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,5 +257,6 @@ def read_config(*args, callback=None):
logging.warning("***** STARTING SERVER *****")
logging.warning("Server %s version %s", sys.argv[0], get_version_string(synapse))
logging.info("Server hostname: %s", config.server_name)
logging.info("Instance name: %s", hs.get_instance_name())

return logger
30 changes: 28 additions & 2 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import attr

from ._base import Config
from ._base import Config, ConfigError


@attr.s
Expand All @@ -27,6 +27,17 @@ class InstanceLocationConfig:
port = attr.ib(type=int)


@attr.s
class WriterLocations:
    """Holds the instance names responsible for writing to each stream.

    Attributes:
        events: Name of the instance that writes to the event and
            backfill streams (defaults to the main "master" process).
    """

    events = attr.ib(type=str, default="master")


class WorkerConfig(Config):
"""The workers are processes run separately to the main synapse process.
They have their own pid_file and listener configuration. They use the
Expand Down Expand Up @@ -83,11 +94,26 @@ def read_config(self, config, **kwargs):
bind_addresses.append("")

# A map from instance name to host/port of their HTTP replication endpoint.
instance_map = config.get("instance_map", {}) or {}
instance_map = config.get("instance_map") or {}
self.instance_map = {
name: InstanceLocationConfig(**c) for name, c in instance_map.items()
}

# Map from type of streams to source, c.f. WriterLocations.
writers = config.get("stream_writers") or {}
self.writers = WriterLocations(**writers)

# Check that the configured writer for events also appears in
# `instance_map`.
if (
self.writers.events != "master"
and self.writers.events not in self.instance_map
):
raise ConfigError(
"Instance %r is configured to write events but does not appear in `instance_map` config."
% (self.writers.events,)
)

def read_arguments(self, args):
# We support a bunch of command line arguments that override options in
# the config. A lot of these options have a worker_* prefix when running
Expand Down
16 changes: 10 additions & 6 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,10 @@ def __init__(self, hs):
self._server_notices_mxid = hs.config.server_notices_mxid
self.config = hs.config
self.http_client = hs.get_simple_http_client()
self._instance_name = hs.get_instance_name()
self._replication = hs.get_replication_data_handler()

self._send_events_to_master = ReplicationFederationSendEventsRestServlet.make_client(
hs
)
self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client(
hs
)
Expand Down Expand Up @@ -1243,6 +1242,10 @@ async def do_invite_join(
content: The event content to use for the join event.
"""
# TODO: We should be able to call this on workers, but the upgrading of
# room stuff after join currently doesn't work on workers.
assert self.config.worker.worker_app is None

logger.debug("Joining %s to %s", joinee, room_id)

origin, event, room_version_obj = await self._make_and_verify_event(
Expand Down Expand Up @@ -1314,7 +1317,7 @@ async def do_invite_join(
#
# TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position(
"master", "events", max_stream_id
self.config.worker.writers.events, "events", max_stream_id
)

# Check whether this room is the result of an upgrade of a room we already know
Expand Down Expand Up @@ -2854,8 +2857,9 @@ async def persist_events_and_notify(
backfilled: Whether these events are a result of
backfilling or not
"""
if self.config.worker_app:
result = await self._send_events_to_master(
if self.config.worker.writers.events != self._instance_name:
result = await self._send_events(
instance_name=self.config.worker.writers.events,
store=self.store,
event_and_contexts=event_and_contexts,
backfilled=backfilled,
Expand Down
12 changes: 7 additions & 5 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,11 @@ def __init__(self, hs):
self.notifier = hs.get_notifier()
self.config = hs.config
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
self._instance_name = hs.get_instance_name()

self.room_invite_state_types = self.hs.config.room_invite_state_types

self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
self.send_event = ReplicationSendEventRestServlet.make_client(hs)

# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)
Expand Down Expand Up @@ -835,8 +836,9 @@ async def handle_new_client_event(
success = False
try:
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
result = await self.send_event_to_master(
if self.config.worker.writers.events != self._instance_name:
result = await self.send_event(
instance_name=self.config.worker.writers.events,
event_id=event.event_id,
store=self.store,
requester=requester,
Expand Down Expand Up @@ -902,9 +904,9 @@ async def persist_and_notify_client_event(
"""Called when we have fully built the event, have already
calculated the push actions for the event, and checked auth.
This should only be run on master.
This should only be run on the instance in charge of persisting events.
"""
assert not self.config.worker_app
assert self.config.worker.writers.events == self._instance_name

if ratelimit:
# We check if this is a room admin redacting an event so that we
Expand Down
6 changes: 6 additions & 0 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,12 @@ async def set_state(
) -> None:
"""Set the presence state of the user. """

@abc.abstractmethod
async def bump_presence_active_time(self, user: UserID):
"""We've seen the user do something that indicates they're interacting
with the app.
"""


class PresenceHandler(BasePresenceHandler):
def __init__(self, hs: "synapse.server.HomeServer"):
Expand Down
7 changes: 7 additions & 0 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ def __init__(self, hs):
self.room_member_handler = hs.get_room_member_handler()
self.config = hs.config

self._replication = hs.get_replication_data_handler()

# linearizer to stop two upgrades happening at once
self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")

Expand Down Expand Up @@ -752,6 +754,11 @@ async def create_room(
if room_alias:
result["room_alias"] = room_alias.to_string()

# Always wait for room creation to propagate before returning
await self._replication.wait_for_stream_position(
self.hs.config.worker.writers.events, "events", last_stream_id
)

return result, last_stream_id

async def _send_events_for_new_room(
Expand Down
39 changes: 31 additions & 8 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.replication.http.membership import (
ReplicationLocallyRejectInviteRestServlet,
)
from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
Expand All @@ -44,11 +47,6 @@ class RoomMemberHandler(object):
__metaclass__ = abc.ABCMeta

def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer):
"""
self.hs = hs
self.store = hs.get_datastore()
self.auth = hs.get_auth()
Expand All @@ -71,6 +69,17 @@ def __init__(self, hs):
self._enable_lookup = hs.config.enable_3pid_lookup
self.allow_per_room_profiles = self.config.allow_per_room_profiles

self._event_stream_writer_instance = hs.config.worker.writers.events
self._is_on_event_persistence_instance = (
self._event_stream_writer_instance == hs.get_instance_name()
)
if self._is_on_event_persistence_instance:
self.persist_event_storage = hs.get_storage().persistence
else:
self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client(
hs
)

# This is only used to get at ratelimit function, and
# maybe_kick_guest_users. It's fine there are multiple of these as
# it doesn't store state.
Expand Down Expand Up @@ -121,6 +130,22 @@ async def _remote_reject_invite(
"""
raise NotImplementedError()

async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
    """Mark an invite as rejected without a corresponding leave event.

    Used when we failed to create a leave event for the rejection; the
    rejection is instead recorded directly in storage, on whichever
    instance is responsible for persisting events.

    Returns:
        The stream id the rejection was persisted at.
    """
    if not self._is_on_event_persistence_instance:
        # We cannot write events here: ask the event-persisting instance
        # to record the rejection via the replication HTTP client.
        response = await self._locally_reject_client(
            instance_name=self._event_stream_writer_instance,
            user_id=user_id,
            room_id=room_id,
        )
        return response["stream_id"]

    return await self.persist_event_storage.locally_reject_invite(
        user_id, room_id
    )

@abc.abstractmethod
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has joined the
Expand Down Expand Up @@ -1015,9 +1040,7 @@ async def _remote_reject_invite(
#
logger.warning("Failed to reject invite: %s", e)

stream_id = await self.store.locally_reject_invite(
target.to_string(), room_id
)
stream_id = await self.locally_reject_invite(target.to_string(), room_id)
return None, stream_id

async def _user_joined_room(self, target: UserID, room_id: str) -> None:
Expand Down
4 changes: 3 additions & 1 deletion synapse/replication/http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
federation,
login,
membership,
presence,
register,
send_event,
streams,
Expand All @@ -35,10 +36,11 @@ def __init__(self, hs):
def register_servlets(self, hs):
send_event.register_servlets(hs, self)
federation.register_servlets(hs, self)
presence.register_servlets(hs, self)
membership.register_servlets(hs, self)

# The following can't currently be instantiated on workers.
if hs.config.worker.worker_app is None:
membership.register_servlets(hs, self)
login.register_servlets(hs, self)
register.register_servlets(hs, self)
devices.register_servlets(hs, self)
Expand Down
3 changes: 3 additions & 0 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def make_client(cls, hs):
"""
clock = hs.get_clock()
client = hs.get_simple_http_client()
local_instance_name = hs.get_instance_name()

master_host = hs.config.worker_replication_host
master_port = hs.config.worker_replication_http_port
Expand All @@ -151,6 +152,8 @@ def make_client(cls, hs):
@trace(opname="outgoing_replication_request")
@defer.inlineCallbacks
def send_request(instance_name="master", **kwargs):
if instance_name == local_instance_name:
raise Exception("Trying to send HTTP request to self")
if instance_name == "master":
host = master_host
port = master_port
Expand Down
Loading

0 comments on commit e5c67d0

Please sign in to comment.