Prefill more stream change caches. (#12372)
erikjohnston authored Apr 5, 2022
1 parent d666fc0 commit 66053b6
Showing 6 changed files with 92 additions and 61 deletions.
1 change: 1 addition & 0 deletions changelog.d/12372.feature
@@ -0,0 +1 @@
+Reduce overhead of restarting synchrotrons.
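
A `StreamChangeCache` answers "may entity X have changed since stream position P?" entirely in memory, but it can only answer definitively for positions at or above its low-water mark; below that it must pessimistically say yes. A freshly restarted worker whose cache starts empty therefore falls back to the database for almost every such query until the cache warms up, which is the restart overhead this commit targets. A small sketch of those semantics (assuming a Synapse checkout on the import path; the constructor and `has_entity_changed` are the same API used in the diff below):

```python
from synapse.util.caches.stream_change_cache import StreamChangeCache

# Prefilled the way the new __init__ code does it: the cache knows all
# changes from stream ID 900 onwards, and @alice last changed at 950.
cache = StreamChangeCache(
    "DeviceListStreamChangeCache",
    900,  # min_device_list_id as returned by get_cache_dict
    prefilled_cache={"@alice:example.com": 950},
)

# @alice changed at 950, which is after 920, so this is a definite yes.
assert cache.has_entity_changed("@alice:example.com", 920)
# ...and a definite no for positions after 950: no DB round-trip needed.
assert not cache.has_entity_changed("@alice:example.com", 960)
# Below the low-water mark the cache knows nothing and must say "maybe".
assert cache.has_entity_changed("@bob:example.com", 100)
```
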
25 changes: 2 additions & 23 deletions synapse/replication/slave/storage/devices.py
@@ -20,7 +20,6 @@
 from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
 from synapse.storage.databases.main.devices import DeviceWorkerStore
 from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache

 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -33,8 +32,6 @@ def __init__(
         db_conn: LoggingDatabaseConnection,
         hs: "HomeServer",
     ):
-        super().__init__(database, db_conn, hs)
-
         self.hs = hs

         self._device_list_id_gen = SlavedIdTracker(
@@ -47,26 +44,8 @@ def __init__(
                 ("device_lists_changes_in_room", "stream_id"),
             ],
         )
-        device_list_max = self._device_list_id_gen.get_current_token()
-        device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict(
-            db_conn,
-            "device_lists_stream",
-            entity_column="user_id",
-            stream_column="stream_id",
-            max_value=device_list_max,
-            limit=1000,
-        )
-        self._device_list_stream_cache = StreamChangeCache(
-            "DeviceListStreamChangeCache",
-            min_device_list_id,
-            prefilled_cache=device_list_prefill,
-        )
-        self._user_signature_stream_cache = StreamChangeCache(
-            "UserSignatureStreamChangeCache", device_list_max
-        )
-        self._device_list_federation_stream_cache = StreamChangeCache(
-            "DeviceListFederationStreamChangeCache", device_list_max
-        )
+
+        super().__init__(database, db_conn, hs)

     def get_device_stream_token(self) -> int:
         return self._device_list_id_gen.get_current_token()
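
Note that the `super().__init__` call moves from the top of the constructor to the bottom: the prefill logic now lives in the parent `DeviceWorkerStore.__init__` (see `synapse/storage/databases/main/devices.py` below), which calls `self._device_list_id_gen.get_current_token()`, so the `SlavedIdTracker` must exist before the parent constructor runs.
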
43 changes: 27 additions & 16 deletions synapse/storage/database.py
@@ -2030,29 +2030,40 @@ def get_cache_dict(
         max_value: int,
         limit: int = 100000,
     ) -> Tuple[Dict[Any, int], int]:
-        # Fetch a mapping of room_id -> max stream position for "recent" rooms.
-        # It doesn't really matter how many we get, the StreamChangeCache will
-        # do the right thing to ensure it respects the max size of cache.
-        sql = (
-            "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
-            " WHERE %(stream)s > ? - %(limit)s"
-            " GROUP BY %(entity)s"
-        ) % {
-            "table": table,
-            "entity": entity_column,
-            "stream": stream_column,
-            "limit": limit,
-        }
+        """Gets roughly the last N changes in the given stream table as a
+        map from entity to the stream ID of the most recent change.
+
+        Also returns the minimum stream ID.
+        """
+
+        # This may return many rows for the same entity, but the `limit` is only
+        # a suggestion so we don't care that much.
+        #
+        # Note: Some stream tables can have multiple rows with the same stream
+        # ID. Instead of handling this with complicated SQL, we instead simply
+        # add one to the returned minimum stream ID to ensure correctness.
+        sql = f"""
+            SELECT {entity_column}, {stream_column}
+            FROM {table}
+            ORDER BY {stream_column} DESC
+            LIMIT ?
+        """

         txn = db_conn.cursor(txn_name="get_cache_dict")
-        txn.execute(sql, (int(max_value),))
+        txn.execute(sql, (limit,))

-        cache = {row[0]: int(row[1]) for row in txn}
+        # The rows come out in reverse stream ID order, so we want to keep the
+        # stream ID of the first row for each entity.
+        cache: Dict[Any, int] = {}
+        for row in txn:
+            cache.setdefault(row[0], int(row[1]))

         txn.close()

         if cache:
-            min_val = min(cache.values())
+            # We add one here as we don't know if we have all rows for the
+            # minimum stream ID.
+            min_val = min(cache.values()) + 1
         else:
             min_val = max_value

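
The new query replaces a `GROUP BY` over every row newer than `max_value - limit` with a simple "newest `limit` rows" scan, deduplicated in Python. A self-contained sketch of the same logic (using `sqlite3` in place of Synapse's `DatabasePool`, with hypothetical table contents) shows why the returned minimum is bumped by one:

```python
import sqlite3
from typing import Any, Dict, Tuple


def get_cache_dict_sketch(
    conn: sqlite3.Connection,
    table: str,
    entity_column: str,
    stream_column: str,
    max_value: int,
    limit: int,
) -> Tuple[Dict[Any, int], int]:
    # Same shape as the new query above: newest rows first, with LIMIT as
    # a soft cap on how much history we prefill.
    sql = f"""
        SELECT {entity_column}, {stream_column}
        FROM {table}
        ORDER BY {stream_column} DESC
        LIMIT ?
    """
    cache: Dict[Any, int] = {}
    for entity, stream_id in conn.execute(sql, (limit,)):
        # Rows arrive in descending stream order, so the first row seen
        # for an entity is its most recent change.
        cache.setdefault(entity, int(stream_id))
    # LIMIT may have cut off other rows sharing the minimum stream ID, so
    # only claim complete knowledge from one position above it.
    min_val = min(cache.values()) + 1 if cache else max_value
    return cache, min_val


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE device_lists_stream (user_id TEXT, stream_id BIGINT)")
conn.executemany(
    "INSERT INTO device_lists_stream VALUES (?, ?)",
    # Stream ID 7 has two rows; LIMIT 2 will only see one of them.
    [("@a:x", 9), ("@b:x", 7), ("@c:x", 7)],
)
cache, min_val = get_cache_dict_sketch(
    conn, "device_lists_stream", "user_id", "stream_id", max_value=9, limit=2
)
assert cache in ({"@a:x": 9, "@b:x": 7}, {"@a:x": 9, "@c:x": 7})
# min(7) + 1 = 8: at stream ID 7 we might be missing the other row, so
# the cache only claims complete knowledge from 8 onwards.
assert min_val == 8
```
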
21 changes: 0 additions & 21 deletions synapse/storage/databases/main/__init__.py
@@ -183,27 +183,6 @@ def __init__(

         super().__init__(database, db_conn, hs)

-        device_list_max = self._device_list_id_gen.get_current_token()
-        device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict(
-            db_conn,
-            "device_lists_stream",
-            entity_column="user_id",
-            stream_column="stream_id",
-            max_value=device_list_max,
-            limit=1000,
-        )
-        self._device_list_stream_cache = StreamChangeCache(
-            "DeviceListStreamChangeCache",
-            min_device_list_id,
-            prefilled_cache=device_list_prefill,
-        )
-        self._user_signature_stream_cache = StreamChangeCache(
-            "UserSignatureStreamChangeCache", device_list_max
-        )
-        self._device_list_federation_stream_cache = StreamChangeCache(
-            "DeviceListFederationStreamChangeCache", device_list_max
-        )
-
         events_max = self._stream_id_gen.get_current_token()
         curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
             db_conn,
50 changes: 50 additions & 0 deletions synapse/storage/databases/main/devices.py
@@ -46,6 +46,7 @@
 from synapse.util import json_decoder, json_encoder
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.lrucache import LruCache
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.iterutils import batch_iter
 from synapse.util.stringutils import shortstr

@@ -71,6 +72,55 @@ def __init__(
     ):
         super().__init__(database, db_conn, hs)

+        device_list_max = self._device_list_id_gen.get_current_token()
+        device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict(
+            db_conn,
+            "device_lists_stream",
+            entity_column="user_id",
+            stream_column="stream_id",
+            max_value=device_list_max,
+            limit=10000,
+        )
+        self._device_list_stream_cache = StreamChangeCache(
+            "DeviceListStreamChangeCache",
+            min_device_list_id,
+            prefilled_cache=device_list_prefill,
+        )
+
+        (
+            user_signature_stream_prefill,
+            user_signature_stream_list_id,
+        ) = self.db_pool.get_cache_dict(
+            db_conn,
+            "user_signature_stream",
+            entity_column="from_user_id",
+            stream_column="stream_id",
+            max_value=device_list_max,
+            limit=1000,
+        )
+        self._user_signature_stream_cache = StreamChangeCache(
+            "UserSignatureStreamChangeCache",
+            user_signature_stream_list_id,
+            prefilled_cache=user_signature_stream_prefill,
+        )
+
+        (
+            device_list_federation_prefill,
+            device_list_federation_list_id,
+        ) = self.db_pool.get_cache_dict(
+            db_conn,
+            "device_lists_outbound_pokes",
+            entity_column="destination",
+            stream_column="stream_id",
+            max_value=device_list_max,
+            limit=10000,
+        )
+        self._device_list_federation_stream_cache = StreamChangeCache(
+            "DeviceListFederationStreamChangeCache",
+            device_list_federation_list_id,
+            prefilled_cache=device_list_federation_prefill,
+        )
+
         if hs.config.worker.run_background_tasks:
             self._clock.looping_call(
                 self._prune_old_outbound_device_pokes, 60 * 60 * 1000
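
With the prefill moved into `DeviceWorkerStore` (shared by the main data store and the slaved store above), all three device-related caches now start warm, each from the table it actually tracks: device list changes per user from `device_lists_stream`, cross-signing signature uploads per user from `user_signature_stream`, and per-destination federation pokes from `device_lists_outbound_pokes`. Previously only the first was prefilled; the other two started empty at the current token and so answered pessimistically for every position below it.
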
13 changes: 12 additions & 1 deletion synapse/storage/databases/main/receipts.py
@@ -98,8 +98,19 @@ def __init__(

         super().__init__(database, db_conn, hs)

+        max_receipts_stream_id = self.get_max_receipt_stream_id()
+        receipts_stream_prefill, min_receipts_stream_id = self.db_pool.get_cache_dict(
+            db_conn,
+            "receipts_linearized",
+            entity_column="room_id",
+            stream_column="stream_id",
+            max_value=max_receipts_stream_id,
+            limit=10000,
+        )
         self._receipts_stream_cache = StreamChangeCache(
-            "ReceiptsRoomChangeCache", self.get_max_receipt_stream_id()
+            "ReceiptsRoomChangeCache",
+            min_receipts_stream_id,
+            prefilled_cache=receipts_stream_prefill,
         )

     def get_max_receipt_stream_id(self) -> int:
