Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add experimental support for MSC3391: deleting account data #14714

Merged
merged 15 commits into from
Jan 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions synapse/handlers/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
from typing import TYPE_CHECKING, Awaitable, Callable, Collection, List, Optional, Tuple

from synapse.replication.http.account_data import (
ReplicationAddRoomAccountDataRestServlet,
ReplicationAddTagRestServlet,
ReplicationAddUserAccountDataRestServlet,
ReplicationRemoveRoomAccountDataRestServlet,
ReplicationRemoveTagRestServlet,
ReplicationRoomAccountDataRestServlet,
ReplicationUserAccountDataRestServlet,
ReplicationRemoveUserAccountDataRestServlet,
)
from synapse.streams import EventSource
from synapse.types import JsonDict, StreamKeyType, UserID
Expand All @@ -41,8 +43,18 @@ def __init__(self, hs: "HomeServer"):
self._instance_name = hs.get_instance_name()
self._notifier = hs.get_notifier()

self._user_data_client = ReplicationUserAccountDataRestServlet.make_client(hs)
self._room_data_client = ReplicationRoomAccountDataRestServlet.make_client(hs)
self._add_user_data_client = (
ReplicationAddUserAccountDataRestServlet.make_client(hs)
)
self._remove_user_data_client = (
ReplicationRemoveUserAccountDataRestServlet.make_client(hs)
)
self._add_room_data_client = (
ReplicationAddRoomAccountDataRestServlet.make_client(hs)
)
self._remove_room_data_client = (
ReplicationRemoveRoomAccountDataRestServlet.make_client(hs)
)
self._add_tag_client = ReplicationAddTagRestServlet.make_client(hs)
self._remove_tag_client = ReplicationRemoveTagRestServlet.make_client(hs)
self._account_data_writers = hs.config.worker.writers.account_data
Expand Down Expand Up @@ -112,7 +124,7 @@ async def add_account_data_to_room(

return max_stream_id
else:
response = await self._room_data_client(
response = await self._add_room_data_client(
instance_name=random.choice(self._account_data_writers),
user_id=user_id,
room_id=room_id,
Expand All @@ -127,9 +139,9 @@ async def add_account_data_for_user(
"""Add some global account_data for a user.

Args:
user_id: The user to add a tag for.
user_id: The user to add some account data for.
account_data_type: The type of account_data to add.
content: A json object to associate with the tag.
content: The content json dictionary.

Returns:
The maximum stream ID.
Expand All @@ -148,7 +160,7 @@ async def add_account_data_for_user(

return max_stream_id
else:
response = await self._user_data_client(
response = await self._add_user_data_client(
instance_name=random.choice(self._account_data_writers),
user_id=user_id,
account_data_type=account_data_type,
Expand Down
90 changes: 86 additions & 4 deletions synapse/replication/http/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
logger = logging.getLogger(__name__)


class ReplicationUserAccountDataRestServlet(ReplicationEndpoint):
class ReplicationAddUserAccountDataRestServlet(ReplicationEndpoint):
"""Add user account data on the appropriate account data worker.

Request format:
Expand Down Expand Up @@ -73,7 +73,46 @@ async def _handle_request( # type: ignore[override]
return 200, {"max_stream_id": max_stream_id}


class ReplicationRoomAccountDataRestServlet(ReplicationEndpoint):
class ReplicationRemoveUserAccountDataRestServlet(ReplicationEndpoint):
"""Remove user account data on the appropriate account data worker.

Request format:

POST /_synapse/replication/remove_user_account_data/:user_id/:type

{
"content": { ... },
}

"""

NAME = "remove_user_account_data"
PATH_ARGS = ("user_id", "account_data_type")
CACHE = False

def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.handler = hs.get_account_data_handler()
self.clock = hs.get_clock()
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved

@staticmethod
async def _serialize_payload( # type: ignore[override]
user_id: str, account_data_type: str
) -> JsonDict:
return {}

async def _handle_request( # type: ignore[override]
self, request: Request, user_id: str, account_data_type: str
) -> Tuple[int, JsonDict]:
max_stream_id = await self.handler.remove_account_data_for_user(
user_id, account_data_type
)

return 200, {"max_stream_id": max_stream_id}


class ReplicationAddRoomAccountDataRestServlet(ReplicationEndpoint):
"""Add room account data on the appropriate account data worker.

Request format:
Expand Down Expand Up @@ -118,6 +157,45 @@ async def _handle_request( # type: ignore[override]
return 200, {"max_stream_id": max_stream_id}


class ReplicationRemoveRoomAccountDataRestServlet(ReplicationEndpoint):
"""Remove room account data on the appropriate account data worker.

Request format:

POST /_synapse/replication/remove_room_account_data/:user_id/:room_id/:account_data_type

{
"content": { ... },
}

"""

NAME = "remove_room_account_data"
PATH_ARGS = ("user_id", "room_id", "account_data_type")
CACHE = False

def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.handler = hs.get_account_data_handler()
self.clock = hs.get_clock()

@staticmethod
async def _serialize_payload( # type: ignore[override]
user_id: str, room_id: str, account_data_type: str, content: JsonDict
) -> JsonDict:
return {}

async def _handle_request( # type: ignore[override]
self, request: Request, user_id: str, room_id: str, account_data_type: str
) -> Tuple[int, JsonDict]:
max_stream_id = await self.handler.remove_account_data_for_room(
user_id, room_id, account_data_type
)

return 200, {"max_stream_id": max_stream_id}


class ReplicationAddTagRestServlet(ReplicationEndpoint):
"""Add tag on the appropriate account data worker.

Expand Down Expand Up @@ -206,7 +284,11 @@ async def _handle_request( # type: ignore[override]


def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationUserAccountDataRestServlet(hs).register(http_server)
ReplicationRoomAccountDataRestServlet(hs).register(http_server)
ReplicationAddUserAccountDataRestServlet(hs).register(http_server)
ReplicationAddRoomAccountDataRestServlet(hs).register(http_server)
ReplicationAddTagRestServlet(hs).register(http_server)
ReplicationRemoveTagRestServlet(hs).register(http_server)

if hs.config.experimental.msc3391_enabled:
ReplicationRemoveUserAccountDataRestServlet(hs).register(http_server)
ReplicationRemoveRoomAccountDataRestServlet(hs).register(http_server)