diff --git a/changelog.d/15488.feature b/changelog.d/15488.feature new file mode 100644 index 000000000000..234be505d6b8 --- /dev/null +++ b/changelog.d/15488.feature @@ -0,0 +1 @@ +Add automatic purge after all users forgotten a room. diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 63b35c8d621f..f6a88247ea4f 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -160,6 +160,7 @@ def __init__(self, hs: "HomeServer"): self._retention_allowed_lifetime_max = ( hs.config.retention.retention_allowed_lifetime_max ) + self._redaction_retention_period = hs.config.server.redaction_retention_period self._is_master = hs.config.worker.worker_app is None if hs.config.retention.retention_enabled and self._is_master: @@ -176,6 +177,44 @@ def __init__(self, hs: "HomeServer"): job.longest_max_lifetime, ) + if self._is_master: + self.clock.looping_call( + run_as_background_process, + 3600 * 1000, + "purge_rooms", + self.purge_rooms, + ) + + async def purge_rooms(self) -> None: + rooms_to_purge = await self.store.get_rooms_to_purge() + for r in rooms_to_purge: + if r["status"] == "complete": + # TODO cleanup the table + continue + + room_id = r["room_id"] + delete_id = r["delete_id"] + + delete_status = self._delete_by_id.get(delete_id) + if delete_status is not None: + # a purge background task is already running (or has run) + # for this delete id + continue + + purge_now = False + if r["timestamp"] is None: + purge_now = True + else: + time_since_marked = self.clock.time_msec() - r["timestamp"] + if time_since_marked >= self._redaction_retention_period: + purge_now = True + + # TODO 2 stages purge, keep memberships for a while so we don't "break" sync + if purge_now: + self._delete_by_id[delete_id] = DeleteStatus() + self._delete_by_room.setdefault(room_id, []).append(delete_id) + await self.purge_room(room_id, delete_id, True) + async def purge_history_for_rooms_in_range( self, min_ms: Optional[int], max_ms: Optional[int] ) -> None: @@ -399,14 +438,20 @@ def get_delete_ids_by_room(self, room_id: str) -> Optional[StrCollection]: """ return self._delete_by_room.get(room_id) - async def purge_room(self, room_id: str, force: bool = False) -> None: + async def purge_room( + self, + room_id: str, + delete_id: str, + force: bool = False, + ) -> None: """Purge the given room from the database. - This function is part the delete room v1 API. Args: room_id: room to be purged force: set true to skip checking for joined users. """ + logger.info("starting purge room_id %s", room_id) + async with self.pagination_lock.write(room_id): # first check that we have no users in this room if not force: @@ -414,7 +459,9 @@ async def purge_room(self, room_id: str, force: bool = False) -> None: if joined: raise SynapseError(400, "Users are still joined to this room") - await self._storage_controllers.purge_events.purge_room(room_id) + await self._storage_controllers.purge_events.purge_room(room_id, delete_id) + + logger.info("purge complete for room_id %s", room_id) @trace async def get_messages( @@ -654,36 +701,22 @@ async def _shutdown_and_purge_room( self._purges_in_progress_by_room.add(room_id) try: - async with self.pagination_lock.write(room_id): - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN - self._delete_by_id[ - delete_id - ].shutdown_room = await self._room_shutdown_handler.shutdown_room( - room_id=room_id, - requester_user_id=requester_user_id, - new_room_user_id=new_room_user_id, - new_room_name=new_room_name, - message=message, - block=block, - ) - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING - - if purge: - logger.info("starting purge room_id %s", room_id) - - # first check that we have no users in this room - if not force_purge: - joined = await self.store.is_host_joined( - room_id, self._server_name - ) - if joined: - raise SynapseError( - 400, "Users are still joined to this room" - ) + self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN + self._delete_by_id[ + delete_id + ].shutdown_room = await self._room_shutdown_handler.shutdown_room( + room_id=room_id, + requester_user_id=requester_user_id, + new_room_user_id=new_room_user_id, + new_room_name=new_room_name, + message=message, + block=block, + ) - await self._storage_controllers.purge_events.purge_room(room_id) + if purge: + self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING + await self.purge_room(room_id, delete_id, force_purge) - logger.info("purge complete for room_id %s", room_id) self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE except Exception: f = Failure() diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ed805d6ec87e..f487c834657f 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -2066,6 +2066,9 @@ async def forget(self, user: UserID, room_id: str) -> None: # the table `current_state_events` and `get_current_state_events` is `None`. await self.store.forget(user_id, room_id) + if await self.store.is_locally_forgotten_room(room_id): + await self.store.upsert_room_to_purge(room_id, None, self.clock.time_msec()) + def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]: """ diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 4de56bf13f31..6f213b3d87d2 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -39,6 +39,7 @@ from synapse.types import JsonDict, RoomID, UserID, create_requester from synapse.types.state import StateFilter from synapse.util import json_decoder +from synapse.util.stringutils import random_string if TYPE_CHECKING: from synapse.api.auth import Auth @@ -356,10 +357,14 @@ async def _delete_room( block=block, ) + delete_id = random_string(16) + # Purge room if purge: try: - await pagination_handler.purge_room(room_id, force=force_purge) + await pagination_handler.purge_room( + room_id, delete_id, force=force_purge + ) except NotFoundError: if block: # We can block unknown rooms with this endpoint, in which case diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 7699cc8d1bc0..cf9333262aa7 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -944,8 +944,10 @@ async def on_GET( class RoomForgetRestServlet(TransactionRestServlet): def __init__(self, hs: "HomeServer"): super().__init__(hs) + self.store = hs.get_datastores().main self.room_member_handler = hs.get_room_member_handler() self.auth = hs.get_auth() + self.clock = hs.get_clock() def register(self, http_server: HttpServer) -> None: PATTERNS = "/rooms/(?P[^/]*)/forget" diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index c599397b86ff..18e26bdae546 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -30,14 +30,23 @@ class PurgeEventsStorageController: def __init__(self, hs: "HomeServer", stores: Databases): self.stores = stores + self.clock = hs.get_clock() - async def purge_room(self, room_id: str) -> None: + async def purge_room(self, room_id: str, delete_id: str) -> None: """Deletes all record of a room""" + await self.stores.main.upsert_room_to_purge( + room_id, delete_id, self.clock.time_msec(), "purging" + ) + with nested_logging_context(room_id): state_groups_to_delete = await self.stores.main.purge_room(room_id) await self.stores.state.purge_room_state(room_id, state_groups_to_delete) + await self.stores.main.upsert_room_to_purge( + room_id, delete_id, self.clock.time_msec(), "complete" + ) + async def purge_history( self, room_id: str, token: str, delete_local_events: bool ) -> None: diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index daad58291a8b..64effda039db 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -17,6 +17,7 @@ from typing import ( TYPE_CHECKING, AbstractSet, + Any, Collection, Dict, FrozenSet, @@ -1284,6 +1285,36 @@ async def is_locally_forgotten_room(self, room_id: str) -> bool: # If any rows still exist it means someone has not forgotten this room yet return not rows[0][0] + async def upsert_room_to_purge( + self, + room_id: str, + delete_id: Optional[str] = None, + timestamp: Optional[int] = None, + status: str = "waiting", + ) -> None: + await self.db_pool.simple_upsert( + "rooms_to_purge", + { + "room_id": room_id, + "delete_id": delete_id, + }, + { + "room_id": room_id, + "delete_id": delete_id, + "timestamp": timestamp, + "status": status, + }, + desc="upsert_room_to_purge", + ) + + async def get_rooms_to_purge(self) -> List[Dict[str, Any]]: + return await self.db_pool.simple_select_list( + table="rooms_to_purge", + keyvalues={}, + retcols=("room_id", "delete_id", "timestamp", "status"), + desc="rooms_to_purge_fetch", + ) + async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]: """Get all rooms that the user has ever been in. diff --git a/synapse/storage/schema/main/delta/76/03_rooms_to_purge.sql b/synapse/storage/schema/main/delta/76/03_rooms_to_purge.sql new file mode 100644 index 000000000000..3aa871a92e76 --- /dev/null +++ b/synapse/storage/schema/main/delta/76/03_rooms_to_purge.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS rooms_to_purge( + room_id text NOT NULL, + delete_id text, + "timestamp" bigint, + "status" text NOT NULL, + UNIQUE(room_id, delete_id) +); diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index eb50086c508e..7a91ad266cde 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -27,6 +27,7 @@ from synapse.handlers.pagination import PaginationHandler, PurgeStatus from synapse.rest.client import directory, events, login, room from synapse.server import HomeServer +from synapse.types import UserID from synapse.util import Clock from synapse.util.stringutils import random_string @@ -502,6 +503,9 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: ) self.url_status_by_delete_id = "/_synapse/admin/v2/rooms/delete_status/" + self.room_member_handler = hs.get_room_member_handler() + self.pagination_handler = hs.get_pagination_handler() + @parameterized.expand( [ ("DELETE", "/_synapse/admin/v2/rooms/%s"), @@ -972,6 +976,25 @@ def test_shutdown_room_block_peek(self) -> None: # Assert we can no longer peek into the room self._assert_peek(self.room_id, expect_code=403) + @unittest.override_config({"redaction_retention_period": "0"}) + def test_purge_forgotten_room(self) -> None: + # Create a test room + room_id = self.helper.create_room_as( + self.admin_user, + tok=self.admin_user_tok, + ) + + self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok) + self.get_success( + self.room_member_handler.forget( + UserID.from_string(self.admin_user), room_id + ) + ) + + self.get_success(self.pagination_handler.purge_rooms()) + + self._is_purged(room_id) + def _is_blocked(self, room_id: str, expect: bool = True) -> None: """Assert that the room is blocked or not""" d = self.store.is_room_blocked(room_id) diff --git a/tests/rest/admin/test_server_notice.py b/tests/rest/admin/test_server_notice.py index 28b999573e75..3bbb6e9fc36b 100644 --- a/tests/rest/admin/test_server_notice.py +++ b/tests/rest/admin/test_server_notice.py @@ -417,7 +417,7 @@ def test_send_server_notice_delete_room(self) -> None: self.get_success( self.room_shutdown_handler.shutdown_room(first_room_id, self.admin_user) ) - self.get_success(self.pagination_handler.purge_room(first_room_id)) + self.get_success(self.pagination_handler.purge_room(first_room_id, "delete_id")) # user is not member anymore self._check_invite_and_join_status(self.other_user, 0, 0) diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index 857e2caf2e24..29140d78f46f 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -115,7 +115,7 @@ def test_purge_room(self) -> None: # Purge everything before this topological token self.get_success( - self._storage_controllers.purge_events.purge_room(self.room_id) + self._storage_controllers.purge_events.purge_room(self.room_id, "delete_id") ) # The events aren't found.