Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Optimize how we calculate likely_domains during backfill (#13575)
Browse files Browse the repository at this point in the history
Optimize how we calculate `likely_domains` during backfill because I've seen this take 17s in production just to `get_current_state` which is used to `get_domains_from_state` (see case [*2. Loading tons of events* in the `/messages` investigation issue](#13356)).

There are 3 ways we currently calculate hosts that are in the room:

 1. `get_current_state` -> `get_domains_from_state`
    - Used in `backfill` to calculate `likely_domains` and `/timestamp_to_event` because it was cargo-culted from `backfill`
    - This one is being eliminated in favor of `get_current_hosts_in_room` in this PR 🕳
 1. `get_current_hosts_in_room`
    - Used for other federation things like sending read receipts and typing indicators
 1. `get_hosts_in_room_at_events`
    - Used when pushing out events over federation to other servers in the `_process_event_queue_loop`

Fix #13626

Part of #13356

Mentioned in [internal doc](https://docs.google.com/document/d/1lvUoVfYUiy6UaHB6Rb4HicjaJAU40-APue9Q4vzuW3c/edit#bookmark=id.2tvwz3yhcafh)


### Query performance

#### Before

The query from `get_current_state` sucks just because we have to get all 80k events. And we see almost the exact same performance locally trying to get all of these events (16s vs 17s):
```
synapse=# SELECT type, state_key, event_id FROM current_state_events WHERE room_id = '!OGEhHVWSdvArJzumhm:matrix.org';
Time: 16035.612 ms (00:16.036)

synapse=# SELECT type, state_key, event_id FROM current_state_events WHERE room_id = '!OGEhHVWSdvArJzumhm:matrix.org';
Time: 4243.237 ms (00:04.243)
```

But what about `get_current_hosts_in_room`: When there is 8M rows in the `current_state_events` table, the previous query in `get_current_hosts_in_room` took 13s from complete freshness (when the events were first added). But takes 930ms after a Postgres restart or 390ms if running back to back to back.

```sh
$ psql synapse
synapse=# \timing on
synapse=# SELECT COUNT(DISTINCT substring(state_key FROM '@[^:]*:(.*)$'))
FROM current_state_events
WHERE
    type = 'm.room.member'
    AND membership = 'join'
    AND room_id = '!OGEhHVWSdvArJzumhm:matrix.org';
 count
-------
  4130
(1 row)

Time: 13181.598 ms (00:13.182)

synapse=# SELECT COUNT(*) from current_state_events where room_id = '!OGEhHVWSdvArJzumhm:matrix.org';
 count
-------
 80814

synapse=# SELECT COUNT(*) from current_state_events;
  count
---------
 8162847

synapse=# SELECT pg_size_pretty( pg_total_relation_size('current_state_events') );
 pg_size_pretty
----------------
 4702 MB
```

#### After

I'm not sure how long it takes from complete freshness as I only really get that opportunity once (maybe restarting computer but that's cumbersome) and it's not really relevant to normal operating times. Maybe you get closer to the fresh times the more access variability there is so that Postgres caches aren't as exact. Update: The longest I've seen this run for is 6.4s and 4.5s after a computer restart.

After a Postgres restart, it takes 330ms and running back to back takes 260ms.

```sh
$ psql synapse
synapse=# \timing on
Timing is on.
synapse=# SELECT
    substring(c.state_key FROM '@[^:]*:(.*)$') as host
FROM current_state_events c
/* Get the depth of the event from the events table */
INNER JOIN events AS e USING (event_id)
WHERE
    c.type = 'm.room.member'
    AND c.membership = 'join'
    AND c.room_id = '!OGEhHVWSdvArJzumhm:matrix.org'
GROUP BY host
ORDER BY min(e.depth) ASC;
Time: 333.800 ms
```

#### Going further

To improve things further we could add a `limit` parameter to `get_current_hosts_in_room`. Realistically, we don't need 4k domains to choose from because there is no way we're going to query that many before we a) probably get an answer or b) we give up. 

Another thing we can do is optimize the query to use a index skip scan:

 - https://wiki.postgresql.org/wiki/Loose_indexscan
 - Index Skip Scan, https://commitfest.postgresql.org/37/1741/
 - https://www.timescale.com/blog/how-we-made-distinct-queries-up-to-8000x-faster-on-postgresql/
  • Loading branch information
