diff --git a/changelog.d/14648.misc b/changelog.d/14648.misc
new file mode 100644
index 000000000000..321c6d0daf09
--- /dev/null
+++ b/changelog.d/14648.misc
@@ -0,0 +1 @@
+Fix events stream change cache and stream ID update order. Contributed by Nick @ Beeper (@fizzadar).
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index bbee02ab18f0..9c2b38df401e 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1187,7 +1187,7 @@ async def get_forward_extremities_for_room_at_stream_ordering(
         """
         # We want to make the cache more effective, so we clamp to the last
         # change before the given ordering.
-        last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)  # type: ignore[attr-defined]
+        last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)
 
         # We don't always have a full stream_to_exterm_id table, e.g. after
         # the upgrade that introduced it, so we make sure we never ask for a
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 01e935edef53..b0ee2073bab7 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -208,10 +208,7 @@ def __init__(
         # We shouldn't be running in worker mode with SQLite, but its useful
         # to support it for unit tests.
         #
-        # If this process is the writer than we need to use
-        # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
-        # updated over replication. (Multiple writers are not supported for
-        # SQLite).
+        # SQLite/StreamIdGenerator only supports a single writer instance (is_writer).
         self._stream_id_gen = StreamIdGenerator(
             db_conn,
             "events",
@@ -242,6 +239,25 @@ def __init__(
             prefilled_cache=curr_state_delta_prefill,
         )
 
+        event_cache_prefill, min_event_val = self.db_pool.get_cache_dict(
+            db_conn,
+            "events",
+            entity_column="room_id",
+            stream_column="stream_ordering",
+            max_value=events_max,
+        )
+        self._events_stream_cache = StreamChangeCache(
+            "EventsRoomStreamChangeCache",
+            min_event_val,
+            prefilled_cache=event_cache_prefill,
+            stream_id_gen=self._stream_id_gen,
+        )
+        self._membership_stream_cache = StreamChangeCache(
+            "MembershipStreamChangeCache",
+            events_max,
+            stream_id_gen=self._stream_id_gen,
+        )
+
         if hs.config.worker.run_background_tasks:
             # We periodically clean out old transaction ID mappings
             self._clock.looping_call(
@@ -298,8 +314,20 @@ def process_replication_rows(
         token: int,
         rows: Iterable[Any],
     ) -> None:
+        # Process event stream replication rows, updating both the ID generators
+        # and the stream change caches in this store, as the two are interlinked.
         if stream_name == EventsStream.NAME:
+            for row in rows:
+                data = row.data
+                self._events_stream_cache.entity_has_changed(data.room_id, token)
+                if data.type == EventTypes.Member:
+                    self._membership_stream_cache.entity_has_changed(
+                        data.state_key, token
+                    )
+            # NOTE: this must be updated *after* the stream change caches, so
+            # other threads don't see a token ahead of the caches.
             self._stream_id_gen.advance(instance_name, token)
+
         elif stream_name == BackfillStream.NAME:
             self._backfill_id_gen.advance(instance_name, -token)
 
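The core of this change is the ordering inside `process_replication_rows`: the stream change caches are updated *before* `self._stream_id_gen.advance(...)`, so any reader that observes the new token is guaranteed to also observe the matching cache entries. A minimal sketch of that invariant, using hypothetical stand-in classes (`FakeIdGen` and `FakeCache` are illustrative only, not Synapse APIs):

```python
from typing import Dict


class FakeIdGen:
    """Stand-in for StreamIdGenerator: tracks the highest token seen."""

    def __init__(self) -> None:
        self._current = 0

    def advance(self, token: int) -> None:
        self._current = max(self._current, token)

    def get_current_token(self) -> int:
        return self._current


class FakeCache:
    """Stand-in for StreamChangeCache: maps entity -> last change position."""

    def __init__(self) -> None:
        self._changes: Dict[str, int] = {}

    def entity_has_changed(self, entity: str, pos: int) -> None:
        self._changes[entity] = pos

    def has_entity_changed(self, entity: str, pos: int) -> bool:
        # True if the entity may have changed since `pos`.
        return self._changes.get(entity, 0) > pos


def on_replication_row(
    cache: FakeCache, id_gen: FakeIdGen, room_id: str, token: int
) -> None:
    # Correct order: cache first, token second. A reader that sees the new
    # token afterwards also sees the cache entry. Reversing these two lines
    # reintroduces the race this PR fixes: a concurrent reader could fetch
    # the fresh token, ask the cache "anything changed up to here?", and be
    # told "no" even though an event at that token had just arrived.
    cache.entity_has_changed(room_id, token)
    id_gen.advance(token)


# Demo: with the correct order, a change at token 7 is always visible to a
# reader that already holds the advanced token.
cache, id_gen = FakeCache(), FakeIdGen()
on_replication_row(cache, id_gen, "!room:example.org", 7)
assert cache.has_entity_changed("!room:example.org", id_gen.get_current_token() - 1)
```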
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index cc27ec38042b..a6bd5888b04c 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -71,7 +71,6 @@
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
 from synapse.types import PersistedEventPosition, RoomStreamToken
 from synapse.util.caches.descriptors import cached
-from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.cancellation import cancellable
 
 if TYPE_CHECKING:
@@ -397,23 +396,6 @@ def __init__(
         # during startup which would cause one to die.
         self._need_to_reset_federation_stream_positions = self._send_federation
 
-        events_max = self.get_room_max_stream_ordering()
-        event_cache_prefill, min_event_val = self.db_pool.get_cache_dict(
-            db_conn,
-            "events",
-            entity_column="room_id",
-            stream_column="stream_ordering",
-            max_value=events_max,
-        )
-        self._events_stream_cache = StreamChangeCache(
-            "EventsRoomStreamChangeCache",
-            min_event_val,
-            prefilled_cache=event_cache_prefill,
-        )
-        self._membership_stream_cache = StreamChangeCache(
-            "MembershipStreamChangeCache", events_max
-        )
-
         self._stream_order_on_start = self.get_room_max_stream_ordering()
         self._min_stream_order_on_start = self.get_room_min_stream_ordering()
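With the caches now constructed in `events_worker.py`, the prefill via `db_pool.get_cache_dict` sits next to the `StreamIdGenerator` they are coupled to. For context, the prefill amounts to "highest `stream_ordering` per `room_id` over a recent window of the `events` table". The sketch below approximates that shape and is an assumption about `get_cache_dict`'s behaviour, not the actual query it runs:

```python
import sqlite3
from typing import Dict, Tuple


def get_cache_dict_sketch(
    conn: sqlite3.Connection, max_value: int, window: int = 10000
) -> Tuple[Dict[str, int], int]:
    # Approximation of db_pool.get_cache_dict(db_conn, "events",
    # entity_column="room_id", stream_column="stream_ordering",
    # max_value=events_max): for recent rows, remember the highest
    # stream_ordering seen per room.
    rows = conn.execute(
        """
        SELECT room_id, MAX(stream_ordering)
        FROM events
        WHERE stream_ordering > ?
        GROUP BY room_id
        """,
        (max_value - window,),
    ).fetchall()
    cache = {room_id: pos for room_id, pos in rows}
    # The minimum value marks the cache's horizon: for positions below it
    # the cache cannot answer "has this room changed?" and callers fall
    # back to the database.
    min_val = min(cache.values(), default=max_value)
    return cache, min_val
```

The `min_event_val` returned here seeds the cache's earliest known stream position, which is also why the prefill path in `stream_change_cache.py` passes `check_pos=False`: prefilled positions are historical and necessarily at or behind the current token.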
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 165745954990..a3a16d452e59 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -19,6 +19,7 @@
 import attr
 from sortedcontainers import SortedDict
 
+from synapse.storage.util.id_generators import AbstractStreamIdTracker
 from synapse.util import caches
 
 logger = logging.getLogger(__name__)
@@ -70,6 +71,7 @@ def __init__(
         current_stream_pos: int,
         max_size: int = 10000,
         prefilled_cache: Optional[Mapping[EntityType, int]] = None,
+        stream_id_gen: Optional[AbstractStreamIdTracker] = None,
     ) -> None:
         self._original_max_size: int = max_size
         self._max_size = math.floor(max_size)
@@ -92,9 +94,11 @@ def __init__(
             "cache", self.name, self._cache, resize_callback=self.set_cache_factor
         )
 
+        self.stream_id_gen = stream_id_gen
+
         if prefilled_cache:
             for entity, stream_pos in prefilled_cache.items():
-                self.entity_has_changed(entity, stream_pos)
+                self.entity_has_changed(entity, stream_pos, check_pos=False)
 
     def set_cache_factor(self, factor: float) -> bool:
         """
@@ -256,7 +260,9 @@ def get_all_entities_changed(self, stream_pos: int) -> AllEntitiesChangedResult:
             changed_entities.extend(self._cache[k])
         return AllEntitiesChangedResult(changed_entities)
 
-    def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
+    def entity_has_changed(
+        self, entity: EntityType, stream_pos: int, check_pos: bool = True
+    ) -> None:
         """
         Informs the cache that the entity has been changed at the given
         position.
@@ -271,6 +277,13 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
         if stream_pos <= self._earliest_known_stream_pos:
             return
 
+        # Any change being flagged must be at or ahead of the current token,
+        # otherwise we have a race between the token position and the cache.
+        # NOTE: equality is allowed so that a process persisting an event can
+        # flag the cache immediately, as it cannot know the ID before generating it.
+        if check_pos and self.stream_id_gen:
+            assert stream_pos >= self.stream_id_gen.get_current_token()
+
         old_pos = self._entity_to_key.get(entity, None)
         if old_pos is not None:
             if old_pos >= stream_pos:
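The new assertion encodes the write side of the same invariant: whenever `entity_has_changed` is called with `check_pos=True`, the flagged position must be at or ahead of whatever token the tracker currently advertises. A hedged usage sketch against this PR's signature; `FakeTracker` is a hypothetical stand-in for `AbstractStreamIdTracker`, of which only `get_current_token()` is exercised here:

```python
from synapse.util.caches.stream_change_cache import StreamChangeCache


class FakeTracker:
    """Hypothetical stand-in; only get_current_token() is used by the check."""

    def __init__(self, token: int) -> None:
        self._token = token

    def get_current_token(self) -> int:
        return self._token


cache = StreamChangeCache("sketch", 1, stream_id_gen=FakeTracker(5))

# Equality passes: the persisting process flags the cache with the ID it has
# just generated, before the generator advances past it.
cache.entity_has_changed("!room:example.org", 5)

# Ahead of the token also passes.
cache.entity_has_changed("!room:example.org", 6)

# Behind the token trips the assertion: the caller learned of a change after
# the token had already moved past it, i.e. the race this PR closes.
try:
    cache.entity_has_changed("!room:example.org", 4)
except AssertionError:
    print("rejected: change position is behind the current token")

# Prefill positions are historical by definition, so that path opts out.
cache.entity_has_changed("!other:example.org", 2, check_pos=False)
```

Note that the assertion sits after the `_earliest_known_stream_pos` early return, so prefilled or already-stale positions never reach it; only live writers and replication consumers are held to the ordering guarantee.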