This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Add membership column to current_state_events table #5706
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
6de09e0
Add membership column to current_state_events table.
erikjohnston c618a5d
Add background update for current_state_events.membership column
erikjohnston 059d8c1
Track if current_state_events.membership is up to date
erikjohnston 8e1ada9
Use the current_state_events.membership column
erikjohnston 89c8859
Newsfile
erikjohnston ebc5ed1
Update comment for new column
erikjohnston bd2e1a2
LoggingTransaction accepts None for callback lists.
erikjohnston File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Reduce database IO usage by optimising queries for current membership. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,8 @@ | |
from twisted.internet import defer | ||
|
||
from synapse.api.constants import EventTypes, Membership | ||
from synapse.metrics.background_process_metrics import run_as_background_process | ||
from synapse.storage._base import LoggingTransaction | ||
from synapse.storage.events_worker import EventsWorkerStore | ||
from synapse.types import get_domain_from_id | ||
from synapse.util.async_helpers import Linearizer | ||
|
@@ -53,9 +55,51 @@ | |
MemberSummary = namedtuple("MemberSummary", ("members", "count")) | ||
|
||
_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update" | ||
_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership" | ||
|
||
|
||
class RoomMemberWorkerStore(EventsWorkerStore): | ||
def __init__(self, db_conn, hs):
    """Set up the store and do an initial check of the membership column.

    Args:
        db_conn: database connection; a cursor is taken from it for the
            one-off start-up check below.
        hs: passed straight through to the parent store's constructor.
    """
    super(RoomMemberWorkerStore, self).__init__(db_conn, hs)

    # Is the current_state_events.membership up to date? Or is the
    # background update still running? Start pessimistic; the check below
    # flips this to True when no pending update is found.
    self._current_state_events_membership_up_to_date = False

    txn = LoggingTransaction(
        db_conn.cursor(),
        name="_check_safe_current_state_events_membership_updated",
        database_engine=self.database_engine,
    )
    try:
        self._check_safe_current_state_events_membership_updated_txn(txn)
    finally:
        # Always release the cursor, even if the check raised.
        txn.close()
|
||
def _check_safe_current_state_events_membership_updated_txn(self, txn):
    """Record whether current_state_events.membership can be relied upon.

    Looks in `background_updates` for a row named
    `_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME`. While such a row exists the
    `_current_state_events_membership_up_to_date` flag is set to False and
    this check is scheduled to run again; once the row is gone the flag is
    set to True and nothing further is scheduled.

    Args:
        txn: transaction/cursor used for the lookup.
    """
    row = self._simple_select_one_txn(
        txn,
        table="background_updates",
        keyvalues={"update_name": _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME},
        retcols=["update_name"],
        allow_none=True,
    )

    still_pending = bool(row)
    self._current_state_events_membership_up_to_date = not still_pending

    if not still_pending:
        return

    # The update is still running, so look again later.
    # NOTE(review): 15.0 is presumably a delay in seconds - confirm against
    # the clock implementation.
    self._clock.call_later(
        15.0,
        run_as_background_process,
        "_check_safe_current_state_events_membership_updated",
        self.runInteraction,
        "_check_safe_current_state_events_membership_updated",
        self._check_safe_current_state_events_membership_updated_txn,
    )