MadLittleMods authored Aug 30, 2022
1 parent 4f6de33 commit 51d732d
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 70 deletions.
1 change: 1 addition & 0 deletions changelog.d/13575.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimize how Synapse calculates domains to fetch from during backfill.
53 changes: 10 additions & 43 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import JsonDict, StateMap, get_domain_from_id
from synapse.types import JsonDict, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.retryutils import NotRetryingDestination
from synapse.visibility import filter_events_for_server
Expand Down Expand Up @@ -104,37 +104,6 @@
)


def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
"""Get joined domains from state
Args:
state: State map from type/state key to event.
Returns:
Returns a list of servers with the lowest depth of their joins.
Sorted by lowest depth first.
"""
joined_users = [
(state_key, int(event.depth))
for (e_type, state_key), event in state.items()
if e_type == EventTypes.Member and event.membership == Membership.JOIN
]

joined_domains: Dict[str, int] = {}
for u, d in joined_users:
try:
dom = get_domain_from_id(u)
old_d = joined_domains.get(dom)
if old_d:
joined_domains[dom] = min(d, old_d)
else:
joined_domains[dom] = d
except Exception:
pass

return sorted(joined_domains.items(), key=lambda d: d[1])


class _BackfillPointType(Enum):
# a regular backwards extremity (ie, an event which we don't yet have, but which
# is referred to by other events in the DAG)
Expand Down Expand Up @@ -432,21 +401,19 @@ async def _maybe_backfill_inner(
)

# Now we need to decide which hosts to hit first.

# First we try hosts that are already in the room
# First we try hosts that are already in the room.
# TODO: HEURISTIC ALERT.
likely_domains = (
await self._storage_controllers.state.get_current_hosts_in_room(room_id)
)

curr_state = await self._storage_controllers.state.get_current_state(room_id)

curr_domains = get_domains_from_state(curr_state)

likely_domains = [
domain for domain, depth in curr_domains if domain != self.server_name
]

async def try_backfill(domains: List[str]) -> bool:
async def try_backfill(domains: Collection[str]) -> bool:
# TODO: Should we try multiple of these at a time?
for dom in domains:
# We don't want to ask our own server for information we don't have
if dom == self.server_name:
continue

