Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Servlet to purge old rooms (#5845)
Browse files Browse the repository at this point in the history
  • Loading branch information
anoadragon453 committed Feb 21, 2020
2 parents 51a0fa5 + 119aa31 commit 5a60e07
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 0 deletions.
1 change: 1 addition & 0 deletions changelog.d/5845.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add an admin API to purge old rooms from the database.
18 changes: 18 additions & 0 deletions docs/admin_api/purge_room.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Purge room API
==============

This API will remove all trace of a room from your database.

All local users must have left the room before it can be removed.

The API is:

```
POST /_synapse/admin/v1/purge_room
{
"room_id": "!room:id"
}
```

You must authenticate using the access token of an admin user.
17 changes: 17 additions & 0 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def __init__(self, hs):
self.auth = hs.get_auth()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self._server_name = hs.hostname

self.pagination_lock = ReadWriteLock()
self._purges_in_progress_by_room = set()
Expand Down Expand Up @@ -261,6 +262,22 @@ def get_purge_status(self, purge_id):
"""
return self._purges_by_id.get(purge_id)

async def purge_room(self, room_id):
"""Purge the given room from the database"""
with (await self.pagination_lock.write(room_id)):
# check we know about the room
await self.store.get_room_version(room_id)

# first check that we have no users in this room
joined = await defer.maybeDeferred(
self.store.is_host_joined, room_id, self._server_name
)

if joined:
raise SynapseError(400, "Users are still joined to this room")

await self.store.purge_room(room_id)

@defer.inlineCallbacks
def get_messages(
self,
Expand Down
2 changes: 2 additions & 0 deletions synapse/rest/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
historical_admin_path_patterns,
)
from synapse.rest.admin.media import register_servlets_for_media_repo
from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.types import UserID, create_requester
from synapse.util.versionstring import get_version_string
Expand Down Expand Up @@ -738,6 +739,7 @@ def register_servlets(hs, http_server):
Register all the admin servlets.
"""
register_servlets_for_client_rest_resource(hs, http_server)
PurgeRoomServlet(hs).register(http_server)
SendServerNoticeServlet(hs).register(http_server)
VersionServlet(hs).register(http_server)

Expand Down
57 changes: 57 additions & 0 deletions synapse/rest/admin/purge_room_servlet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re

from synapse.http.servlet import (
RestServlet,
assert_params_in_dict,
parse_json_object_from_request,
)
from synapse.rest.admin import assert_requester_is_admin


class PurgeRoomServlet(RestServlet):
"""Servlet which will remove all trace of a room from the database
POST /_synapse/admin/v1/purge_room
{
"room_id": "!room:id"
}
returns:
{}
"""

PATTERNS = (re.compile("^/_synapse/admin/v1/purge_room$"),)

def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer): server
"""
self.hs = hs
self.auth = hs.get_auth()
self.pagination_handler = hs.get_pagination_handler()

async def on_POST(self, request):
await assert_requester_is_admin(self.auth, request)

body = parse_json_object_from_request(request)
assert_params_in_dict(body, ("room_id",))

await self.pagination_handler.purge_room(body["room_id"])

return (200, {})
137 changes: 137 additions & 0 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2184,6 +2184,143 @@ def _find_unreferenced_groups_during_purge(self, txn, state_groups):

return to_delete, to_dedelta

def purge_room(self, room_id):
"""Deletes all record of a room
Args:
room_id (str):
"""

return self.runInteraction("purge_room", self._purge_room_txn, room_id)

def _purge_room_txn(self, txn, room_id):
# first we have to delete the state groups states
logger.info("[purge] removing %s from state_groups_state", room_id)

txn.execute(
"""
DELETE FROM state_groups_state WHERE state_group IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)

# ... and the state group edges
logger.info("[purge] removing %s from state_group_edges", room_id)

txn.execute(
"""
DELETE FROM state_group_edges WHERE state_group IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)

# ... and the state groups
logger.info("[purge] removing %s from state_groups", room_id)

txn.execute(
"""
DELETE FROM state_groups WHERE id IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)

# and then tables which lack an index on room_id but have one on event_id
for table in (
"event_auth",
"event_edges",
"event_push_actions_staging",
"event_reference_hashes",
"event_relations",
"event_to_state_groups",
"redactions",
"rejections",
"state_events",
):
logger.info("[purge] removing %s from %s", room_id, table)

txn.execute(
"""
DELETE FROM %s WHERE event_id IN (
SELECT event_id FROM events WHERE room_id=?
)
"""
% (table,),
(room_id,),
)

# and finally, the tables with an index on room_id (or no useful index)
for table in (
"current_state_events",
"event_backward_extremities",
"event_forward_extremities",
"event_json",
"event_push_actions",
"event_search",
"events",
"group_rooms",
"public_room_list_stream",
"receipts_graph",
"receipts_linearized",
"room_aliases",
"room_depth",
"room_memberships",
"room_state",
"room_stats",
"room_stats_earliest_token",
"rooms",
"stream_ordering_to_exterm",
"topics",
"users_in_public_rooms",
"users_who_share_private_rooms",
# no useful index, but let's clear them anyway
"appservice_room_list",
"e2e_room_keys",
"event_push_summary",
"pusher_throttle",
"group_summary_rooms",
"local_invites",
"room_account_data",
"room_tags",
):
logger.info("[purge] removing %s from %s", room_id, table)
txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))

# Other tables we do NOT need to clear out:
#
# - blocked_rooms
# This is important, to make sure that we don't accidentally rejoin a blocked
# room after it was purged
#
# - user_directory
# This has a room_id column, but it is unused
#

# Other tables that we might want to consider clearing out include:
#
# - event_reports
# Given that these are intended for abuse management my initial
# inclination is to leave them in place.
#
# - current_state_delta_stream
# - ex_outlier_stream
# - room_tags_revisions
# The problem with these is that they are largeish and there is no room_id
# index on them. In any case we should be clearing out 'stream' tables
# periodically anyway (#5888)

# TODO: we could probably usefully do a bunch of cache invalidation here

logger.info("[purge] done")

@defer.inlineCallbacks
def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream
Expand Down

0 comments on commit 5a60e07

Please sign in to comment.