Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add support for multiple entities per stream id
Browse files Browse the repository at this point in the history
  • Loading branch information
richvdh committed Apr 17, 2020
1 parent 5a2efc0 commit fa7bb6b
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 46 deletions.
82 changes: 44 additions & 38 deletions synapse/util/caches/stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.

import logging
from typing import Collection, Dict, Iterable, Mapping, Optional, Set
from typing import Dict, Iterable, List, Mapping, Optional, Set

from six import integer_types

Expand Down Expand Up @@ -48,8 +48,8 @@ def __init__(
self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
self._entity_to_key = {} # type: Dict[EntityType, int]

# map from stream id to the entity which changed at that stream id.
self._cache = SortedDict() # type: SortedDict[int, EntityType]
# map from stream id to the a set of entities which changed at that stream id.
self._cache = SortedDict() # type: SortedDict[int, Set[EntityType]]

# the earliest stream_pos for which we can reliably answer
# get_all_entities_changed. In other words, one less than the earliest
Expand Down Expand Up @@ -92,16 +92,9 @@ def get_entities_changed(
position. Entities unknown to the cache will be returned. If the
position is too old it will just return the given list.
"""
assert type(stream_pos) is int

if stream_pos >= self._earliest_known_stream_pos:
changed_entities = {
self._cache[k]
for k in self._cache.islice(start=self._cache.bisect_right(stream_pos))
}

result = changed_entities.intersection(entities)

changed_entities = self.get_all_entities_changed(stream_pos)
if changed_entities is not None:
result = set(changed_entities).intersection(entities)
self.metrics.inc_hits()
else:
result = set(entities)
Expand All @@ -115,7 +108,7 @@ def has_any_entity_changed(self, stream_pos: int) -> bool:
assert type(stream_pos) is int

if not self._cache:
# If we have no cache, nothing can have changed.
# If the cache is empty, nothing can have changed.
return False

if stream_pos >= self._earliest_known_stream_pos:
Expand All @@ -125,42 +118,55 @@ def has_any_entity_changed(self, stream_pos: int) -> bool:
self.metrics.inc_misses()
return True

def get_all_entities_changed(
self, stream_pos: int
) -> Optional[Collection[EntityType]]:
"""Returns all entites that have had new things since the given
def get_all_entities_changed(self, stream_pos: int) -> Optional[List[EntityType]]:
"""Returns all entities that have had new things since the given
position. If the position is too old it will return None.
Returns the entities in the order that they were changed.
"""
assert type(stream_pos) is int

if stream_pos >= self._earliest_known_stream_pos:
return [
self._cache[k]
for k in self._cache.islice(start=self._cache.bisect_right(stream_pos))
]
else:
if stream_pos < self._earliest_known_stream_pos:
return None

changed_entities = [] # type: List[EntityType]

for k in self._cache.islice(start=self._cache.bisect_right(stream_pos)):
changed_entities.extend(self._cache[k])
return changed_entities

def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
"""Informs the cache that the entity has been changed at the given
position.
"""
assert type(stream_pos) is int

if stream_pos > self._earliest_known_stream_pos:
old_pos = self._entity_to_key.get(entity, None)
if old_pos is not None:
stream_pos = max(stream_pos, old_pos)
self._cache.pop(old_pos, None)
self._cache[stream_pos] = entity
self._entity_to_key[entity] = stream_pos

while len(self._cache) > self._max_size:
k, r = self._cache.popitem(0)
self._earliest_known_stream_pos = max(
k, self._earliest_known_stream_pos
)
self._entity_to_key.pop(r, None)
if stream_pos <= self._earliest_known_stream_pos:
return

old_pos = self._entity_to_key.get(entity, None)
if old_pos is not None:
if old_pos >= stream_pos:
# nothing to do
return
e = self._cache[old_pos]
e.remove(entity)
if not e:
# cache at this point is now empty
del self._cache[old_pos]

e1 = self._cache.get(stream_pos)
if e1 is None:
e1 = self._cache[stream_pos] = set()
e1.add(entity)
self._entity_to_key[entity] = stream_pos

# if the cache is too big, remove entries
while len(self._cache) > self._max_size:
k, r = self._cache.popitem(0)
self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
for entity in r:
del self._entity_to_key[entity]

def get_max_pos_of_last_change(self, entity: EntityType) -> int:
"""Returns an upper bound of the stream id of the last change to an
Expand Down
58 changes: 50 additions & 8 deletions tests/util/test_stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,26 @@ def test_has_entity_changed(self):
cache.entity_has_changed("user@foo.com", 6)
cache.entity_has_changed("bar@baz.net", 7)

# also test multiple things changing on the same stream ID
cache.entity_has_changed("user2@foo.com", 8)
cache.entity_has_changed("bar2@baz.net", 8)

# If it's been changed after that stream position, return True
self.assertTrue(cache.has_entity_changed("user@foo.com", 4))
self.assertTrue(cache.has_entity_changed("bar@baz.net", 4))
self.assertTrue(cache.has_entity_changed("bar2@baz.net", 4))
self.assertTrue(cache.has_entity_changed("user2@foo.com", 4))

# If it's been changed at that stream position, return False
self.assertFalse(cache.has_entity_changed("user@foo.com", 6))
self.assertFalse(cache.has_entity_changed("user2@foo.com", 8))

# If there's no changes after that stream position, return False
self.assertFalse(cache.has_entity_changed("user@foo.com", 7))
self.assertFalse(cache.has_entity_changed("user2@foo.com", 9))

# If the entity does not exist, return False.
self.assertFalse(cache.has_entity_changed("not@here.website", 7))
self.assertFalse(cache.has_entity_changed("not@here.website", 9))

# If we request before the stream cache's earliest known position,
# return True, whether it's a known entity or not.
Expand Down Expand Up @@ -89,18 +97,52 @@ def test_get_all_entities_changed(self):

cache.entity_has_changed("user@foo.com", 2)
cache.entity_has_changed("bar@baz.net", 3)
cache.entity_has_changed("anotheruser@foo.com", 3)
cache.entity_has_changed("user@elsewhere.org", 4)

self.assertEqual(
cache.get_all_entities_changed(1),
["user@foo.com", "bar@baz.net", "user@elsewhere.org"],
)
self.assertEqual(
cache.get_all_entities_changed(2), ["bar@baz.net", "user@elsewhere.org"]
)
r = cache.get_all_entities_changed(1)

# either of these are valid
ok1 = [
"user@foo.com",
"bar@baz.net",
"anotheruser@foo.com",
"user@elsewhere.org",
]
ok2 = [
"user@foo.com",
"anotheruser@foo.com",
"bar@baz.net",
"user@elsewhere.org",
]
self.assertTrue(r == ok1 or r == ok2)

r = cache.get_all_entities_changed(2)
self.assertTrue(r == ok1[1:] or r == ok2[1:])

self.assertEqual(cache.get_all_entities_changed(3), ["user@elsewhere.org"])
self.assertEqual(cache.get_all_entities_changed(0), None)

# ... later, things gest more updates
cache.entity_has_changed("user@foo.com", 5)
cache.entity_has_changed("bar@baz.net", 5)
cache.entity_has_changed("anotheruser@foo.com", 6)

ok1 = [
"user@elsewhere.org",
"user@foo.com",
"bar@baz.net",
"anotheruser@foo.com",
]
ok2 = [
"user@elsewhere.org",
"bar@baz.net",
"user@foo.com",
"anotheruser@foo.com",
]
r = cache.get_all_entities_changed(3)
self.assertTrue(r == ok1 or r == ok2)

def test_has_any_entity_changed(self):
"""
StreamChangeCache.has_any_entity_changed will return True if any
Expand Down

0 comments on commit fa7bb6b

Please sign in to comment.