Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions bindings/matrix-sdk-ffi/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1129,22 +1129,29 @@ impl Room {
pub async fn fetch_thread_subscription(
&self,
thread_root_event_id: String,
) -> Result<Option<ThreadSubscription>, ClientError> {
) -> Result<Option<ThreadStatus>, ClientError> {
let thread_root = EventId::parse(thread_root_event_id)?;
Ok(self
.inner
.fetch_thread_subscription(thread_root)
.await?
.map(|sub| ThreadSubscription { automatic: sub.automatic }))
Ok(self.inner.fetch_thread_subscription(thread_root).await?.map(|sub| match sub {
matrix_sdk::room::ThreadStatus::Subscribed { automatic } => {
ThreadStatus::Subscribed { automatic }
}
matrix_sdk::room::ThreadStatus::Unsubscribed => ThreadStatus::Unsubscribed,
}))
}
}

/// Status of a thread subscription (MSC4306).
#[derive(uniffi::Record)]
pub struct ThreadSubscription {
/// Whether the thread subscription happened automatically (e.g. after a
/// mention) or if it was manually requested by the user.
automatic: bool,
#[derive(uniffi::Enum)]
pub enum ThreadStatus {
/// The thread is subscribed to.
Subscribed {
/// Whether the thread subscription happened automatically (e.g. after a
/// mention) or if it was manually requested by the user.
automatic: bool,
},

/// The thread is not subscribed to.
Unsubscribed,
}

/// A listener for receiving new live location shares in a room.
Expand Down
60 changes: 59 additions & 1 deletion crates/matrix-sdk-base/src/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ use super::{
use crate::{
RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
deserialized_responses::MemberEvent,
store::{ChildTransactionId, QueueWedgeError, Result, SerializableEventContent, StateStoreExt},
store::{
ChildTransactionId, QueueWedgeError, Result, SerializableEventContent, StateStoreExt,
ThreadStatus,
},
};

/// `StateStore` integration tests.
Expand Down Expand Up @@ -98,6 +101,8 @@ pub trait StateStoreIntegrationTests {
async fn test_server_info_saving(&self);
/// Test fetching room infos based on [`RoomLoadSettings`].
async fn test_get_room_infos(&self);
/// Test loading thread subscriptions.
async fn test_thread_subscriptions(&self);
}

impl StateStoreIntegrationTests for DynStateStore {
Expand Down Expand Up @@ -1767,6 +1772,53 @@ impl StateStoreIntegrationTests for DynStateStore {
assert_eq!(all_rooms.len(), 0);
}
}

async fn test_thread_subscriptions(&self) {
let first_thread = event_id!("$t1");
let second_thread = event_id!("$t2");

// At first, there is no thread subscription.
let maybe_status = self.load_thread_subscription(room_id(), first_thread).await.unwrap();
assert!(maybe_status.is_none());

let maybe_status = self.load_thread_subscription(room_id(), second_thread).await.unwrap();
assert!(maybe_status.is_none());

// Setting the thread subscription works.
self.upsert_thread_subscription(
room_id(),
first_thread,
ThreadStatus::Subscribed { automatic: true },
)
.await
.unwrap();

self.upsert_thread_subscription(
room_id(),
second_thread,
ThreadStatus::Subscribed { automatic: false },
)
.await
.unwrap();

// Now, reading the thread subscription returns the expected status.
let maybe_status = self.load_thread_subscription(room_id(), first_thread).await.unwrap();
assert_eq!(maybe_status, Some(ThreadStatus::Subscribed { automatic: true }));
let maybe_status = self.load_thread_subscription(room_id(), second_thread).await.unwrap();
assert_eq!(maybe_status, Some(ThreadStatus::Subscribed { automatic: false }));

// We can override the thread subscription status.
self.upsert_thread_subscription(room_id(), first_thread, ThreadStatus::Unsubscribed)
.await
.unwrap();

// And it's correctly reflected.
let maybe_status = self.load_thread_subscription(room_id(), first_thread).await.unwrap();
assert_eq!(maybe_status, Some(ThreadStatus::Unsubscribed));
// And the second thread is still subscribed.
let maybe_status = self.load_thread_subscription(room_id(), second_thread).await.unwrap();
assert_eq!(maybe_status, Some(ThreadStatus::Subscribed { automatic: false }));
}
}

/// Macro building to allow your StateStore implementation to run the entire
Expand Down Expand Up @@ -1937,6 +1989,12 @@ macro_rules! statestore_integration_tests {
let store = get_store().await.expect("creating store failed").into_state_store();
store.test_get_room_infos().await;
}

#[async_test]
async fn test_thread_subscriptions() {
let store = get_store().await.expect("creating store failed").into_state_store();
store.test_thread_subscriptions().await;
}
}
};
}
Expand Down
38 changes: 32 additions & 6 deletions crates/matrix-sdk-base/src/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use super::{
use crate::{
MinimalRoomMemberEvent, RoomMemberships, StateStoreDataKey, StateStoreDataValue,
deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
store::QueueWedgeError,
store::{QueueWedgeError, ThreadStatus},
};

#[derive(Debug, Default)]
Expand Down Expand Up @@ -75,7 +75,6 @@ struct MemoryStoreInner {
OwnedRoomId,
HashMap<(String, Option<String>), HashMap<OwnedUserId, (OwnedEventId, Receipt)>>,
>,

room_event_receipts: HashMap<
OwnedRoomId,
HashMap<(String, Option<String>), HashMap<OwnedEventId, HashMap<OwnedUserId, Receipt>>>,
Expand All @@ -84,6 +83,7 @@ struct MemoryStoreInner {
send_queue_events: BTreeMap<OwnedRoomId, Vec<QueuedRequest>>,
dependent_send_queue_events: BTreeMap<OwnedRoomId, Vec<DependentQueuedRequest>>,
seen_knock_requests: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, OwnedUserId>>,
thread_subscriptions: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadStatus>>,
}

/// In-memory, non-persistent implementation of the `StateStore`.
Expand Down Expand Up @@ -754,6 +754,7 @@ impl StateStore for MemoryStore {
inner.room_event_receipts.remove(room_id);
inner.send_queue_events.remove(room_id);
inner.dependent_send_queue_events.remove(room_id);
inner.thread_subscriptions.remove(room_id);

Ok(())
}
Expand Down Expand Up @@ -938,10 +939,6 @@ impl StateStore for MemoryStore {
}
}

/// List all the dependent send queue events.
///
/// This returns absolutely all the dependent send queue events, whether
/// they have an event id or not.
async fn load_dependent_queued_requests(
&self,
room: &RoomId,
Expand All @@ -955,6 +952,35 @@ impl StateStore for MemoryStore {
.cloned()
.unwrap_or_default())
}

async fn upsert_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
status: ThreadStatus,
) -> Result<(), Self::Error> {
self.inner
.write()
.unwrap()
.thread_subscriptions
.entry(room.to_owned())
.or_default()
.insert(thread_id.to_owned(), status);
Ok(())
}

