Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add option to move event persistence off master #7517

Merged
merged 21 commits into from
May 22, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
3ca6b56
Make location of events writer configurable
erikjohnston Apr 30, 2020
a447c2e
Use new writers config
erikjohnston May 13, 2020
1f6dbc3
Enable moving event persistence off of master
erikjohnston May 11, 2020
9541abd
Add some debugging
erikjohnston May 15, 2020
7db1f22
Add check that event writer has an entry in instance_map
erikjohnston May 15, 2020
86b1d75
Newsfile
erikjohnston May 15, 2020
64949e7
Fix port script
erikjohnston May 15, 2020
7ed308a
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/mo…
erikjohnston May 18, 2020
2667552
Fixup review comments
erikjohnston May 18, 2020
c42b180
Fix typo
erikjohnston May 18, 2020
d2012df
Rename config var to stream_writers
erikjohnston May 18, 2020
6a539b6
fix missing continue
erikjohnston May 18, 2020
c0502f3
Move mark_remote_user_device_list_as_unsubscribed to worker store
erikjohnston May 18, 2020
a9ec5f4
Fix buping presence active time when using workers
erikjohnston May 18, 2020
fcbaf3f
Fix mypy
erikjohnston May 19, 2020
8b02186
Move locally_reject_invite into event persistence.
erikjohnston May 19, 2020
9468caa
Add assertion for do_invite_join.
erikjohnston May 19, 2020
76bf9cf
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/mo…
erikjohnston May 22, 2020
e343426
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/mo…
erikjohnston May 22, 2020
0b2ac9e
Fixup
erikjohnston May 22, 2020
f4a1848
fix missing change from master -> event writers
erikjohnston May 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7517.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add option to move event persistence off master.
3 changes: 3 additions & 0 deletions scripts/synapse_port_db
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ class MockHomeserver:
def get_reactor(self):
return reactor

def get_instance_name(self):
return "master"


class Porter(object):
def __init__(self, **kwargs):
Expand Down
1 change: 1 addition & 0 deletions synapse/config/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,5 +257,6 @@ def read_config(*args, callback=None):
logging.warning("***** STARTING SERVER *****")
logging.warning("Server %s version %s", sys.argv[0], get_version_string(synapse))
logging.info("Server hostname: %s", config.server_name)
logging.info("Instance name: %s", hs.get_instance_name())

return logger
30 changes: 28 additions & 2 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import attr

from ._base import Config
from ._base import Config, ConfigError


@attr.s
Expand All @@ -27,6 +27,17 @@ class InstanceLocationConfig:
port = attr.ib(type=int)


@attr.s
class WriterLocations:
"""Specifies the instances that write various streams.

Attributes:
events: The instance that writes to the event and backfill streams.
"""

events = attr.ib(default="master", type=str)


