Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat(event cache): leaner updates when clearing/shrinking to the last chunk #4709

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crates/matrix-sdk-common/src/linked_chunk/lazy_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,11 @@ where
linked_chunk.links.replace_with(chunk_ptr);

if let Some(updates) = linked_chunk.updates.as_mut() {
// TODO: clear updates first? (see same comment in `clear`).
// Clear the previous updates: since we're about to insert a clear, they
// would be useless.
updates.clear();
updates.push(Update::Clear);

emit_new_first_chunk_updates(linked_chunk.links.first_chunk(), updates);
}

Expand Down
4 changes: 3 additions & 1 deletion crates/matrix-sdk-common/src/linked_chunk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,9 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {

// “Clear” `self.updates`.
if let Some(updates) = self.updates.as_mut() {
// TODO: Optimisation: Do we want to clear all pending `Update`s in `updates`?
// Clear the previous updates: since we're about to insert a clear, they
// would be useless.
updates.clear();
updates.push(Update::Clear);
updates.push(Update::NewItemsChunk {
previous: None,
Expand Down
18 changes: 18 additions & 0 deletions crates/matrix-sdk-common/src/linked_chunk/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ impl<Item, Gap> ObservableUpdates<Item, Gap> {
self.inner.write().unwrap().push(update);
}

/// Clear all pending updates.
pub(super) fn clear(&mut self) {
self.inner.write().unwrap().clear();
}

/// Take new updates.
///
/// Updates that have been taken will not be read again.
Expand Down Expand Up @@ -242,6 +247,19 @@ impl<Item, Gap> UpdatesInner<Item, Gap> {
}
}

/// Clear all pending updates.
fn clear(&mut self) {
self.updates.clear();

// Reset all the per-reader indices.
for idx in self.last_index_per_reader.values_mut() {
*idx = 0;
}

// No need to wake the wakers; they're waiting for a new update, and we
// just made them all disappear.
}

/// Take new updates; it considers the caller is the main reader, i.e. it
/// will use the [`Self::MAIN_READER_TOKEN`].
///
Expand Down
3 changes: 2 additions & 1 deletion crates/matrix-sdk/src/event_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,8 @@ impl EventCacheInner {
let rooms = self.by_room.write().await;
for room in rooms.values() {
// Clear all the room state.
let updates_as_vector_diffs = room.inner.state.write().await.reset().await?;
let mut updates_as_vector_diffs = Vec::new();
room.inner.state.write().await.reset(&mut updates_as_vector_diffs).await?;

// Notify all the observers that we've lost track of state. (We ignore the
// error if there aren't any.)
Expand Down
4 changes: 2 additions & 2 deletions crates/matrix-sdk/src/event_cache/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ mod tests {
assert_matches!(found, PaginationToken::None);

// Reset waited_for_initial_prev_token and event state.
let _ = pagination.inner.state.write().await.reset().await.unwrap();
pagination.inner.state.write().await.reset(&mut Vec::new()).await.unwrap();

// If I wait for a back-pagination token for 0 seconds,
let before = Instant::now();
Expand All @@ -455,7 +455,7 @@ mod tests {
assert!(waited.as_secs() < 1);

// Reset waited_for_initial_prev_token state.
let _ = pagination.inner.state.write().await.reset().await.unwrap();
pagination.inner.state.write().await.reset(&mut Vec::new()).await.unwrap();

// If I wait for a back-pagination token for 1 second,
let before = Instant::now();
Expand Down
50 changes: 32 additions & 18 deletions crates/matrix-sdk/src/event_cache/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ impl RoomEventCache {
/// storage.
pub async fn clear(&self) -> Result<()> {
// Clear the linked chunk and persisted storage.
let updates_as_vector_diffs = self.inner.state.write().await.reset().await?;
let mut updates_as_vector_diffs = Vec::new();
self.inner.state.write().await.reset(&mut updates_as_vector_diffs).await?;

// Clear the (temporary) events mappings.
self.inner.all_events.write().await.clear();
Expand Down Expand Up @@ -377,7 +378,8 @@ impl RoomEventCacheInner {
let mut state = self.state.write().await;

// Reset the room's state.
let updates_as_vector_diffs = state.reset().await?;
let mut updates_as_vector_diffs = Vec::new();
state.reset(&mut updates_as_vector_diffs).await?;

// Propagate to observers.
let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
Expand Down Expand Up @@ -465,15 +467,11 @@ impl RoomEventCacheInner {
// If there was a previous batch token, and there's at least one non-duplicated
// new event, unload the chunks so it only contains the last
// one; otherwise, there might be a valid gap in between, and
// observers may not render it (yet). In this case, extend the
// updates with those from the unload; the new updates include a `clear` (as of
// 2025-02-24), so they will remove all the previous ones first.
// observers may not render it (yet).
//
// We must do this *after* the above call to `.with_events_mut`, so the new
// events and gaps are properly persisted to storage.
if let Some(new_events_diffs) = state.shrink_to_last_chunk().await? {
sync_timeline_events_diffs.extend(new_events_diffs);
}
state.shrink_to_last_chunk(&mut sync_timeline_events_diffs).await?;
}

{
Expand Down Expand Up @@ -775,12 +773,13 @@ mod private {
#[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
pub(super) async fn shrink_to_last_chunk(
&mut self,
) -> Result<Option<Vec<VectorDiff<TimelineEvent>>>, EventCacheError> {
sync_timeline_events_diffs: &mut Vec<VectorDiff<TimelineEvent>>,
) -> Result<(), EventCacheError> {
let Some(store) = self.store.get() else {
// No need to do anything if there's no storage; we'll already reset the
// timeline after a limited response.
// TODO: that might be a way to unify our code, though?
return Ok(None);
return Ok(());
};

let store_lock = store.lock().await?;
Expand Down Expand Up @@ -810,15 +809,21 @@ mod private {
// updates the chunk identifier generator.
if let Err(err) = self.events.replace_with(last_chunk, chunk_identifier_generator) {
error!("error when replacing the linked chunk: {err}");
return self.reset().await.map(Some);
return self.reset(sync_timeline_events_diffs).await;
}

// Don't propagate those updates to the store; we're doing this only for
// the in-memory representation. Let's drain those store updates.
let _ = self.events.store_updates().take();

// However, we want to get updates as `VectorDiff`s, for the external listeners.
Ok(Some(self.events.updates_as_vector_diffs()))
//
// We can override the parent updates here, as we've done a clear above (in
// `replace_with`).
*sync_timeline_events_diffs = self.events.updates_as_vector_diffs();
assert!(matches!(sync_timeline_events_diffs[0], VectorDiff::Clear));

Ok(())
}

/// Removes the bundled relations from an event, if they were present.
Expand Down Expand Up @@ -925,12 +930,21 @@ mod private {

/// Resets this data structure as if it were brand new.
#[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
pub async fn reset(&mut self) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
pub async fn reset(
&mut self,
sync_timeline_events_diffs: &mut Vec<VectorDiff<TimelineEvent>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you pass a &mut instead of keeping to return the diffs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was to follow the pattern introduced in replace_with, which makes use of this function. I can isolate using the pattern to replace_with / shrink_to_last_chunk, if you'd prefer, and have the override happen there?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm struggling to see how it will look like but if the idea is to remove this &mut, I think I would prefer that yes!

) -> Result<(), EventCacheError> {
// We're clearing the events here…
self.events.reset();

self.propagate_changes().await?;
self.waited_for_initial_prev_token = false;

Ok(self.events.updates_as_vector_diffs())
// …So we can override the updates here, as we've done a clear above.
*sync_timeline_events_diffs = self.events.updates_as_vector_diffs();
assert!(matches!(sync_timeline_events_diffs[0], VectorDiff::Clear));

Ok(())
}

/// Returns a read-only reference to the underlying events.
Expand Down Expand Up @@ -1954,15 +1968,15 @@ mod tests {
assert!(stream.is_empty());

// Shrink the linked chunk to the last chunk.
let diffs = room_event_cache
let mut diffs = Vec::new();
room_event_cache
.inner
.state
.write()
.await
.shrink_to_last_chunk()
.shrink_to_last_chunk(&mut diffs)
.await
.expect("shrinking should succeed")
.expect("there must be updates");
.expect("shrinking should succeed");

// We receive updates about the changes to the linked chunk.
assert_eq!(diffs.len(), 2);
Expand Down
9 changes: 3 additions & 6 deletions crates/matrix-sdk/tests/integration/event_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1298,16 +1298,13 @@ async fn test_no_gap_stored_after_deduplicated_backpagination() {
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
);
assert_eq!(diffs.len(), 4);
assert_eq!(diffs.len(), 2);

// The first two diffs are for the gap and the new events, but they don't really
// matter in this test, because then, the linked chunk is unloaded, causing
// a clear:
assert_matches!(&diffs[2], VectorDiff::Clear);
assert_matches!(&diffs[0], VectorDiff::Clear);

// Then the latest event chunk is reloaded.
// `$ev1`, `$ev2` and `$ev3` are added.
assert_matches!(&diffs[3], VectorDiff::Append { values: events } => {
assert_matches!(&diffs[1], VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 3);
assert_eq!(events[0].event_id().unwrap().as_str(), "$1");
assert_eq!(events[1].event_id().unwrap().as_str(), "$2");
Expand Down
Loading