Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Consistently compare to the earliest known stream position in the stream change cache #14435

Merged
merged 16 commits into from
Dec 5, 2022
Merged
1 change: 1 addition & 0 deletions changelog.d/14435.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix.
40 changes: 28 additions & 12 deletions synapse/util/caches/stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ def __init__(
) -> None:
self._original_max_size: int = max_size
self._max_size = math.floor(max_size)
self._entity_to_key: Dict[EntityType, int] = {}

# map from stream id to the a set of entities which changed at that stream id.
# map from stream id to the set of entities which changed at that stream id.
self._cache: SortedDict[int, Set[EntityType]] = SortedDict()
# map from entity to the stream ID of the latest change for that entity.
#
# Must be kept in sync with _cache.
self._entity_to_key: Dict[EntityType, int] = {}

# the earliest stream_pos for which we can reliably answer
# get_all_entities_changed. In other words, one less than the earliest
Expand Down Expand Up @@ -85,35 +88,44 @@ def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool:
"""Returns True if the entity may have been updated since stream_pos"""
clokep marked this conversation as resolved.
Show resolved Hide resolved
assert isinstance(stream_pos, int)

if stream_pos < self._earliest_known_stream_pos:
# _cache is not valid at or before the earliest known stream position, so
# return that the entity has changed.
if stream_pos <= self._earliest_known_stream_pos:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

has_entity_changed is called from the following places:

  • AccountDataWorkerStore.get_updated_account_data_for_user, which seems to be used to build account data in /sync responses.
  • DeviceInboxWorkerStore:
    • _get_device_messages: feeds into to-device updates in /sync and app services.
    • delete_messages_for_device: used to pause /sync in some situation? (Unclear...?)
    • get_new_device_msgs_for_remote: used when calculating EDUs to send in federation transactions.
  • DeviceWorkerStore:
    • get_device_updates_by_remote: used when calculating EDUs to send in federation transactions.
    • get_users_whose_signatures_changed: used in the device list entry for /sync.
  • EventPushActionsWorkerStore._get_notif_unread_count_for_user_room, which is used to generate notification counts in /sync and to update event_push_summary.
  • PushRulesWorkerStore.have_push_rules_changed_for_user, which is used to build push rules in /sync responses.
  • ReceiptsWorkerStore.get_linearized_receipts_for_room, which is used build receipts data in /initial_sync.
  • The StreamWorkerStore uses it in a few spots:
    • get_rooms_that_changed: feeds into the /keys/changes response.
    • get_room_events_stream_for_room: used to get the events that have changed in a room for /sync response.
    • get_membership_changes_for_user: feeds into the /keys/changes response. generating membership changes in /sync response, and whatever uses RoomEventSource.get_new_events.
  • TagsWorkerStore.get_updated_tags: used to generate data in a /sync response.

self.metrics.inc_misses()
return True

# If the entity is unknown, it hasn't changed.
latest_entity_change_pos = self._entity_to_key.get(entity, None)
if latest_entity_change_pos is None:
self.metrics.inc_hits()
return False

# This is a known entity, return true if the stream position is earlier
# than the last change.
if stream_pos < latest_entity_change_pos:
self.metrics.inc_misses()
return True

# Otherwise, the stream position is after the latest change: return false.
self.metrics.inc_hits()
return False

def get_entities_changed(
self, entities: Collection[EntityType], stream_pos: int
) -> Union[Set[EntityType], FrozenSet[EntityType]]:
"""
Returns subset of entities that have had new things since the given
position. Entities unknown to the cache will be returned. If the
position is too old it will just return the given list.
Returns the subset of given entities that have had changes since the given
position.

Entities unknown to the cache will be returned.

If the position is too old it will just return the given list.
"""
changed_entities = self.get_all_entities_changed(stream_pos)
if changed_entities is not None:
# We now do an intersection, trying to do so in the most efficient
# way possible (some of these sets are *large*). First check in the
# given iterable is already set that we can reuse, otherwise we
# given iterable is already a set that we can reuse, otherwise we
# create a set of the *smallest* of the two iterables and call
# `intersection(..)` on it (this can be twice as fast as the reverse).
if isinstance(entities, (set, frozenset)):
Expand All @@ -130,8 +142,8 @@ def get_entities_changed(
return result

def has_any_entity_changed(self, stream_pos: int) -> bool:
"""Returns if any entity has changed"""
assert type(stream_pos) is int
"""Returns true if any entity has changed"""
assert isinstance(stream_pos, int)

if not self._cache:
# If the cache is empty, nothing can have changed.
Expand All @@ -145,13 +157,15 @@ def has_any_entity_changed(self, stream_pos: int) -> bool:
return True

def get_all_entities_changed(self, stream_pos: int) -> Optional[List[EntityType]]:
"""Returns all entities that have had new things since the given
"""Returns all entities that have had changes since the given
position. If the position is too old it will return None.

Returns the entities in the order that they were changed.
"""
assert type(stream_pos) is int
assert isinstance(stream_pos, int)

# _cache is not valid before the earliest known stream position, so
# return that no known entities have changed.
if stream_pos < self._earliest_known_stream_pos:
clokep marked this conversation as resolved.
Show resolved Hide resolved
return None

Expand All @@ -165,8 +179,10 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
"""Informs the cache that the entity has been changed at the given
position.
"""
assert type(stream_pos) is int
assert isinstance(stream_pos, int)

# For a change before _cache is valid (e.g. at or before the earliest known
# stream position) there's nothing to do.
if stream_pos <= self._earliest_known_stream_pos:
return

Expand Down
2 changes: 2 additions & 0 deletions tests/util/test_stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def test_has_entity_changed(self):
# return True, whether it's a known entity or not.
self.assertTrue(cache.has_entity_changed("user@foo.com", 0))
self.assertTrue(cache.has_entity_changed("not@here.website", 0))
self.assertTrue(cache.has_entity_changed("user@foo.com", 3))
self.assertTrue(cache.has_entity_changed("not@here.website", 3))

def test_entity_has_changed_pops_off_start(self):
"""
Expand Down