Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Split presence out of master #9820

Merged
merged 9 commits into from
Apr 23, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9820.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental support for handling presence on a worker.
7 changes: 5 additions & 2 deletions scripts/synapse_port_db
Original file line number Diff line number Diff line change
Expand Up @@ -634,8 +634,11 @@ class Porter(object):
"device_inbox_sequence", ("device_inbox", "device_federation_outbox")
)
await self._setup_sequence(
"account_data_sequence", ("room_account_data", "room_tags_revisions", "account_data"))
await self._setup_sequence("receipts_sequence", ("receipts_linearized", ))
"account_data_sequence",
("room_account_data", "room_tags_revisions", "account_data"),
)
await self._setup_sequence("receipts_sequence", ("receipts_linearized",))
await self._setup_sequence("presence_stream_sequence", ("presence_stream",))
await self._setup_auth_chain_sequence()

# Step 3. Get tables.
Expand Down
31 changes: 4 additions & 27 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.profile import SlavedProfileStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
Expand All @@ -64,7 +63,7 @@
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events, login, room
from synapse.rest.client.v1 import events, login, presence, room
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.profile import (
ProfileAvatarURLRestServlet,
Expand Down Expand Up @@ -110,6 +109,7 @@
from synapse.storage.databases.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.databases.main.presence import PresenceStore
from synapse.storage.databases.main.search import SearchWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
Expand All @@ -121,26 +121,6 @@
logger = logging.getLogger("synapse.app.generic_worker")


class PresenceStatusStubServlet(RestServlet):
    """If presence is disabled this servlet can be used to stub out setting
    presence status.

    GET authenticates the caller and always reports the requested user as
    "offline"; PUT authenticates the caller and silently discards the
    submitted status, returning an empty body.
    """

    PATTERNS = client_patterns("/presence/(?P<user_id>[^/]*)/status")

    def __init__(self, hs):
        super().__init__()
        # Auth checker used to validate the access token on each request.
        self.auth = hs.get_auth()

    async def on_GET(self, request, user_id):
        # Require a valid access token; the requester identity itself is
        # not used — every user is reported as offline.
        await self.auth.get_user_by_req(request)
        return 200, {"presence": "offline"}

    async def on_PUT(self, request, user_id):
        # Accept the request for API compatibility, but drop the
        # submitted presence state.
        await self.auth.get_user_by_req(request)
        return 200, {}


class KeyUploadServlet(RestServlet):
"""An implementation of the `KeyUploadServlet` that responds to read only
requests, but otherwise proxies through to the master instance.
Expand Down Expand Up @@ -241,6 +221,7 @@ class GenericWorkerSlavedStore(
StatsStore,
UIAuthWorkerStore,
EndToEndRoomKeyStore,
PresenceStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedReceiptsStore,
Expand All @@ -259,7 +240,6 @@ class GenericWorkerSlavedStore(
SlavedTransactionStore,
SlavedProfileStore,
SlavedClientIpStore,
SlavedPresenceStore,
SlavedFilteringStore,
MonthlyActiveUsersWorkerStore,
MediaRepositoryStore,
Expand Down Expand Up @@ -327,10 +307,7 @@ def _listen_http(self, listener_config: ListenerConfig):

user_directory.register_servlets(self, resource)

# If presence is disabled, use the stub servlet that does
# not allow sending presence
if not self.config.use_presence:
PresenceStatusStubServlet(self).register(resource)
presence.register_servlets(self, resource)

groups.register_servlets(self, resource)

Expand Down
27 changes: 26 additions & 1 deletion synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ class WriterLocations:
Attributes:
events: The instances that write to the event and backfill streams.
typing: The instance that writes to the typing stream.
to_device: The instances that write to the to_device stream. Currently
can only be a single instance.
account_data: The instances that write to the account data streams. Currently
can only be a single instance.
receipts: The instances that write to the receipts stream. Currently
can only be a single instance.
presence: The instances that write to the presence stream. Currently
can only be a single instance.
"""

events = attr.ib(
Expand All @@ -85,6 +93,11 @@ class WriterLocations:
type=List[str],
converter=_instance_to_list_converter,
)
presence = attr.ib(
default=["master"],
type=List[str],
converter=_instance_to_list_converter,
)


class WorkerConfig(Config):
Expand Down Expand Up @@ -188,7 +201,14 @@ def read_config(self, config, **kwargs):

# Check that the configured writers for events and typing also appears in
# `instance_map`.
for stream in ("events", "typing", "to_device", "account_data", "receipts"):
for stream in (
"events",
"typing",
"to_device",
"account_data",
"receipts",
"presence",
):
instances = _instance_to_list_converter(getattr(self.writers, stream))
for instance in instances:
if instance != "master" and instance not in self.instance_map:
Expand All @@ -215,6 +235,11 @@ def read_config(self, config, **kwargs):
if len(self.writers.events) == 0:
raise ConfigError("Must specify at least one instance to handle `events`.")

if len(self.writers.presence) != 1:
raise ConfigError(
"Must only specify one instance to handle `presence` messages."
)

self.events_shard_config = RoutableShardedWorkerHandlingConfig(
self.writers.events
)
Expand Down
56 changes: 36 additions & 20 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@


class BasePresenceHandler(abc.ABC):
"""Parts of the PresenceHandler that are shared between workers and master"""
"""Parts of the PresenceHandler that are shared between workers and presence
writer"""

def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
Expand Down Expand Up @@ -308,17 +309,25 @@ def __init__(self, hs):
super().__init__(hs)
self.hs = hs

self._presence_writer_instance = hs.config.worker.writers.presence[0]

self._presence_enabled = hs.config.use_presence

# Route presence EDUs to the right worker
hs.get_federation_registry().register_instances_for_edu(
"m.presence",
hs.config.worker.writers.presence,
)

# The number of ongoing syncs on this process, by user id.
# Empty if _presence_enabled is false.
self._user_to_num_current_syncs = {} # type: Dict[str, int]

self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()

# user_id -> last_sync_ms. Lists the users that have stopped syncing
# but we haven't notified the master of that yet
# user_id -> last_sync_ms. Lists the users that have stopped syncing but
# we haven't notified the presence writer of that yet
self.users_going_offline = {}

self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
Expand Down Expand Up @@ -351,31 +360,32 @@ def send_user_sync(self, user_id, is_syncing, last_sync_ms):
)

def mark_as_coming_online(self, user_id):
"""A user has started syncing. Send a UserSync to the master, unless they
had recently stopped syncing.
"""A user has started syncing. Send a UserSync to the presence writer,
unless they had recently stopped syncing.

Args:
user_id (str)
"""
going_offline = self.users_going_offline.pop(user_id, None)
if not going_offline:
# Safe to skip because we haven't yet told the master they were offline
# Safe to skip because we haven't yet told the presence writer they
# were offline
self.send_user_sync(user_id, True, self.clock.time_msec())

def mark_as_going_offline(self, user_id):
"""A user has stopped syncing. We wait before notifying the master as
its likely they'll come back soon. This allows us to avoid sending
a stopped syncing immediately followed by a started syncing notification
to the master
"""A user has stopped syncing. We wait before notifying the presence
writer as it's likely they'll come back soon. This allows us to avoid
sending a stopped syncing immediately followed by a started syncing
notification to the presence writer

Args:
user_id (str)
"""
self.users_going_offline[user_id] = self.clock.time_msec()

def send_stop_syncing(self):
"""Check if there are any users who have stopped syncing a while ago
and haven't come back yet. If there are poke the master about them.
"""Check if there are any users who have stopped syncing a while ago and
haven't come back yet. If there are poke the presence writer about them.
"""
now = self.clock.time_msec()
for user_id, last_sync_ms in list(self.users_going_offline.items()):
Expand Down Expand Up @@ -491,9 +501,12 @@ async def set_state(self, target_user, state, ignore_status_msg=False):
if not self.hs.config.use_presence:
return

# Proxy request to master
# Proxy request to instance that writes presence
await self._set_state_client(
user_id=user_id, state=state, ignore_status_msg=ignore_status_msg
instance_name=self._presence_writer_instance,
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
user_id=user_id,
state=state,
ignore_status_msg=ignore_status_msg,
)

async def bump_presence_active_time(self, user):
Expand All @@ -504,9 +517,11 @@ async def bump_presence_active_time(self, user):
if not self.hs.config.use_presence:
return

# Proxy request to master
# Proxy request to instance that writes presence
user_id = user.to_string()
await self._bump_active_client(user_id=user_id)
await self._bump_active_client(
instance_name=self._presence_writer_instance, user_id=user_id
)


class PresenceHandler(BasePresenceHandler):
Expand Down Expand Up @@ -1908,7 +1923,7 @@ def __init__(self, hs: "HomeServer", presence_handler: BasePresenceHandler):
self._queue_presence_updates = True

# Whether this instance is a presence writer.
self._presence_writer = hs.config.worker.worker_app is None
self._presence_writer = self._instance_name in hs.config.worker.writers.presence

# The FederationSender instance, if this process sends federation traffic directly.
self._federation = None
Expand Down Expand Up @@ -1956,7 +1971,7 @@ def send_presence_to_destinations(
Will forward to the local federation sender (if there is one) and queue
to send over replication (if there are other federation sender instances.).

Must only be called on the master process.
Must only be called on the presence writer process.
"""

# This should only be called on a presence writer.
Expand Down Expand Up @@ -2002,10 +2017,11 @@ async def get_replication_rows(
We return rows in the form of `(destination, user_id)` to keep the size
of each row bounded (rather than returning the sets in a row).

On workers this will query the master process via HTTP replication.
On workers this will query the presence writer process via HTTP replication.
"""
if instance_name != self._instance_name:
# If not local we query over http replication from the master
# If not local we query over http replication from the presence
# writer
result = await self._repl_client(
instance_name=instance_name,
stream_name=PresenceFederationStream.NAME,
Expand Down
5 changes: 4 additions & 1 deletion synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ async def _handle_request(self, request, **kwargs):
def make_client(cls, hs):
"""Create a client that makes requests.

Returns a callable that accepts the same parameters as `_serialize_payload`.
Returns a callable that accepts the same parameters as
`_serialize_payload`, but also accepts an extra `instance_name`
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
parameter to specify which instance to hit (the instance must be in
the `instance_map` config).
"""
clock = hs.get_clock()
client = hs.get_simple_http_client()
Expand Down
50 changes: 0 additions & 50 deletions synapse/replication/slave/storage/presence.py

This file was deleted.

Loading