Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add DeferredCache.get_immediate method (#8568)
Browse files Browse the repository at this point in the history
* Add `DeferredCache.get_immediate` method

A bunch of things that are currently calling `DeferredCache.get` are only
really interested in the result if it's completed. We can optimise and simplify
this case.

* Remove unused 'default' parameter to DeferredCache.get()

* another get_immediate instance
  • Loading branch information
richvdh authored Oct 19, 2020
1 parent c356b4b commit 903d11c
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 27 deletions.
1 change: 1 addition & 0 deletions changelog.d/8568.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `get_immediate` method to `DeferredCache`.
2 changes: 1 addition & 1 deletion synapse/push/bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,6 @@ class _Invalidation(namedtuple("_Invalidation", ("cache", "room_id"))):
# dedupe when we add callbacks to lru cache nodes, otherwise the number
# of callbacks would grow.
def __call__(self):
rules = self.cache.get(self.room_id, None, update_metrics=False)
rules = self.cache.get_immediate(self.room_id, None, update_metrics=False)
if rules:
rules.invalidate_all()
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ async def add_pusher(
lock=False,
)

user_has_pusher = self.get_if_user_has_pusher.cache.get(
user_has_pusher = self.get_if_user_has_pusher.cache.get_immediate(
(user_id,), None, update_metrics=False
)

Expand Down
11 changes: 1 addition & 10 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache

Expand Down Expand Up @@ -413,18 +412,10 @@ def _invalidate_get_users_with_receipts_in_room(
if receipt_type != "m.read":
return

# Returns either an ObservableDeferred or the raw result
res = self.get_users_with_read_receipts_in_room.cache.get(
res = self.get_users_with_read_receipts_in_room.cache.get_immediate(
room_id, None, update_metrics=False
)

# first handle the ObservableDeferred case
if isinstance(res, ObservableDeferred):
if res.has_called():
res = res.get_result()
else:
res = None

if res and user_id in res:
# We'd only be adding to the set, so no point invalidating if the
# user is already there
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ async def _get_joined_users_from_context(
# If we do then we can reuse that result and simply update it with
# any membership changes in `delta_ids`
if context.prev_group and context.delta_ids:
prev_res = self._get_joined_users_from_context.cache.get(
prev_res = self._get_joined_users_from_context.cache.get_immediate(
(room_id, context.prev_group), None
)
if prev_res and isinstance(prev_res, dict):
Expand Down
35 changes: 25 additions & 10 deletions synapse/util/caches/deferred_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@

import enum
import threading
from typing import Callable, Generic, Iterable, MutableMapping, Optional, TypeVar, cast
from typing import (
Callable,
Generic,
Iterable,
MutableMapping,
Optional,
TypeVar,
Union,
cast,
)

from prometheus_client import Gauge

Expand All @@ -33,7 +42,7 @@
["name"],
)


T = TypeVar("T")
KT = TypeVar("KT")
VT = TypeVar("VT")

Expand Down Expand Up @@ -119,21 +128,21 @@ def check_thread(self):
def get(
self,
key: KT,
default=_Sentinel.sentinel,
callback: Optional[Callable[[], None]] = None,
update_metrics: bool = True,
):
) -> Union[ObservableDeferred, VT]:
"""Looks the key up in the caches.
Args:
key(tuple)
default: What is returned if key is not in the caches. If not
specified then function throws KeyError instead
callback(fn): Gets called when the entry in the cache is invalidated
update_metrics (bool): whether to update the cache hit rate metrics
Returns:
Either an ObservableDeferred or the result itself
Raises:
KeyError if the key is not found in the cache
"""
callbacks = [callback] if callback else []
val = self._pending_deferred_cache.get(key, _Sentinel.sentinel)
Expand All @@ -145,13 +154,19 @@ def get(
m.inc_hits()
return val.deferred

val = self.cache.get(
key, default, callbacks=callbacks, update_metrics=update_metrics
val2 = self.cache.get(
key, _Sentinel.sentinel, callbacks=callbacks, update_metrics=update_metrics
)
if val is _Sentinel.sentinel:
if val2 is _Sentinel.sentinel:
raise KeyError()
else:
return val
return val2

def get_immediate(
self, key: KT, default: T, update_metrics: bool = True
) -> Union[VT, T]:
"""If we have a *completed* cached value, return it."""
return self.cache.get(key, default, update_metrics=update_metrics)

def set(
self,
Expand Down
27 changes: 23 additions & 4 deletions tests/util/caches/test_deferred_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ def test_hit(self):

self.assertEquals(cache.get("foo"), 123)

def test_get_immediate(self):
cache = DeferredCache("test")
d1 = defer.Deferred()
cache.set("key1", d1)

# get_immediate should return default
v = cache.get_immediate("key1", 1)
self.assertEqual(v, 1)

# now complete the set
d1.callback(2)

# get_immediate should return result
v = cache.get_immediate("key1", 1)
self.assertEqual(v, 2)

def test_invalidate(self):
cache = DeferredCache("test")
cache.prefill(("foo",), 123)
Expand Down Expand Up @@ -80,17 +96,20 @@ def record_callback(idx):
# now do the invalidation
cache.invalidate_all()

# lookup should return none
self.assertIsNone(cache.get("key1", None))
self.assertIsNone(cache.get("key2", None))
# lookup should fail
with self.assertRaises(KeyError):
cache.get("key1")
with self.assertRaises(KeyError):
cache.get("key2")

# both callbacks should have been callbacked
self.assertTrue(callback_record[0], "Invalidation callback for key1 not called")
self.assertTrue(callback_record[1], "Invalidation callback for key2 not called")

# letting the other lookup complete should do nothing
d1.callback("result1")
self.assertIsNone(cache.get("key1", None))
with self.assertRaises(KeyError):
cache.get("key1", None)

def test_eviction(self):
cache = DeferredCache(
Expand Down

0 comments on commit 903d11c

Please sign in to comment.