Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add bulk lookup API to DeferredCache.
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Aug 23, 2022
1 parent 0e237d1 commit 90c76f0
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 32 deletions.
75 changes: 70 additions & 5 deletions synapse/util/caches/deferred_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@
import enum
import threading
from typing import (
Any,
Callable,
Collection,
Dict,
Generic,
Iterable,
MutableMapping,
Optional,
Set,
Sized,
Tuple,
TypeVar,
Union,
cast,
Expand All @@ -38,10 +37,9 @@
from twisted.internet import defer
from twisted.python.failure import Failure

from synapse.logging.context import PreserveLoggingContext
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
from synapse.util.caches.treecache import TreeCache

cache_pending_metric = Gauge(
"synapse_util_caches_cache_pending",
Expand Down Expand Up @@ -99,7 +97,7 @@ def __init__(

# _pending_deferred_cache maps from the key value to a `CacheEntry` object.
self._pending_deferred_cache: Union[
TreeCache, "MutableMapping[KT, CacheEntry]"
TreeCache, "MutableMapping[KT, CacheEntry[KT, VT]]"
] = cache_type()

def metrics_cb() -> None:
Expand Down Expand Up @@ -183,6 +181,73 @@ def get(
else:
return defer.succeed(val2)

def get_bulk(
    self,
    keys: Collection[KT],
    callback: Optional[Callable[[], None]] = None,
) -> Tuple[Dict[KT, VT], Optional["defer.Deferred[Dict[KT, VT]]"], Collection[KT]]:
    """Look up several keys in the cache in one call.

    Returns:
        A 3-tuple of:
          1. a dict of key/value of items already cached;
          2. a deferred that resolves to a dict of key/value of items
             we're already fetching; and
          3. a collection of keys that don't appear in the previous two.
    """

    callbacks = (callback,) if callback else ()

    # Values that were already sitting in the completed-values cache.
    hits: Dict[KT, VT] = {}

    # One deferred per key whose fetch is already in flight.
    in_flight = []

    # Filled in by `record_result` as each in-flight fetch completes.
    in_flight_results: Dict[KT, VT] = {}

    # Keys found in neither cache.
    absent = []

    def record_result(value: VT, key: KT) -> VT:
        # Stash the resolved value so the squashed deferred below can
        # hand back a single dict; pass the value through unchanged so
        # other observers of the deferred still see it.
        in_flight_results[key] = value
        return value

    for key in keys:
        # First, try the main cache of completed values.
        hit = self.cache.get(
            key,
            _Sentinel.sentinel,
            callbacks=callbacks,
        )
        if hit is not _Sentinel.sentinel:
            hits[key] = hit
            continue

        # Next, see whether a fetch for this key is already pending.
        entry = self._pending_deferred_cache.get(key, _Sentinel.sentinel)
        if entry is not _Sentinel.sentinel:
            entry.add_callback(key, callback)

            # Arrange for `in_flight_results` to be filled out when the
            # pending fetch completes.
            d = entry.deferred(key).addCallback(record_result, key)
            in_flight.append(d)
            continue

        # The key is in neither cache.
        absent.append(key)

    # Squash any in-flight deferreds into a single one that fires with
    # the combined `in_flight_results` dict.
    pending_deferred = None
    if in_flight:
        pending_deferred = defer.gatherResults(
            in_flight, consumeErrors=True
        ).addCallback(lambda _: in_flight_results)

    return (hits, pending_deferred, absent)

def get_immediate(
self, key: KT, default: T, update_metrics: bool = True
) -> Union[VT, T]:
Expand Down
59 changes: 32 additions & 27 deletions synapse/util/caches/descriptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
Generic,
Hashable,
Iterable,
List,
Mapping,
Optional,
Sequence,
Expand Down Expand Up @@ -435,60 +436,64 @@ def wrapped(*args: Any, **kwargs: Any) -> "defer.Deferred[Dict]":
keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names]
list_args = arg_dict[self.list_name]

results = {}

def update_results_dict(res: Any, arg: Hashable) -> None:
results[arg] = res

# list of deferreds to wait for
cached_defers = []

missing = set()

# If the cache takes a single arg then that is used as the key,
# otherwise a tuple is used.
if num_args == 1:

def arg_to_cache_key(arg: Hashable) -> Hashable:
return arg

def cache_key_to_arg(key: tuple) -> Hashable:
return key

else:
keylist = list(keyargs)

def arg_to_cache_key(arg: Hashable) -> Hashable:
keylist[self.list_pos] = arg
return tuple(keylist)

for arg in list_args:
try:
res = cache.get(arg_to_cache_key(arg), callback=invalidate_callback)
if not res.called:
res.addCallback(update_results_dict, arg)
cached_defers.append(res)
else:
results[arg] = res.result
except KeyError:
missing.add(arg)
def cache_key_to_arg(key: tuple) -> Hashable:
return key[self.list_pos]

cache_keys = [arg_to_cache_key(arg) for arg in list_args]
immediate_results, pending_deferred, missing = cache.get_bulk(
cache_keys, callback=invalidate_callback
)

results = {cache_key_to_arg(key): v for key, v in immediate_results.items()}

cached_defers: List["defer.Deferred[Any]"] = []
if pending_deferred:

def update_results(r: Dict) -> None:
for k, v in r.items():
results[cache_key_to_arg(k)] = v

pending_deferred.addCallback(update_results)
cached_defers.append(pending_deferred)

if missing:
cache_keys = [arg_to_cache_key(key) for key in missing]
cache_entry = cache.set_bulk(cache_keys, callback=invalidate_callback)
cache_entry = cache.set_bulk(missing, invalidate_callback)

def complete_all(res: Dict[Hashable, Any]) -> None:
missing_results = {}
for key in missing:
val = res.get(key, None)
arg = cache_key_to_arg(key)
val = res.get(arg, None)

results[key] = val
missing_results[arg_to_cache_key(key)] = val
results[arg] = val
missing_results[key] = val

cache_entry.complete_bulk(cache, missing_results)

def errback_all(f: Failure) -> None:
cache_entry.error_bulk(cache, cache_keys, f)
cache_entry.error_bulk(cache, missing, f)

args_to_call = dict(arg_dict)
args_to_call[self.list_name] = missing
args_to_call[self.list_name] = {
cache_key_to_arg(key) for key in missing
}

# dispatch the call, and attach the two handlers
missing_d = defer.maybeDeferred(
Expand Down

0 comments on commit 90c76f0

Please sign in to comment.