|
||
@cachedInlineCallbacks(max_entries=100000, iterable=True, cache_context=True) | ||
def get_hosts_in_room(self, room_id, cache_context): | ||
"""Returns the set of all hosts currently in the room | ||
|
@@ -69,14 +113,23 @@ def get_hosts_in_room(self, room_id, cache_context): | |
@cached(max_entries=100000, iterable=True) | ||
def get_users_in_room(self, room_id): | ||
def f(txn): | ||
sql = ( | ||
"SELECT m.user_id FROM room_memberships as m" | ||
" INNER JOIN current_state_events as c" | ||
" ON m.event_id = c.event_id " | ||
" AND m.room_id = c.room_id " | ||
" AND m.user_id = c.state_key" | ||
" WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?" | ||
) | ||
# If we can assume current_state_events.membership is up to date | ||
# then we can avoid a join, which is a Very Good Thing given how | ||
# frequently this function gets called. | ||
if self._current_state_events_membership_up_to_date: | ||
sql = """ | ||
SELECT state_key FROM current_state_events | ||
WHERE type = 'm.room.member' AND room_id = ? AND membership = ? | ||
""" | ||
else: | ||
sql = """ | ||
SELECT state_key FROM room_memberships as m | ||
INNER JOIN current_state_events as c | ||
ON m.event_id = c.event_id | ||
AND m.room_id = c.room_id | ||
AND m.user_id = c.state_key | ||
WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ? | ||
""" | ||
|
||
txn.execute(sql, (room_id, Membership.JOIN)) | ||
return [to_ascii(r[0]) for r in txn] | ||
|
@@ -98,15 +151,26 @@ def _get_room_summary_txn(txn): | |
# first get counts. | ||
# We do this all in one transaction to keep the cache small. | ||
# FIXME: get rid of this when we have room_stats | ||
sql = """ | ||
SELECT count(*), m.membership FROM room_memberships as m | ||
INNER JOIN current_state_events as c | ||
ON m.event_id = c.event_id | ||
AND m.room_id = c.room_id | ||
AND m.user_id = c.state_key | ||
WHERE c.type = 'm.room.member' AND c.room_id = ? | ||
GROUP BY m.membership | ||
""" | ||
|
||
# If we can assume current_state_events.membership is up to date | ||
# then we can avoid a join, which is a Very Good Thing given how | ||
# frequently this function gets called. | ||
if self._current_state_events_membership_up_to_date: | ||
sql = """ | ||
SELECT count(*), membership FROM current_state_events | ||
WHERE type = 'm.room.member' AND room_id = ? | ||
GROUP BY membership | ||
""" | ||
else: | ||
sql = """ | ||
SELECT count(*), m.membership FROM room_memberships as m | ||
INNER JOIN current_state_events as c | ||
ON m.event_id = c.event_id | ||
AND m.room_id = c.room_id | ||
AND m.user_id = c.state_key | ||
WHERE c.type = 'm.room.member' AND c.room_id = ? | ||
GROUP BY m.membership | ||
""" | ||
|
||
txn.execute(sql, (room_id,)) | ||
res = {} | ||
|
@@ -224,7 +288,7 @@ def _get_rooms_for_user_where_membership_is_txn( | |
results = [] | ||
if membership_list: | ||
where_clause = "user_id = ? AND (%s) AND forgotten = 0" % ( | ||
" OR ".join(["membership = ?" for _ in membership_list]), | ||
" OR ".join(["m.membership = ?" for _ in membership_list]), | ||
) | ||
|
||
args = [user_id] | ||
|
@@ -453,8 +517,8 @@ def is_host_joined(self, room_id, host): | |
|
||
sql = """ | ||
SELECT state_key FROM current_state_events AS c | ||
INNER JOIN room_memberships USING (event_id) | ||
WHERE membership = 'join' | ||
INNER JOIN room_memberships AS m USING (event_id) | ||
WHERE m.membership = 'join' | ||
AND type = 'm.room.member' | ||
AND c.room_id = ? | ||
AND state_key LIKE ? | ||
|
@@ -602,6 +666,10 @@ def __init__(self, db_conn, hs): | |
self.register_background_update_handler( | ||
_MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile | ||
) | ||
self.register_background_update_handler( | ||
_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, | ||
self._background_current_state_membership, | ||
) | ||
|
||
def _store_room_members_txn(self, txn, events, backfilled): | ||
"""Store a room member in the database. | ||
|
@@ -781,6 +849,52 @@ def add_membership_profile_txn(txn): | |
|
||
defer.returnValue(result) | ||
|
||
@defer.inlineCallbacks
def _background_current_state_membership(self, progress, batch_size):
    """Update the new membership column on current_state_events.

    On the first call the distinct room IDs in `current_state_events` are
    loaded and stored under `progress["rooms"]`. Each call then takes rooms
    off the end of that list, copying `room_memberships.membership` into
    `current_state_events.membership` for every row of the room, until the
    list is empty or at least `batch_size` rows have been touched.

    Args:
        progress (dict): persisted progress for this update; the "rooms"
            key holds the rooms still to be processed.
        batch_size (int): target number of rows to update in this call.

    Returns:
        Deferred[int]: the number of rows updated in this batch.
    """

    if "rooms" not in progress:
        progress["rooms"] = yield self._simple_select_onecol(
            table="current_state_events",
            keyvalues={},
            retcol="DISTINCT room_id",
            desc="_background_current_state_membership_get_rooms",
        )

    # The statement is the same for every room, so build it once.
    sql = """
        UPDATE current_state_events AS c
        SET membership = (
            SELECT membership FROM room_memberships
            WHERE event_id = c.event_id
        )
        WHERE room_id = ?
    """

    def _background_current_state_membership_txn(txn):
        # Work on a private copy of the room list. Popping from the shared
        # list here would drop rooms for good if this function is ever run
        # more than once for the same batch, since they would be missing on
        # the second run yet still be recorded as done.
        # NOTE(review): confirm whether runInteraction can re-run this
        # function on transient database errors.
        remaining = list(progress["rooms"])
        processed = 0
        while remaining and processed < batch_size:
            txn.execute(sql, (remaining.pop(),))
            processed += txn.rowcount

        # Persist the reduced room list in the same transaction as the
        # updates it describes.
        self._background_update_progress_txn(
            txn,
            _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME,
            dict(progress, rooms=remaining),
        )

        return processed, remaining

    processed, remaining = yield self.runInteraction(
        "_background_current_state_membership_update",
        _background_current_state_membership_txn,
    )

    # Only publish the new state once the transaction has succeeded.
    progress["rooms"] = remaining

    if not remaining:
        yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME)

    defer.returnValue(processed)
|
||
|
||
class _JoinedHostsCache(object): | ||
"""Cache for joined hosts in a room that is optimised to handle updates | ||
|
25 changes: 25 additions & 0 deletions
25
synapse/storage/schema/delta/56/current_state_events_membership.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
/* Copyright 2019 The Matrix.org Foundation C.I.C. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
-- We add membership to current state so that we don't need to join against | ||
-- room_memberships, which can be surprisingly costly (we do such queries | ||
-- very frequently). | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
-- `membership` holds the value of the content.membership key for membership
-- events and is NULL for every other event type. It is also NULL for
-- membership events until the background update queued below has finished.
ALTER TABLE current_state_events ADD COLUMN membership TEXT;

-- Queue the background update that fills in the new column.
INSERT INTO background_updates (update_name, progress_json)
    VALUES ('current_state_events_membership', '{}');
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the thinking behind the schema bump?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mainly that this stops people from rolling back, which would cause the `membership` column to not be correctly updated.