Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prune old typing notifications #15332

Merged
merged 2 commits into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15332.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug in worker mode where on a rolling restart of workers the "typing" worker would consume 100% CPU until it got restarted.
25 changes: 25 additions & 0 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ class RoomMember:
FEDERATION_PING_INTERVAL = 40 * 1000


# How long to remember a typing notification happened in a room before
# forgetting about it.
FORGET_TIMEOUT = 10 * 60 * 1000


class FollowerTypingHandler:
"""A typing handler on a different process than the writer that is updated
via replication.
Expand Down Expand Up @@ -83,7 +88,10 @@ def __init__(self, hs: "HomeServer"):
self.wheel_timer: WheelTimer[RoomMember] = WheelTimer(bucket_size=5000)
self._latest_room_serial = 0

self._rooms_updated: Set[str] = set()

self.clock.looping_call(self._handle_timeouts, 5000)
self.clock.looping_call(self._prune_old_typing, FORGET_TIMEOUT)

def _reset(self) -> None:
"""Reset the typing handler's data caches."""
Expand All @@ -92,6 +100,8 @@ def _reset(self) -> None:
# map room IDs to sets of users currently typing
self._room_typing = {}

self._rooms_updated = set()

self._member_last_federation_poke = {}
self.wheel_timer = WheelTimer(bucket_size=5000)

Expand Down Expand Up @@ -178,6 +188,7 @@ def process_replication_rows(
prev_typing = self._room_typing.get(row.room_id, set())
now_typing = set(row.user_ids)
self._room_typing[row.room_id] = now_typing
self._rooms_updated.add(row.room_id)

if self.federation:
run_as_background_process(
Expand Down Expand Up @@ -209,6 +220,19 @@ async def _send_changes_in_typing_to_remotes(
def get_current_token(self) -> int:
return self._latest_room_serial

def _prune_old_typing(self) -> None:
"""Prune rooms that haven't seen typing updates since last time.

This is safe to do as clients should time out old typing notifications.
"""
stale_rooms = self._room_serials.keys() - self._rooms_updated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know you could subtract from a dict_keys!


for room_id in stale_rooms:
self._room_serials.pop(room_id, None)
self._room_typing.pop(room_id, None)

self._rooms_updated = set()


class TypingWriterHandler(FollowerTypingHandler):
def __init__(self, hs: "HomeServer"):
Expand Down Expand Up @@ -388,6 +412,7 @@ def _push_update_local(self, member: RoomMember, typing: bool) -> None:
self._typing_stream_change_cache.entity_has_changed(
member.room_id, self._latest_room_serial
)
self._rooms_updated.add(member.room_id)

self.notifier.on_new_event(
StreamKeyType.TYPING, self._latest_room_serial, rooms=[member.room_id]
Expand Down