try:
await self._federation_event_handler.backfill(
dom, room_id, limit=100, extremities=extremities_to_request
Expand Down
14 changes: 6 additions & 8 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
from synapse.events import EventBase
from synapse.events.utils import copy_and_fixup_power_levels_contents
from synapse.federation.federation_client import InvalidResponseError
from synapse.handlers.federation import get_domains_from_state
from synapse.handlers.relations import BundledAggregations
from synapse.module_api import NOT_SPAM
from synapse.rest.admin._base import assert_user_is_admin
Expand Down Expand Up @@ -1462,17 +1461,16 @@ async def get_event_for_timestamp(
timestamp,
)

# Find other homeservers from the given state in the room
curr_state = await self._storage_controllers.state.get_current_state(
room_id
likely_domains = (
await self._storage_controllers.state.get_current_hosts_in_room(room_id)
)
curr_domains = get_domains_from_state(curr_state)
likely_domains = [
domain for domain, depth in curr_domains if domain != self.server_name
]

# Loop through each homeserver candidate until we get a succesful response
for domain in likely_domains:
# We don't want to ask our own server for information we don't have
if domain == self.server_name:
continue

try:
remote_response = await self.federation_client.timestamp_to_event(
domain, room_id, timestamp, direction
Expand Down
3 changes: 1 addition & 2 deletions synapse/storage/controllers/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
List,
Mapping,
Optional,
Set,
Tuple,
)

Expand Down Expand Up @@ -520,7 +519,7 @@ async def get_current_state_event(
)
return state_map.get(key)

async def get_current_hosts_in_room(self, room_id: str) -> Set[str]:
async def get_current_hosts_in_room(self, room_id: str) -> List[str]:
"""Get current hosts in room based on current state."""

await self._partial_state_room_tracker.await_full_state(room_id)
Expand Down
88 changes: 71 additions & 17 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,27 +187,48 @@ def _check_safe_current_state_events_membership_updated_txn(

@cached(max_entries=100000, iterable=True)
async def get_users_in_room(self, room_id: str) -> List[str]:
"""
Returns a list of users in the room sorted by longest in the room first
(aka. with the lowest depth). This is done to match the sort in
`get_current_hosts_in_room()` and so we can re-use the cache but it's
not horrible to have here either.
"""

return await self.db_pool.runInteraction(
"get_users_in_room", self.get_users_in_room_txn, room_id
)

def get_users_in_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[str]:
"""
Returns a list of users in the room sorted by longest in the room first
(aka. with the lowest depth). This is done to match the sort in
`get_current_hosts_in_room()` and so we can re-use the cache but it's
not horrible to have here either.
"""
# If we can assume current_state_events.membership is up to date
# then we can avoid a join, which is a Very Good Thing given how
# frequently this function gets called.
if self._current_state_events_membership_up_to_date:
sql = """
SELECT state_key FROM current_state_events
WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
SELECT c.state_key FROM current_state_events as c
/* Get the depth of the event from the events table */
INNER JOIN events AS e USING (event_id)
WHERE c.type = 'm.room.member' AND c.room_id = ? AND membership = ?
/* Sorted by lowest depth first */
ORDER BY e.depth ASC;
"""
else:
sql = """
SELECT state_key FROM room_memberships as m
SELECT c.state_key FROM room_memberships as m
/* Get the depth of the event from the events table */
INNER JOIN events AS e USING (event_id)
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
/* Sorted by lowest depth first */
ORDER BY e.depth ASC;
"""

txn.execute(sql, (room_id, Membership.JOIN))
Expand Down Expand Up @@ -1037,37 +1058,70 @@ async def _check_host_room_membership(
return True

@cached(iterable=True, max_entries=10000)
async def get_current_hosts_in_room(self, room_id: str) -> Set[str]:
"""Get current hosts in room based on current state."""
async def get_current_hosts_in_room(self, room_id: str) -> List[str]:
"""
Get current hosts in room based on current state.
The heuristic of sorting by servers who have been in the room the
longest is good because they're most likely to have anything we ask
about.
Returns:
Returns a list of servers sorted by longest in the room first. (aka.
sorted by join with the lowest depth first).
"""

# First we check if we already have `get_users_in_room` in the cache, as
# we can just calculate result from that
users = self.get_users_in_room.cache.get_immediate(
(room_id,), None, update_metrics=False
)
if users is not None:
return {get_domain_from_id(u) for u in users}

if isinstance(self.database_engine, Sqlite3Engine):
if users is None and isinstance(self.database_engine, Sqlite3Engine):
# If we're using SQLite then let's just always use
# `get_users_in_room` rather than funky SQL.
users = await self.get_users_in_room(room_id)
return {get_domain_from_id(u) for u in users}

if users is not None:
# Because `users` is sorted from lowest -> highest depth, the list
# of domains will also be sorted that way.
domains: List[str] = []
# We use a `Set` just for fast lookups
domain_set: Set[str] = set()
for u in users:
domain = get_domain_from_id(u)
if domain not in domain_set:
domain_set.add(domain)
domains.append(domain)
return domains

# For PostgreSQL we can use a regex to pull out the domains from the
# joined users in `current_state_events` via regex.

def get_current_hosts_in_room_txn(txn: LoggingTransaction) -> Set[str]:
def get_current_hosts_in_room_txn(txn: LoggingTransaction) -> List[str]:
# Returns a list of servers currently joined in the room sorted by
# longest in the room first (aka. with the lowest depth). The
# heuristic of sorting by servers who have been in the room the
# longest is good because they're most likely to have anything we
# ask about.
sql = """
SELECT DISTINCT substring(state_key FROM '@[^:]*:(.*)$')
FROM current_state_events
SELECT
/* Match the domain part of the MXID */
substring(c.state_key FROM '@[^:]*:(.*)$') as server_domain
FROM current_state_events c
/* Get the depth of the event from the events table */
INNER JOIN events AS e USING (event_id)
WHERE
type = 'm.room.member'
AND membership = 'join'
AND room_id = ?
/* Find any join state events in the room */
c.type = 'm.room.member'
AND c.membership = 'join'
AND c.room_id = ?
/* Group all state events from the same domain into their own buckets (groups) */
GROUP BY server_domain
/* Sorted by lowest depth first */
ORDER BY min(e.depth) ASC;
"""
txn.execute(sql, (room_id,))
return {d for d, in txn}
return [d for d, in txn]

return await self.db_pool.runInteraction(
"get_current_hosts_in_room", get_current_hosts_in_room_txn
Expand Down

0 comments on commit 51d732d

Please sign in to comment.