async fn load_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
) -> Result<Option<ThreadStatus>, Self::Error> {
let inner = self.inner.read().unwrap();
Ok(inner
.thread_subscriptions
.get(room)
.and_then(|subscriptions| subscriptions.get(thread_id))
.copied())
}
}

#[cfg(test)]
Expand Down
54 changes: 54 additions & 0 deletions crates/matrix-sdk-base/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,25 @@ pub enum StoreError {
/// An error happened in the underlying database backend.
#[error(transparent)]
Backend(Box<dyn std::error::Error + Send + Sync>),

/// An error happened while serializing or deserializing some data.
#[error(transparent)]
Json(#[from] serde_json::Error),

/// An error happened while deserializing a Matrix identifier, e.g. an user
/// id.
#[error(transparent)]
Identifier(#[from] ruma::IdParseError),

/// The store is locked with a passphrase and an incorrect passphrase was
/// given.
#[error("The store failed to be unlocked")]
StoreLocked,

/// An unencrypted store was tried to be unlocked with a passphrase.
#[error("The store is not encrypted but was tried to be opened with a passphrase")]
UnencryptedStore,

/// The store failed to encrypt or decrypt some data.
#[error("Error encrypting or decrypting data from the store: {0}")]
Encryption(#[from] StoreEncryptionError),
Expand All @@ -130,11 +135,19 @@ pub enum StoreError {
version: {0}, latest version: {1}"
)]
UnsupportedDatabaseVersion(usize, usize),

/// Redacting an event in the store has failed.
///
/// This should never happen.
#[error("Redaction failed: {0}")]
Redaction(#[source] ruma::canonical_json::RedactionError),

/// The store contains invalid data.
#[error("The store contains invalid data: {details}")]
InvalidData {
/// Details about which data is invalid, and how.
details: String,
},
}

impl StoreError {
Expand Down Expand Up @@ -439,6 +452,47 @@ pub enum RoomLoadSettings {
One(OwnedRoomId),
}

/// Status of a thread subscription, as saved in the state store.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ThreadStatus {
/// The thread is subscribed to.
Subscribed {
/// Whether the subscription was made automatically by a client, not by
/// manual user choice.
automatic: bool,
},
/// The thread is unsubscribed to (it won't cause any notifications or
/// automatic subscription anymore).
Unsubscribed,
}

impl ThreadStatus {
/// Convert the current [`ThreadStatus`] into a string representation.
pub fn as_str(&self) -> &'static str {
match self {
ThreadStatus::Subscribed { automatic } => {
if *automatic {
"automatic"
} else {
"manual"
}
}
ThreadStatus::Unsubscribed => "unsubscribed",
}
}

/// Convert a string representation into a [`ThreadStatus`], if it is a
/// valid one, or `None` otherwise.
pub fn from_value(s: &str) -> Option<Self> {
match s {
"automatic" => Some(ThreadStatus::Subscribed { automatic: true }),
"manual" => Some(ThreadStatus::Subscribed { automatic: false }),
"unsubscribed" => Some(ThreadStatus::Unsubscribed),
_ => None,
}
}
}

/// Store state changes and pass them to the StateStore.
#[derive(Clone, Debug, Default)]
pub struct StateChanges {
Expand Down
39 changes: 39 additions & 0 deletions crates/matrix-sdk-base/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use crate::{
deserialized_responses::{
DisplayName, RawAnySyncOrStrippedState, RawMemberEvent, RawSyncOrStrippedState,
},
store::ThreadStatus,
};

/// An abstract state store trait that can be used to implement different stores
Expand Down Expand Up @@ -478,6 +479,27 @@ pub trait StateStore: AsyncTraitDeps {
&self,
room: &RoomId,
) -> Result<Vec<DependentQueuedRequest>, Self::Error>;

/// Insert or update a thread subscription for a given room and thread.
///
/// Note: there's no way to remove a thread subscription, because it's
/// either subscribed to, or unsubscribed to, after it's been saved for
/// the first time.
async fn upsert_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
status: ThreadStatus,
) -> Result<(), Self::Error>;

/// Loads the current thread subscription for a given room and thread.
///
/// Returns `None` if there was no entry for the given room/thread pair.
async fn load_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
) -> Result<Option<ThreadStatus>, Self::Error>;
}

