diff --git a/changelog.d/6749.misc b/changelog.d/6749.misc new file mode 100644 index 000000000000..9fa13cb1d4fa --- /dev/null +++ b/changelog.d/6749.misc @@ -0,0 +1 @@ +Allow streaming cache 'invalidate all' to workers. diff --git a/docs/tcp_replication.md b/docs/tcp_replication.md index a0b1d563ff5a..e3a4634b1407 100644 --- a/docs/tcp_replication.md +++ b/docs/tcp_replication.md @@ -254,6 +254,11 @@ and they key to invalidate. For example: > RDATA caches 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251] +Alternatively, an entire cache can be invalidated by sending down a `null` +instead of the key. For example: + + > RDATA caches 550953772 ["get_user_by_id", null, 1550574873252] + However, there are times when a number of caches need to be invalidated at the same time with the same key. To reduce traffic we batch those invalidations into a single poke by defining a special cache name that diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index 704282c8008c..f45cbd37a0f5 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -66,11 +66,16 @@ def process_replication_rows(self, stream_name, token, rows): self._cache_id_gen.advance(token) for row in rows: if row.cache_func == CURRENT_STATE_CACHE_NAME: + if row.keys is None: + raise Exception( + "Can't send an 'invalidate all' for current state cache" + ) + room_id = row.keys[0] members_changed = set(row.keys[1:]) self._invalidate_state_caches(room_id, members_changed) else: - self._attempt_to_invalidate_cache(row.cache_func, tuple(row.keys)) + self._attempt_to_invalidate_cache(row.cache_func, row.keys) def _invalidate_cache_and_stream(self, txn, cache_func, keys): txn.call_after(cache_func.invalidate, keys) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index e03e77199b61..a8d568b14a86 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -17,7 +17,9 @@ import itertools import logging from collections import namedtuple -from typing import Any +from typing import Any, List, Optional + +import attr logger = logging.getLogger(__name__) @@ -65,10 +67,24 @@ "PushersStreamRow", ("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool ) -CachesStreamRow = namedtuple( - "CachesStreamRow", - ("cache_func", "keys", "invalidation_ts"), # str # list(str) # int -) + + +@attr.s +class CachesStreamRow: + """Stream to inform workers they should invalidate their cache. + + Attributes: + cache_func: Name of the cached function. + keys: The entry in the cache to invalidate. If None then will + invalidate all. + invalidation_ts: Timestamp of when the invalidation took place. + """ + + cache_func = attr.ib(type=str) + keys = attr.ib(type=Optional[List[Any]]) + invalidation_ts = attr.ib(type=int) + + PublicRoomsStreamRow = namedtuple( "PublicRoomsStreamRow", ( diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 3bb938166349..da3b99f93dc9 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -17,6 +17,7 @@ import logging import random from abc import ABCMeta +from typing import Any, Optional from six import PY2 from six.moves import builtins @@ -26,7 +27,7 @@ from synapse.storage.database import LoggingTransaction # noqa: F401 from synapse.storage.database import make_in_list_sql_clause # noqa: F401 from synapse.storage.database import Database -from synapse.types import get_domain_from_id +from synapse.types import Collection, get_domain_from_id logger = logging.getLogger(__name__) @@ -63,17 +64,24 @@ def _invalidate_state_caches(self, room_id, members_changed): self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,)) - def _attempt_to_invalidate_cache(self, cache_name, key): + def _attempt_to_invalidate_cache( + self, cache_name: str, key: Optional[Collection[Any]] + ): """Attempts to invalidate the cache of the given name, ignoring if the cache doesn't exist. Mainly used for invalidating caches on workers, where they may not have the cache. Args: - cache_name (str) - key (tuple) + cache_name + key: Entry to invalidate. If None then invalidates the entire + cache. """ + try: - getattr(self, cache_name).invalidate(key) + if key is None: + getattr(self, cache_name).invalidate_all() + else: + getattr(self, cache_name).invalidate(tuple(key)) except AttributeError: # We probably haven't pulled in the cache in this worker, # which is fine. diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py index bf91512daf99..afa2b41c989a 100644 --- a/synapse/storage/data_stores/main/cache.py +++ b/synapse/storage/data_stores/main/cache.py @@ -16,6 +16,7 @@ import itertools import logging +from typing import Any, Iterable, Optional from twisted.internet import defer @@ -43,6 +44,14 @@ def _invalidate_cache_and_stream(self, txn, cache_func, keys): txn.call_after(cache_func.invalidate, keys) self._send_invalidation_to_replication(txn, cache_func.__name__, keys) + def _invalidate_all_cache_and_stream(self, txn, cache_func): + """Invalidates the entire cache and adds it to the cache stream so slaves + will know to invalidate their caches. + """ + + txn.call_after(cache_func.invalidate_all) + self._send_invalidation_to_replication(txn, cache_func.__name__, None) + def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed): """Special case invalidation of caches based on current state. @@ -73,17 +82,24 @@ def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed): txn, CURRENT_STATE_CACHE_NAME, [room_id] ) - def _send_invalidation_to_replication(self, txn, cache_name, keys): + def _send_invalidation_to_replication( + self, txn, cache_name: str, keys: Optional[Iterable[Any]] + ): """Notifies replication that given cache has been invalidated. Note that this does *not* invalidate the cache locally. Args: txn - cache_name (str) - keys (iterable[str]) + cache_name + keys: Entry to invalidate. If None will invalidate all. """ + if cache_name == CURRENT_STATE_CACHE_NAME and keys is None: + raise Exception( + "Can't stream invalidate all with magic current state cache" + ) + if isinstance(self.database_engine, PostgresEngine): # get_next() returns a context manager which is designed to wrap # the transaction. However, we want to only get an ID when we want @@ -95,13 +111,16 @@ def _send_invalidation_to_replication(self, txn, cache_name, keys): txn.call_after(ctx.__exit__, None, None, None) txn.call_after(self.hs.get_notifier().on_new_replication_data) + if keys is not None: + keys = list(keys) + self.db.simple_insert_txn( txn, table="cache_invalidation_stream", values={ "stream_id": stream_id, "cache_func": cache_name, - "keys": list(keys), + "keys": keys, "invalidation_ts": self.clock.time_msec(), }, )