Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Allow moving account data and receipts streams off master #9104

Merged
merged 14 commits into from
Jan 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9104.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental support for moving off receipts and account data persistence off master.
15 changes: 14 additions & 1 deletion synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,16 @@
)
from synapse.rest.client.v1.push_rule import PushRuleRestServlet
from synapse.rest.client.v1.voip import VoipRestServlet
from synapse.rest.client.v2_alpha import groups, room_keys, sync, user_directory
from synapse.rest.client.v2_alpha import (
account_data,
groups,
read_marker,
receipts,
room_keys,
sync,
tags,
user_directory,
)
from synapse.rest.client.v2_alpha._base import client_patterns
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
from synapse.rest.client.v2_alpha.account_data import (
Expand Down Expand Up @@ -531,6 +540,10 @@ def _listen_http(self, listener_config: ListenerConfig):
room.register_deprecated_servlets(self, resource)
InitialSyncRestServlet(self).register(resource)
room_keys.register_servlets(self, resource)
tags.register_servlets(self, resource)
account_data.register_servlets(self, resource)
receipts.register_servlets(self, resource)
read_marker.register_servlets(self, resource)

SendToDeviceRestServlet(self).register(resource)

Expand Down
18 changes: 17 additions & 1 deletion synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ class WriterLocations:
to_device = attr.ib(
default=["master"], type=List[str], converter=_instance_to_list_converter,
)
account_data = attr.ib(
default=["master"], type=List[str], converter=_instance_to_list_converter,
)
receipts = attr.ib(
default=["master"], type=List[str], converter=_instance_to_list_converter,
)
clokep marked this conversation as resolved.
Show resolved Hide resolved


class WorkerConfig(Config):
Expand Down Expand Up @@ -127,7 +133,7 @@ def read_config(self, config, **kwargs):

# Check that the configured writers for events and typing also appears in
# `instance_map`.
for stream in ("events", "typing", "to_device"):
for stream in ("events", "typing", "to_device", "account_data", "receipts"):
instances = _instance_to_list_converter(getattr(self.writers, stream))
for instance in instances:
if instance != "master" and instance not in self.instance_map:
Expand All @@ -141,6 +147,16 @@ def read_config(self, config, **kwargs):
"Must only specify one instance to handle `to_device` messages."
)

if len(self.writers.account_data) != 1:
raise ConfigError(
"Must only specify one instance to handle `account_data` messages."
)

if len(self.writers.receipts) != 1:
raise ConfigError(
"Must only specify one instance to handle `receipts` messages."
)

self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)

# Whether this worker should run background tasks or not.
Expand Down
144 changes: 144 additions & 0 deletions synapse/handlers/account_data.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -12,14 +13,157 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
from typing import TYPE_CHECKING, List, Tuple

from synapse.replication.http.account_data import (
ReplicationAddTagRestServlet,
ReplicationRemoveTagRestServlet,
ReplicationRoomAccountDataRestServlet,
ReplicationUserAccountDataRestServlet,
)
from synapse.types import JsonDict, UserID

if TYPE_CHECKING:
from synapse.app.homeserver import HomeServer


class AccountDataHandler:
def __init__(self, hs: "HomeServer"):
self._store = hs.get_datastore()
self._instance_name = hs.get_instance_name()
self._notifier = hs.get_notifier()

self._user_data_client = ReplicationUserAccountDataRestServlet.make_client(hs)
self._room_data_client = ReplicationRoomAccountDataRestServlet.make_client(hs)
self._add_tag_client = ReplicationAddTagRestServlet.make_client(hs)
self._remove_tag_client = ReplicationRemoveTagRestServlet.make_client(hs)
self._account_data_writers = hs.config.worker.writers.account_data

async def add_account_data_to_room(
self, user_id: str, room_id: str, account_data_type: str, content: JsonDict
) -> int:
"""Add some account_data to a room for a user.

Args:
user_id: The user to add a tag for.
room_id: The room to add a tag for.
account_data_type: The type of account_data to add.
content: A json object to associate with the tag.

Returns:
The maximum stream ID.
"""
if self._instance_name in self._account_data_writers:
max_stream_id = await self._store.add_account_data_to_room(
user_id, room_id, account_data_type, content
)

self._notifier.on_new_event(
"account_data_key", max_stream_id, users=[user_id]
)

return max_stream_id
else:
response = await self._room_data_client(
instance_name=random.choice(self._account_data_writers),
user_id=user_id,
room_id=room_id,
account_data_type=account_data_type,
content=content,
)
return response["max_stream_id"]

async def add_account_data_for_user(
self, user_id: str, account_data_type: str, content: JsonDict
) -> int:
"""Add some account_data to a room for a user.

Args:
user_id: The user to add a tag for.
account_data_type: The type of account_data to add.
content: A json object to associate with the tag.

Returns:
The maximum stream ID.
"""

if self._instance_name in self._account_data_writers:
max_stream_id = await self._store.add_account_data_for_user(
user_id, account_data_type, content
)

self._notifier.on_new_event(
"account_data_key", max_stream_id, users=[user_id]
)
return max_stream_id
else:
response = await self._user_data_client(
instance_name=random.choice(self._account_data_writers),
user_id=user_id,
account_data_type=account_data_type,
content=content,
)
return response["max_stream_id"]