class WorkerConfig(Config):
"""The workers are processes run separately to the main synapse process.
They have their own pid_file and listener configuration. They use the
Expand Down Expand Up @@ -83,11 +94,26 @@ def read_config(self, config, **kwargs):
bind_addresses.append("")

# A map from instance name to host/port of their HTTP replication endpoint.
instance_map = config.get("instance_map", {}) or {}
instance_map = config.get("instance_map") or {}
self.instance_map = {
name: InstanceLocationConfig(**c) for name, c in instance_map.items()
}

# Map from type of streams to source, c.f. WriterLocations.
writers = config.get("writers") or {}
self.writers = WriterLocations(**writers)

# Check that the configured writer for events also appears in
# `instance_map`.
if (
self.writers.events != "master"
and self.writers.events not in self.instance_map
):
raise ConfigError(
"Instance %r is configured to write events but does not appear in `instance_map` config."
% (self.writers.events,)
)

def read_arguments(self, args):
# We support a bunch of command line arguments that override options in
# the config. A lot of these options have a worker_* prefix when running
Expand Down
10 changes: 5 additions & 5 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,9 @@ def __init__(self, hs):
self._server_notices_mxid = hs.config.server_notices_mxid
self.config = hs.config
self.http_client = hs.get_simple_http_client()
self._instance_name = hs.get_instance_name()

self._send_events_to_master = ReplicationFederationSendEventsRestServlet.make_client(
hs
)
self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client(
hs
)
Expand Down Expand Up @@ -2837,8 +2836,9 @@ async def persist_events_and_notify(
backfilled: Whether these events are a result of
backfilling or not
"""
if self.config.worker_app:
await self._send_events_to_master(
if self.config.worker.writers.events != self._instance_name:
await self._send_events(
instance_name=self.config.worker.writers.events,
store=self.store,
event_and_contexts=event_and_contexts,
backfilled=backfilled,
Expand Down
12 changes: 7 additions & 5 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,11 @@ def __init__(self, hs):
self.notifier = hs.get_notifier()
self.config = hs.config
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
self._instance_name = hs.get_instance_name()

self.room_invite_state_types = self.hs.config.room_invite_state_types

self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
self.send_event = ReplicationSendEventRestServlet.make_client(hs)

# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)
Expand Down Expand Up @@ -822,8 +823,9 @@ async def handle_new_client_event(
success = False
try:
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
await self.send_event_to_master(
if self.config.worker.writers.events != self._instance_name:
await self.send_event(
instance_name=self.config.worker.writers.events,
event_id=event.event_id,
store=self.store,
requester=requester,
Expand Down Expand Up @@ -886,9 +888,9 @@ async def persist_and_notify_client_event(
"""Called when we have fully built the event, have already
calculated the push actions for the event, and checked auth.

This should only be run on master.
This should only be run on instance in charge of persisting events.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"""
assert not self.config.worker_app
assert self.config.worker.writers.events == self._instance_name

if ratelimit:
# We check if this is a room admin redacting an event so that we
Expand Down
3 changes: 3 additions & 0 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def make_client(cls, hs):
"""
clock = hs.get_clock()
client = hs.get_simple_http_client()
local_instance_name = hs.get_instance_name()

master_host = hs.config.worker_replication_host
master_port = hs.config.worker_replication_http_port
Expand All @@ -151,6 +152,8 @@ def make_client(cls, hs):
@trace(opname="outgoing_replication_request")
@defer.inlineCallbacks
def send_request(instance_name="master", **kwargs):
if instance_name == local_instance_name:
raise Exception("Trying to send HTTP request to self")
if instance_name == "master":
host = master_host
port = master_port
Expand Down
10 changes: 10 additions & 0 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
from synapse.replication.tcp.protocol import AbstractConnection
from synapse.replication.tcp.streams import (
STREAMS_MAP,
BackfillStream,
CachesStream,
EventsStream,
FederationStream,
Stream,
)
Expand Down Expand Up @@ -87,6 +89,14 @@ def __init__(self, hs):
self._streams_to_replicate.append(stream)
continue

# Only add EventStream and BackfillStream as a source on the
# instance in charge of event persistence.
if (
isinstance(stream, (EventsStream, BackfillStream))
and hs.config.worker.writers.events == hs.get_instance_name()
):
self._streams_to_replicate.append(stream)
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved

# Only add any other streams if we're on master.
if hs.config.worker_app is not None:
continue
Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/data_stores/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ def __init__(self, main_store_class, hs):

self.main = main_store_class(database, db_conn, hs)

# If we're on a process that can persist events (currently
# master), also instantiate a `PersistEventsStore`
if hs.config.worker.worker_app is None:
# If we're on a process that can persist events also
# instantiate a `PersistEventsStore`
if hs.config.worker.writers.events == hs.get_instance_name():
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
self.persist_events = PersistEventsStore(
hs, database, self.main
)
Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/data_stores/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@ def __init__(self, hs: "HomeServer", db: Database, main_data_store: "DataStore")
self._backfill_id_gen = self.store._backfill_id_gen # type: StreamIdGenerator
self._stream_id_gen = self.store._stream_id_gen # type: StreamIdGenerator

# This should only exist on master for now
# This should only exist on instances that are configured to write
assert (
hs.config.worker.worker_app is None
), "Can only instantiate PersistEventsStore on master"
hs.config.worker.writers.events == hs.get_instance_name()
), "Can only instantiate EventsStore on master"

@_retry_on_integrity_error
@defer.inlineCallbacks
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/data_stores/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class EventsWorkerStore(SQLBaseStore):
def __init__(self, database: Database, db_conn, hs):
super(EventsWorkerStore, self).__init__(database, db_conn, hs)

if hs.config.worker_app is None:
if hs.config.worker.writers.events == hs.get_instance_name():
# We are the process in charge of generating stream ids for events,
# so instantiate ID generators based on the database
self._stream_id_gen = StreamIdGenerator(
Expand Down