Skip to content

Commit

Permalink
!foo
Browse files Browse the repository at this point in the history
  • Loading branch information
Hywan committed Jan 26, 2024
1 parent 790d405 commit b47a4a1
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 44 deletions.
1 change: 1 addition & 0 deletions crates/matrix-sdk-crypto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ testing = ["dep:http"]
[dependencies]
aes = "0.8.1"
as_variant = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
bs58 = { version = "0.5.0" }
byteorder = { workspace = true }
Expand Down
17 changes: 17 additions & 0 deletions crates/matrix-sdk-crypto/src/store/memorystore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::{
time::{Duration, Instant},
};

use async_stream::stream;
use async_trait::async_trait;
use ruma::{
events::secret::request::SecretName, DeviceId, OwnedDeviceId, OwnedRoomId, OwnedTransactionId,
Expand All @@ -28,6 +29,7 @@ use tokio::sync::{Mutex, RwLock};

use super::{
caches::{DeviceStore, GroupSessionStore, SessionStore},
traits::StreamOf,
Account, BackupKeys, Changes, CryptoStore, InboundGroupSession, PendingChanges, RoomKeyCounts,
RoomSettings, Session,
};
Expand Down Expand Up @@ -251,6 +253,21 @@ impl CryptoStore for MemoryStore {
Ok(self.inbound_group_sessions.get_all())
}

async fn get_inbound_group_sessions_stream(
&self,
) -> Result<StreamOf<super::error::Result<InboundGroupSession>>> {
// There is no stream API for this `MemoryStore`. Let's simply consume the `Vec`
// from `get_inbound_group_sessions` as a stream. It's not ideal, but it's
// OK-ish for now.
let inbound_group_sessions = self.inbound_group_sessions.get_all();

Ok(StreamOf::new(Box::pin(stream! {
for item in inbound_group_sessions {
yield Ok(item);
}
})))
}

async fn inbound_group_session_counts(&self) -> Result<RoomKeyCounts> {
let backed_up =
self.get_inbound_group_sessions().await?.into_iter().filter(|s| s.backed_up()).count();
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-crypto/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
pub use error::{CryptoStoreError, Result};
use matrix_sdk_common::{store_locks::CrossProcessStoreLock, timeout::timeout};
pub use memorystore::MemoryStore;
pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore, StreamOf};

pub use crate::gossiping::{GossipRequest, SecretInfo};

Expand Down
47 changes: 46 additions & 1 deletion crates/matrix-sdk-crypto/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::HashMap, fmt, sync::Arc};
use std::{
collections::HashMap,
fmt,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

use async_trait::async_trait;
use futures_core::Stream;
use matrix_sdk_common::AsyncTraitDeps;
use ruma::{
events::secret::request::SecretName, DeviceId, OwnedDeviceId, RoomId, TransactionId, UserId,
Expand Down Expand Up @@ -103,6 +110,11 @@ pub trait CryptoStore: AsyncTraitDeps {
/// Get all the inbound group sessions we have stored.
async fn get_inbound_group_sessions(&self) -> Result<Vec<InboundGroupSession>, Self::Error>;

/// Get all the inbound group session we have stored, as a `Stream`.
async fn get_inbound_group_sessions_stream(
&self,
) -> Result<StreamOf<Result<InboundGroupSession>>, Self::Error>;

/// Get the number inbound group sessions we have and how many of them are
/// backed up.
async fn inbound_group_session_counts(&self) -> Result<RoomKeyCounts, Self::Error>;
Expand Down Expand Up @@ -328,6 +340,12 @@ impl<T: CryptoStore> CryptoStore for EraseCryptoStoreError<T> {
self.0.get_inbound_group_sessions().await.map_err(Into::into)
}

async fn get_inbound_group_sessions_stream(
&self,
) -> Result<StreamOf<Result<InboundGroupSession>>> {
self.0.get_inbound_group_sessions_stream().await.map_err(Into::into)
}

async fn inbound_group_session_counts(&self) -> Result<RoomKeyCounts> {
self.0.inbound_group_session_counts().await.map_err(Into::into)
}
Expand Down Expand Up @@ -508,3 +526,30 @@ impl IntoCryptoStore for Arc<DynCryptoStore> {
self
}
}

/// A concrete type wrapping a `Pin<Box<dyn Stream<Item = T>>>`.
///
/// It is used only to make the [`CryptoStore`] trait object-safe. Please don't
/// use it for other things.
pub struct StreamOf<T>(Pin<Box<dyn Stream<Item = T>>>);

impl<T> StreamOf<T> {
/// Create a new `Self`.
pub fn new(stream: Pin<Box<dyn Stream<Item = T>>>) -> Self {
Self(stream)
}
}

impl<T> fmt::Debug for StreamOf<T> {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.debug_tuple("StreamOf").finish()
}
}

impl<T> Stream for StreamOf<T> {
type Item = T;

fn poll_next(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.0.as_mut().poll_next(context)
}
}
88 changes: 61 additions & 27 deletions crates/matrix-sdk-indexeddb/src/crypto_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

use std::{
collections::HashMap,
num::NonZeroUsize,
sync::{Arc, RwLock},
};

use async_trait::async_trait;
use futures_util::StreamExt;
use gloo_utils::format::JsValueSerdeExt;
use indexed_db_futures::prelude::*;
use matrix_sdk_crypto::{
Expand All @@ -26,8 +28,8 @@ use matrix_sdk_crypto::{
Session, StaticAccountData,
},
store::{
caches::SessionStore, BackupKeys, Changes, CryptoStore, CryptoStoreError, PendingChanges,
RoomKeyCounts, RoomSettings,
self, caches::SessionStore, BackupKeys, Changes, CryptoStore, CryptoStoreError,
PendingChanges, RoomKeyCounts, RoomSettings, StreamOf,
},
types::events::room_key_withheld::RoomKeyWithheldEvent,
Account, GossipRequest, GossippedSecret, ReadOnlyDevice, ReadOnlyUserIdentities, SecretInfo,
Expand All @@ -43,8 +45,9 @@ use tracing::{debug, warn};
use wasm_bindgen::JsValue;
use web_sys::{DomException, IdbKeyRange};

use crate::crypto_store::{
indexeddb_serializer::IndexeddbSerializer, migrations::open_and_upgrade_db,
use crate::{
crypto_store::{indexeddb_serializer::IndexeddbSerializer, migrations::open_and_upgrade_db},
stream::StreamByRenewedCursor,
};

mod indexeddb_serializer;
Expand Down Expand Up @@ -94,16 +97,16 @@ mod keys {
pub const RECOVERY_KEY_V1: &str = "recovery_key_v1";
}

/// An implementation of [CryptoStore] that uses [IndexedDB] for persistent
/// An implementation of [`CryptoStore`] that uses [`IndexedDB`] for persistent
/// storage.
///
/// [IndexedDB]: https://developer.mozilla.org/en-US/docs/Web/API/IndexedDB_API
/// [`IndexedDB`]: https://developer.mozilla.org/en-US/docs/Web/API/IndexedDB_API
pub struct IndexeddbCryptoStore {
static_account: RwLock<Option<StaticAccountData>>,
name: String,
pub(crate) inner: IdbDatabase,
pub(crate) inner: Arc<IdbDatabase>,

serializer: IndexeddbSerializer,
serializer: Arc<IndexeddbSerializer>,
session_cache: SessionStore,
save_changes_lock: Arc<Mutex<()>>,
}
Expand Down Expand Up @@ -175,8 +178,8 @@ impl IndexeddbCryptoStore {
Ok(Self {
name,
session_cache,
inner: db,
serializer,
inner: Arc::new(db),
serializer: Arc::new(serializer),
static_account: RwLock::new(None),
save_changes_lock: Default::default(),
})
Expand Down Expand Up @@ -280,23 +283,7 @@ impl IndexeddbCryptoStore {
&self,
stored_value: JsValue,
) -> Result<InboundGroupSession> {
let idb_object: InboundGroupSessionIndexedDbObject =
serde_wasm_bindgen::from_value(stored_value)?;
let pickled_session =
self.serializer.deserialize_value_from_bytes(&idb_object.pickled_session)?;
let session = InboundGroupSession::from_pickle(pickled_session)
.map_err(|e| IndexeddbCryptoStoreError::CryptoStoreError(e.into()))?;

// Although a "backed up" flag is stored inside `idb_object.pickled_session`, it
// is not maintained when backups are reset. Overwrite the flag with the
// needs_backup value from the IDB object.
if idb_object.needs_backup {
session.reset_backup_state();
} else {
session.mark_as_backed_up();
}

Ok(session)
deserialize_inbound_group_session(stored_value, &self.serializer)
}

/// Transform a [`GossipRequest`] into a `JsValue` holding a
Expand Down Expand Up @@ -326,6 +313,28 @@ impl IndexeddbCryptoStore {
}
}

fn deserialize_inbound_group_session(
stored_value: JsValue,
serializer: &IndexeddbSerializer,
) -> Result<InboundGroupSession> {
let idb_object: InboundGroupSessionIndexedDbObject =
serde_wasm_bindgen::from_value(stored_value)?;
let pickled_session = serializer.deserialize_value_from_bytes(&idb_object.pickled_session)?;
let session = InboundGroupSession::from_pickle(pickled_session)
.map_err(|e| IndexeddbCryptoStoreError::CryptoStoreError(e.into()))?;

// Although a "backed up" flag is stored inside `idb_object.pickled_session`, it
// is not maintained when backups are reset. Overwrite the flag with the
// needs_backup value from the IDB object.
if idb_object.needs_backup {
session.reset_backup_state();
} else {
session.mark_as_backed_up();
}

Ok(session)
}

// Small hack to have the following macro invocation act as the appropriate
// trait impl block on wasm, but still be compiled on non-wasm as a regular
// impl block otherwise.
Expand Down Expand Up @@ -800,6 +809,31 @@ impl_crypto_store! {
).await
}

async fn get_inbound_group_sessions_stream(&self) -> Result<StreamOf<store::Result<InboundGroupSession>>> {
let db = self.inner.clone();
let serializer = self.serializer.clone();

let stream = StreamByRenewedCursor::new(
db,
|db| db.transaction_on_one_with_mode(
keys::INBOUND_GROUP_SESSIONS_V2,
IdbTransactionMode::Readonly,
),
keys::INBOUND_GROUP_SESSIONS_V2.to_owned(),
// SAFETY: `unwrap` is safe because 100 isn't zero.
NonZeroUsize::new(100).unwrap(),
)
.await?
.map(move |item: Result<(JsValue, JsValue), DomException>| -> store::Result<InboundGroupSession> {
let item: (JsValue, JsValue) = item.unwrap() /* FIX ME */;
let (_key, value) = item;

Ok(deserialize_inbound_group_session(value, serializer.as_ref())?)
});

Ok(StreamOf::new(Box::pin(stream)))
}

async fn inbound_group_session_counts(&self) -> Result<RoomKeyCounts> {
let tx = self
.inner
Expand Down
Loading

0 comments on commit b47a4a1

Please sign in to comment.