This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
Add config option to forget rooms automatically when users leave them #15224
Merged
10 commits
d7d29f2 Add `forget_rooms_on_leave` config option
601fe80 Add table to track position of room forgetter
b4926d9 Give workers the ability to forget rooms
923d523 Implement forgetting of rooms on leave
dea5247 Add test for forgetting rooms on leave
f3badbb Add newsfile
5dfada4 fixup: lift non-forget on leave branch out of loop
a7b6777 fixup: replace handrolled query with `simple_update_txn`
767347f Merge remote-tracking branch 'origin/develop' into squah/forget_rooms…
95e81e3 fixup: move schema delta to correct directory
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Add `forget_rooms_on_leave` config option to automatically forget rooms when users leave them or are removed from them. | ||
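
For orientation, here is a minimal sketch of how an option like this might be read from a parsed homeserver config. The `RoomConfigSketch` class and its `read_config` method are hypothetical stand-ins, not Synapse's actual config classes. The only names taken from this PR are the `forget_rooms_on_leave` key and the `forget_on_leave` attribute that the handler reads. Treating the option as a top-level boolean that defaults to off is an assumption.

```python
from typing import Any, Dict


class RoomConfigSketch:
    """Hypothetical stand-in for a config section; not Synapse's real class."""

    def __init__(self) -> None:
        self.forget_on_leave = False

    def read_config(self, config: Dict[str, Any]) -> None:
        # Assumption: the option is a top-level boolean that defaults to off.
        self.forget_on_leave = bool(config.get("forget_rooms_on_leave", False))


room_config = RoomConfigSketch()
room_config.read_config({"forget_rooms_on_leave": True})
print(room_config.forget_on_leave)
```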
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,7 @@ | |
import logging | ||
import random | ||
from http import HTTPStatus | ||
- from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple | ||
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple | ||
|
||
from synapse import types | ||
from synapse.api.constants import ( | ||
|
@@ -38,7 +38,10 @@ | |
from synapse.events import EventBase | ||
from synapse.events.snapshot import EventContext | ||
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN | ||
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler | ||
from synapse.logging import opentracing | ||
from synapse.metrics import event_processing_positions | ||
from synapse.metrics.background_process_metrics import run_as_background_process | ||
from synapse.module_api import NOT_SPAM | ||
from synapse.types import ( | ||
JsonDict, | ||
|
@@ -280,9 +283,25 @@ async def _user_left_room(self, target: UserID, room_id: str) -> None: | |
""" | ||
raise NotImplementedError() | ||
|
||
- @abc.abstractmethod | ||
async def forget(self, user: UserID, room_id: str) -> None: | ||
- raise NotImplementedError() | ||
user_id = user.to_string() | ||
|
||
member = await self._storage_controllers.state.get_current_state_event( | ||
room_id=room_id, event_type=EventTypes.Member, state_key=user_id | ||
) | ||
membership = member.membership if member else None | ||
|
||
if membership is not None and membership not in [ | ||
Membership.LEAVE, | ||
Membership.BAN, | ||
]: | ||
raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) | ||
|
||
# In normal case this call is only required if `membership` is not `None`. | ||
# But: After the last member had left the room, the background update | ||
# `_background_remove_left_rooms` is deleting rows related to this room from | ||
# the table `current_state_events` and `get_current_state_events` is `None`. | ||
await self.store.forget(user_id, room_id) | ||
Review comment on lines 286 to +304: I couldn't see why forgetting rooms couldn't be done on any worker, so I made the action available to all workers. |
||
|
||
async def ratelimit_multiple_invites( | ||
self, | ||
|
@@ -2046,25 +2065,141 @@ async def _user_left_room(self, target: UserID, room_id: str) -> None: | |
"""Implements RoomMemberHandler._user_left_room""" | ||
user_left_room(self.distributor, target, room_id) | ||
|
||
- async def forget(self, user: UserID, room_id: str) -> None: | ||
- user_id = user.to_string() | ||
|
||
- member = await self._storage_controllers.state.get_current_state_event( | ||
- room_id=room_id, event_type=EventTypes.Member, state_key=user_id | ||
- ) | ||
- membership = member.membership if member else None | ||
class RoomForgetterHandler(StateDeltasHandler): | ||
"""Forgets rooms when they are left, when enabled in the homeserver config. | ||
|
||
- if membership is not None and membership not in [ | ||
- Membership.LEAVE, | ||
- Membership.BAN, | ||
- ]: | ||
- raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) | ||
For the purposes of this feature, kicks, bans and "leaves" via state resolution | ||
weirdness are all considered to be leaves. | ||
|
||
- # In normal case this call is only required if `membership` is not `None`. | ||
- # But: After the last member had left the room, the background update | ||
- # `_background_remove_left_rooms` is deleting rows related to this room from | ||
- # the table `current_state_events` and `get_current_state_events` is `None`. | ||
- await self.store.forget(user_id, room_id) | ||
Derived from `StatsHandler` and `UserDirectoryHandler`. | ||
Review comment: I wonder if it's worth factoring out the duplicated structure. There is certainly a lot of faff involved in adding another one of these state delta-consumers. |
||
""" | ||
|
||
def __init__(self, hs: "HomeServer"): | ||
super().__init__(hs) | ||
|
||
self._hs = hs | ||
self._store = hs.get_datastores().main | ||
self._storage_controllers = hs.get_storage_controllers() | ||
self._clock = hs.get_clock() | ||
self._notifier = hs.get_notifier() | ||
self._room_member_handler = hs.get_room_member_handler() | ||
|
||
# The current position in the current_state_delta stream | ||
self.pos: Optional[int] = None | ||
|
||
# Guard to ensure we only process deltas one at a time | ||
self._is_processing = False | ||
|
||
if hs.config.worker.run_background_tasks: | ||
self._notifier.add_replication_callback(self.notify_new_event) | ||
|
||
# We kick this off to pick up outstanding work from before the last restart. | ||
self._clock.call_later(0, self.notify_new_event) | ||
|
||
def notify_new_event(self) -> None: | ||
"""Called when there may be more deltas to process""" | ||
if self._is_processing: | ||
return | ||
|
||
self._is_processing = True | ||
|
||
async def process() -> None: | ||
try: | ||
await self._unsafe_process() | ||
finally: | ||
self._is_processing = False | ||
|
||
run_as_background_process("room_forgetter.notify_new_event", process) | ||
|
||
async def _unsafe_process(self) -> None: | ||
# If self.pos is None then we haven't fetched it from the DB yet | ||
if self.pos is None: | ||
self.pos = await self._store.get_room_forgetter_stream_pos() | ||
room_max_stream_ordering = self._store.get_room_max_stream_ordering() | ||
if self.pos > room_max_stream_ordering: | ||
# apparently, we've processed more events than exist in the database! | ||
# this can happen if events are removed with history purge or similar. | ||
logger.warning( | ||
"Event stream ordering appears to have gone backwards (%i -> %i): " | ||
"rewinding room forgetter processor", | ||
self.pos, | ||
room_max_stream_ordering, | ||
) | ||
self.pos = room_max_stream_ordering | ||
|
||
if not self._hs.config.room.forget_on_leave: | ||
# Update the processing position, so that if the server admin turns the | ||
# feature on at a later date, we don't decide to forget every room that | ||
# has ever been left in the past. | ||
self.pos = self._store.get_room_max_stream_ordering() | ||
await self._store.update_room_forgetter_stream_pos(self.pos) | ||
return | ||
|
||
# Loop round handling deltas until we're up to date | ||
|
||
while True: | ||
# Be sure to read the max stream_ordering *before* checking if there are any outstanding | ||
# deltas, since there is otherwise a chance that we could miss updates which arrive | ||
# after we check the deltas. | ||
room_max_stream_ordering = self._store.get_room_max_stream_ordering() | ||
if self.pos == room_max_stream_ordering: | ||
break | ||
|
||
logger.debug( | ||
"Processing room forgetting %s->%s", self.pos, room_max_stream_ordering | ||
) | ||
( | ||
max_pos, | ||
deltas, | ||
) = await self._storage_controllers.state.get_current_state_deltas( | ||
self.pos, room_max_stream_ordering | ||
) | ||
|
||
logger.debug("Handling %d state deltas", len(deltas)) | ||
await self._handle_deltas(deltas) | ||
|
||
self.pos = max_pos | ||
|
||
# Expose current event processing position to prometheus | ||
event_processing_positions.labels("room_forgetter").set(max_pos) | ||
|
||
await self._store.update_room_forgetter_stream_pos(max_pos) | ||
|
||
async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None: | ||
"""Called with the state deltas to process""" | ||
for delta in deltas: | ||
typ = delta["type"] | ||
state_key = delta["state_key"] | ||
room_id = delta["room_id"] | ||
event_id = delta["event_id"] | ||
prev_event_id = delta["prev_event_id"] | ||
|
||
if typ != EventTypes.Member: | ||
continue | ||
|
||
if not self._hs.is_mine_id(state_key): | ||
continue | ||
|
||
change = await self._get_key_change( | ||
prev_event_id, | ||
event_id, | ||
key_name="membership", | ||
public_value=Membership.JOIN, | ||
) | ||
is_leave = change is MatchChange.now_false | ||
|
||
if is_leave: | ||
try: | ||
await self._room_member_handler.forget( | ||
UserID.from_string(state_key), room_id | ||
) | ||
except SynapseError as e: | ||
if e.code == 400: | ||
# The user is back in the room. | ||
pass | ||
else: | ||
raise | ||
|
||
|
||
def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]: | ||
|
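
The flow of `_unsafe_process` and `_handle_deltas` above can be approximated with a small in-memory sketch. Everything here is a simplification under stated assumptions: `Delta` and `ForgetterSketch` are hypothetical names, deltas carry memberships directly instead of event IDs, and "forgetting" just records a `(user_id, room_id)` pair instead of calling the storage layer. It is meant to illustrate the two ideas in the diff (skip ahead when the feature is disabled, treat any transition away from `join` as a leave), not to reproduce Synapse's implementation.

```python
from dataclasses import dataclass
from typing import List, Optional, Set, Tuple


@dataclass
class Delta:
    """Simplified state delta; the real rows carry event IDs, not memberships."""

    stream_id: int
    room_id: str
    user_id: str
    prev_membership: Optional[str]
    membership: Optional[str]


class ForgetterSketch:
    """Hypothetical, in-memory stand-in for the room forgetter."""

    def __init__(self, start_pos: int, enabled: bool) -> None:
        self.pos = start_pos
        self.enabled = enabled
        self.forgotten: Set[Tuple[str, str]] = set()

    def process(self, deltas: List[Delta], max_pos: int) -> None:
        if not self.enabled:
            # Skip ahead so that enabling the feature later does not
            # retroactively forget every room left in the past.
            self.pos = max_pos
            return
        for delta in deltas:
            # Only look at deltas after our position and up to max_pos.
            if not (self.pos < delta.stream_id <= max_pos):
                continue
            # Any transition away from "join" counts as a leave here,
            # which covers ordinary leaves, kicks and bans alike.
            if delta.prev_membership == "join" and delta.membership != "join":
                self.forgotten.add((delta.user_id, delta.room_id))
        self.pos = max_pos


deltas = [
    Delta(1, "!a:example.org", "@alice:example.org", None, "join"),
    Delta(2, "!a:example.org", "@alice:example.org", "join", "leave"),
    Delta(3, "!b:example.org", "@bob:example.org", "join", "ban"),
]
forgetter = ForgetterSketch(start_pos=0, enabled=True)
forgetter.process(deltas, max_pos=3)
print(sorted(forgetter.forgotten))
```

With `enabled=False`, the same call only advances `pos`, which mirrors the early-return branch in the diff that updates the stored stream position without forgetting anything.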
24 changes: 24 additions & 0 deletions
synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* Copyright 2023 The Matrix.org Foundation C.I.C | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
CREATE TABLE room_forgetter_stream_pos ( | ||
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. | ||
stream_id BIGINT NOT NULL, | ||
CHECK (Lock='X') | ||
); | ||
|
||
INSERT INTO room_forgetter_stream_pos ( | ||
stream_id | ||
) SELECT COALESCE(MAX(stream_ordering), 0) from events; |
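
The `Lock` column is what keeps this table to a single row: it defaults to `'X'`, must equal `'X'`, and must be unique, so a second row cannot be inserted. A minimal sketch of that behaviour, using Python's built-in `sqlite3` module with an in-memory database, is shown below. The seed value `0`, the later value `42` and the helper name `second_insert_rejected` are illustrative assumptions; the table definition is copied from the delta above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE room_forgetter_stream_pos (
        Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,
        stream_id BIGINT NOT NULL,
        CHECK (Lock='X')
    )
    """
)
# Seed the single row (the real delta seeds it from the events table).
conn.execute("INSERT INTO room_forgetter_stream_pos (stream_id) VALUES (0)")


def second_insert_rejected(connection: sqlite3.Connection) -> bool:
    """Return True if the database refuses to add a second row."""
    try:
        connection.execute(
            "INSERT INTO room_forgetter_stream_pos (stream_id) VALUES (1)"
        )
    except sqlite3.IntegrityError:
        # The default Lock value 'X' collides with the existing row.
        return True
    return False


rejected = second_insert_rejected(conn)

# Updating the position therefore needs no WHERE clause: there is one row.
conn.execute("UPDATE room_forgetter_stream_pos SET stream_id = ?", (42,))
current_pos = conn.execute(
    "SELECT stream_id FROM room_forgetter_stream_pos"
).fetchone()[0]
print(rejected, current_pos)
```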
Does this fix #4720?
If so, we should update the issue description, or mark this as "Part of ..."
No, this does not purge rooms from the database at all.
Is this part of #4720? Seems like a prerequisite step
This PR is adjacent, but I see it as more optional than anything. #4720 sounds like it's scoped to the case where all users explicitly choose to forget a room themselves. This PR forces users to forget rooms, making the proposed GC in #4720 trigger more often.
🤷 Feels like a nice step to make the GC automatic (sane default)