#[repr(transparent)]
Expand Down Expand Up @@ -772,6 +794,23 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
.await
.map_err(Into::into)
}

async fn upsert_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
status: ThreadStatus,
) -> Result<(), Self::Error> {
self.0.upsert_thread_subscription(room, thread_id, status).await.map_err(Into::into)
}

async fn load_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
) -> Result<Option<ThreadStatus>, Self::Error> {
self.0.load_thread_subscription(room, thread_id).await.map_err(Into::into)
}
}

/// Convenience functionality for state stores.
Expand Down
15 changes: 14 additions & 1 deletion crates/matrix-sdk-indexeddb/src/state_store/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use super::{
};
use crate::IndexeddbStateStoreError;

const CURRENT_DB_VERSION: u32 = 12;
const CURRENT_DB_VERSION: u32 = 13;
const CURRENT_META_DB_VERSION: u32 = 2;

/// Sometimes Migrations can't proceed without having to drop existing
Expand Down Expand Up @@ -237,6 +237,9 @@ pub async fn upgrade_inner_db(
if old_version < 12 {
db = migrate_to_v12(db).await?;
}
if old_version < 13 {
db = migrate_to_v13(db).await?;
}
}

db.close();
Expand Down Expand Up @@ -793,6 +796,16 @@ async fn migrate_to_v12(db: IdbDatabase) -> Result<IdbDatabase> {
Ok(IdbDatabase::open_u32(&name, 12)?.await?)
}

/// Add the thread subscriptions table.
async fn migrate_to_v13(db: IdbDatabase) -> Result<IdbDatabase> {
let migration = OngoingMigration {
drop_stores: [].into(),
create_stores: [keys::THREAD_SUBSCRIPTIONS].into_iter().collect(),
data: Default::default(),
};
apply_migration(db, 13, migration).await
}

#[cfg(all(test, target_family = "wasm"))]
mod tests {
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
Expand Down
Loading
Loading