async def add_tag_to_room(
self, user_id: str, room_id: str, tag: str, content: JsonDict
) -> int:
"""Add a tag to a room for a user.

Args:
user_id: The user to add a tag for.
room_id: The room to add a tag for.
tag: The tag name to add.
content: A json object to associate with the tag.

Returns:
The next account data ID.
"""
if self._instance_name in self._account_data_writers:
max_stream_id = await self._store.add_tag_to_room(
user_id, room_id, tag, content
)

self._notifier.on_new_event(
"account_data_key", max_stream_id, users=[user_id]
)
return max_stream_id
else:
response = await self._add_tag_client(
instance_name=random.choice(self._account_data_writers),
user_id=user_id,
room_id=room_id,
tag=tag,
content=content,
)
return response["max_stream_id"]

async def remove_tag_from_room(self, user_id: str, room_id: str, tag: str) -> int:
"""Remove a tag from a room for a user.

Returns:
The next account data ID.
"""
if self._instance_name in self._account_data_writers:
max_stream_id = await self._store.remove_tag_from_room(
user_id, room_id, tag
)

self._notifier.on_new_event(
"account_data_key", max_stream_id, users=[user_id]
)
return max_stream_id
else:
response = await self._remove_tag_client(
instance_name=random.choice(self._account_data_writers),
user_id=user_id,
room_id=room_id,
tag=tag,
)
return response["max_stream_id"]


class AccountDataEventSource:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
Expand Down
5 changes: 2 additions & 3 deletions synapse/handlers/read_marker.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.server_name = hs.config.server_name
self.store = hs.get_datastore()
self.account_data_handler = hs.get_account_data_handler()
self.read_marker_linearizer = Linearizer(name="read_marker")
self.notifier = hs.get_notifier()

async def received_client_read_marker(
self, room_id: str, user_id: str, event_id: str
Expand All @@ -59,7 +59,6 @@ async def received_client_read_marker(

if should_update:
content = {"event_id": event_id}
max_id = await self.store.add_account_data_to_room(
await self.account_data_handler.add_account_data_to_room(
user_id, room_id, "m.fully_read", content
)
self.notifier.on_new_event("account_data_key", max_id, users=[user_id])
27 changes: 22 additions & 5 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,26 @@ def __init__(self, hs: "HomeServer"):
self.server_name = hs.config.server_name
self.store = hs.get_datastore()
self.hs = hs
self.federation = hs.get_federation_sender()
hs.get_federation_registry().register_edu_handler(
"m.receipt", self._received_remote_receipt
)

# We only need to poke the federation sender explicitly if its on the
# same instance. Other federation sender instances will get notified by
# `synapse.app.generic_worker.FederationSenderHandler` when it sees it
# in the receipts stream.
self.federation_sender = None
if hs.should_send_federation():
self.federation_sender = hs.get_federation_sender()

# If we can handle the receipt EDUs we do so, otherwise we route them
# to the appropriate worker.
if hs.get_instance_name() in hs.config.worker.writers.receipts:
hs.get_federation_registry().register_edu_handler(
"m.receipt", self._received_remote_receipt
)
else:
hs.get_federation_registry().register_instances_for_edu(
"m.receipt", hs.config.worker.writers.receipts,
)

self.clock = self.hs.get_clock()
self.state = hs.get_state_handler()

Expand Down Expand Up @@ -125,7 +141,8 @@ async def received_client_receipt(
if not is_new:
return

await self.federation.send_read_receipt(receipt)
if self.federation_sender:
await self.federation_sender.send_read_receipt(receipt)


class ReceiptEventSource:
Expand Down
7 changes: 5 additions & 2 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self, hs: "HomeServer"):
self.registration_handler = hs.get_registration_handler()
self.profile_handler = hs.get_profile_handler()
self.event_creation_handler = hs.get_event_creation_handler()
self.account_data_handler = hs.get_account_data_handler()

self.member_linearizer = Linearizer(name="member")

Expand Down Expand Up @@ -253,7 +254,7 @@ async def copy_room_tags_and_direct_to_room(
direct_rooms[key].append(new_room_id)

# Save back to user's m.direct account data
await self.store.add_account_data_for_user(
await self.account_data_handler.add_account_data_for_user(
user_id, AccountDataTypes.DIRECT, direct_rooms
)
break
Expand All @@ -263,7 +264,9 @@ async def copy_room_tags_and_direct_to_room(

# Copy each room tag to the new room
for tag, tag_content in room_tags.items():
await self.store.add_tag_to_room(user_id, new_room_id, tag, tag_content)
await self.account_data_handler.add_tag_to_room(
user_id, new_room_id, tag, tag_content
)

async def update_membership(
self,
Expand Down
2 changes: 2 additions & 0 deletions synapse/replication/http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from synapse.http.server import JsonResource
from synapse.replication.http import (
account_data,
devices,
federation,
login,
Expand All @@ -40,6 +41,7 @@ def register_servlets(self, hs):
presence.register_servlets(hs, self)
membership.register_servlets(hs, self)
streams.register_servlets(hs, self)
account_data.register_servlets(hs, self)

# The following can't currently be instantiated on workers.
if hs.config.worker.worker_app is None:
Expand Down
Loading