Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #5706 from matrix-org/erikj/add_memberships_to_cur…
Browse files Browse the repository at this point in the history
…rent_state
  • Loading branch information
anoadragon453 committed Feb 19, 2020
2 parents 11d91ee + 5c07c97 commit 04d4c31
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 21 deletions.
1 change: 1 addition & 0 deletions changelog.d/5706.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce database IO usage by optimising queries for current membership.
18 changes: 16 additions & 2 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,21 @@
class LoggingTransaction(object):
"""An object that almost-transparently proxies for the 'txn' object
passed to the constructor. Adds logging and metrics to the .execute()
method."""
method.
Args:
txn: The database transaction object to wrap.
name (str): The name of this transaction, used for logging.
database_engine (Sqlite3Engine|PostgresEngine)
after_callbacks(list|None): A list that callbacks will be appended to
that have been added by `call_after` which should be run on
successful completion of the transaction. None indicates that no
callbacks should be allowed to be scheduled to run.
exception_callbacks(list|None): A list that callbacks will be appended
to that have been added by `call_on_exception` which should be run
if transaction ends with an error. None indicates that no callbacks
should be allowed to be scheduled to run.
"""

__slots__ = [
"txn",
Expand All @@ -97,7 +111,7 @@ class LoggingTransaction(object):
]

def __init__(
self, txn, name, database_engine, after_callbacks, exception_callbacks
self, txn, name, database_engine, after_callbacks=None, exception_callbacks=None
):
object.__setattr__(self, "txn", txn)
object.__setattr__(self, "name", name)
Expand Down
2 changes: 0 additions & 2 deletions synapse/storage/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ def __init__(self, db_conn, hs):
db_conn.cursor(),
name="_find_stream_orderings_for_times_txn",
database_engine=self.database_engine,
after_callbacks=[],
exception_callbacks=[],
)
self._find_stream_orderings_for_times_txn(cur)
cur.close()
Expand Down
97 changes: 80 additions & 17 deletions synapse/storage/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from twisted.internet import defer

from synapse.api.constants import EventTypes, Membership
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import LoggingTransaction
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer
Expand Down Expand Up @@ -57,6 +59,47 @@


class RoomMemberWorkerStore(EventsWorkerStore):
def __init__(self, db_conn, hs):
    """Set up the store and run an initial check of the membership column.

    A cursor is taken from `db_conn` at construction time to find out
    whether `current_state_events.membership` can already be relied upon.

    Args:
        db_conn: Database connection; a cursor is obtained from it for the
            start-up check.
        hs: Passed straight through to the parent constructor.
    """
    super(RoomMemberWorkerStore, self).__init__(db_conn, hs)

    # Is the current_state_events.membership up to date? Or is the
    # background update still running? Start pessimistic; the check below
    # overwrites this value.
    self._current_state_events_membership_up_to_date = False

    txn = LoggingTransaction(
        db_conn.cursor(),
        name="_check_safe_current_state_events_membership_updated",
        database_engine=self.database_engine,
    )
    try:
        self._check_safe_current_state_events_membership_updated_txn(txn)
    finally:
        # Close the cursor even if the check raises, so it is not leaked.
        txn.close()

def _check_safe_current_state_events_membership_updated_txn(self, txn):
    """Work out whether `current_state_events.membership` can be trusted.

    Looks in the `background_updates` table for a row whose `update_name`
    is `_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME`. If no such row exists the
    column is flagged as up to date; otherwise it is flagged as stale and
    this same check is re-queued through `self._clock.call_later`.

    Args:
        txn: The transaction used to query `background_updates`.
    """
    outstanding = self._simple_select_one_txn(
        txn,
        table="background_updates",
        keyvalues={"update_name": _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME},
        retcols=["update_name"],
        allow_none=True,
    )

    if not outstanding:
        # No pending background update row: the column is safe to use.
        self._current_state_events_membership_up_to_date = True
        return

    self._current_state_events_membership_up_to_date = False

    # The update is still running, so schedule another look later. The same
    # name is used for the background process and for the interaction.
    check_name = "_check_safe_current_state_events_membership_updated"
    self._clock.call_later(
        15.0,
        run_as_background_process,
        check_name,
        self.runInteraction,
        check_name,
        self._check_safe_current_state_events_membership_updated_txn,
    )

@cachedInlineCallbacks(max_entries=100000, iterable=True, cache_context=True)
def get_hosts_in_room(self, room_id, cache_context):
"""Returns the set of all hosts currently in the room
Expand All @@ -70,14 +113,23 @@ def get_hosts_in_room(self, room_id, cache_context):
@cached(max_entries=100000, iterable=True)
def get_users_in_room(self, room_id):
def f(txn):
sql = (
"SELECT m.user_id FROM room_memberships as m"
" INNER JOIN current_state_events as c"
" ON m.event_id = c.event_id "
" AND m.room_id = c.room_id "
" AND m.user_id = c.state_key"
" WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
)
# If we can assume current_state_events.membership is up to date
# then we can avoid a join, which is a Very Good Thing given how
# frequently this function gets called.
if self._current_state_events_membership_up_to_date:
sql = """
SELECT state_key FROM current_state_events
WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
"""
else:
sql = """
SELECT state_key FROM room_memberships as m
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
"""

txn.execute(sql, (room_id, Membership.JOIN))
return [to_ascii(r[0]) for r in txn]
Expand All @@ -99,15 +151,26 @@ def _get_room_summary_txn(txn):
# first get counts.
# We do this all in one transaction to keep the cache small.
# FIXME: get rid of this when we have room_stats
sql = """
SELECT count(*), m.membership FROM room_memberships as m
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ?
GROUP BY m.membership
"""

# If we can assume current_state_events.membership is up to date
# then we can avoid a join, which is a Very Good Thing given how
# frequently this function gets called.
if self._current_state_events_membership_up_to_date:
sql = """
SELECT count(*), membership FROM current_state_events
WHERE type = 'm.room.member' AND room_id = ?
GROUP BY membership
"""
else:
sql = """
SELECT count(*), m.membership FROM room_memberships as m
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ?
GROUP BY m.membership
"""

txn.execute(sql, (room_id,))
res = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
-- We add membership to current state so that we don't need to join against
-- room_memberships, which can be surprisingly costly (we do such queries
-- very frequently).
-- This will be null for non-membership events, and the value of the
-- content.membership key for membership events. (It will also be null for
-- membership events until the background update job has finished.)
ALTER TABLE current_state_events ADD membership TEXT;

INSERT INTO background_updates (update_name, progress_json) VALUES
Expand Down

0 comments on commit 04d4c31

Please sign in to comment.