Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(groups): add a worker to delete expired messages #1503

Merged
merged 31 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
843faee
wip
mchenani Jan 14, 2025
8bd7422
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 14, 2025
9e84a95
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 14, 2025
5a994fd
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 16, 2025
b9be121
add group message expiration setting to db
mchenani Jan 16, 2025
255c6f8
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 16, 2025
22e3eff
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 17, 2025
ecd3b92
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 17, 2025
8e7835b
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 20, 2025
1dcb898
rename to ConversationMessageDisappearingSettings
mchenani Jan 20, 2025
d62cac0
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 21, 2025
354b7e6
wip
mchenani Jan 21, 2025
1b11016
extract disappearing messages worker to a separate file
mchenani Jan 21, 2025
508c792
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 21, 2025
8951e87
store metadatas into db
mchenani Jan 23, 2025
de91ebf
remove comments
mchenani Jan 23, 2025
62e2aae
wip
mchenani Jan 23, 2025
c3a3f91
add test
mchenani Jan 24, 2025
6f54733
fix tests
mchenani Jan 24, 2025
0825f6b
fix tests
mchenani Jan 24, 2025
3911e8f
address comments
mchenani Jan 24, 2025
736fc06
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 24, 2025
6bf97ed
fix clippy
mchenani Jan 27, 2025
bf2b894
fix clippy
mchenani Jan 27, 2025
e6136cb
fix clippy
mchenani Jan 27, 2025
c783def
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 27, 2025
1a527a9
fix napi and address comments
mchenani Jan 27, 2025
fbdd11f
add comments to messageDisappearingSettings
mchenani Jan 27, 2025
aeccd04
fix missing mappings
mchenani Jan 27, 2025
9ec0034
fix bindings_node
mchenani Jan 27, 2025
94f5b34
fix bindings_node
mchenani Jan 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 178 additions & 8 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use xmtp_id::{
InboxId,
};
use xmtp_mls::groups::device_sync::preference_sync::UserPreferenceUpdate;
use xmtp_mls::groups::group_mutable_metadata::GroupMessageExpirationSettings;
mchenani marked this conversation as resolved.
Show resolved Hide resolved
use xmtp_mls::groups::scoped_client::LocalScopedGroupClient;
use xmtp_mls::groups::HmacKey;
use xmtp_mls::storage::group::ConversationType;
Expand Down Expand Up @@ -1226,6 +1227,21 @@ impl FfiConversationListItem {
}
}

#[derive(uniffi::Record, Debug)]
pub struct FfiGroupMessageExpirationSettings {
pub expire_from_ms: i64,
pub expire_in_ms: i64,
}

impl FfiGroupMessageExpirationSettings {
fn new(expire_from_ms: i64, expire_in_ms: i64) -> Self {
Self {
expire_from_ms,
expire_in_ms,
}
}
}

impl From<MlsGroup<RustXmtpClient>> for FfiConversation {
fn from(mls_group: MlsGroup<RustXmtpClient>) -> FfiConversation {
FfiConversation { inner: mls_group }
Expand Down Expand Up @@ -1336,6 +1352,12 @@ impl From<FfiDirection> for SortDirection {
}
}

impl From<FfiGroupMessageExpirationSettings> for GroupMessageExpirationSettings {
fn from(settings: FfiGroupMessageExpirationSettings) -> Self {
GroupMessageExpirationSettings::new(settings.expire_from_ms, settings.expire_in_ms)
}
}

#[derive(uniffi::Record, Clone, Default)]
pub struct FfiListMessagesOptions {
pub sent_before_ns: Option<i64>,
Expand Down Expand Up @@ -1391,13 +1413,23 @@ pub struct FfiCreateGroupOptions {

impl FfiCreateGroupOptions {
pub fn into_group_metadata_options(self) -> GroupMetadataOptions {
let message_retention_settings: Option<GroupMessageExpirationSettings> =
if let (Some(message_expiration_from_ms), Some(message_expiration_ms)) =
(self.message_expiration_from_ms, self.message_expiration_ms)
{
Some(GroupMessageExpirationSettings::new(
message_expiration_from_ms,
message_expiration_ms,
))
} else {
None
};
GroupMetadataOptions {
name: self.group_name,
image_url_square: self.group_image_url_square,
description: self.group_description,
pinned_frame_url: self.group_pinned_frame_url,
message_expiration_from_ms: self.message_expiration_from_ms,
message_expiration_ms: self.message_expiration_ms,
message_retention_settings,
}
}
}
Expand Down Expand Up @@ -1628,6 +1660,32 @@ impl FfiConversation {
.map_err(Into::into)
}

pub async fn update_group_message_expiration_settings(
&self,
settings: FfiGroupMessageExpirationSettings,
) -> Result<(), GenericError> {
self.inner
.update_group_message_expiration_settings(GroupMessageExpirationSettings::from(
settings,
))
.await?;

Ok(())
}

pub fn group_message_expiration_settings(
&self,
) -> Result<FfiGroupMessageExpirationSettings, GenericError> {
let provider = self.inner.mls_provider()?;
let group_message_expiration_settings =
self.inner.group_message_expiration_settings(&provider)?; // Use `?` to handle the Result

Ok(FfiGroupMessageExpirationSettings::new(
group_message_expiration_settings.expire_from_ms,
group_message_expiration_settings.expire_in_ms,
))
}

pub fn admin_list(&self) -> Result<Vec<String>, GenericError> {
let provider = self.inner.mls_provider()?;
self.inner.admin_list(&provider).map_err(Into::into)
Expand Down Expand Up @@ -2164,10 +2222,11 @@ mod tests {
connect_to_backend, decode_reaction, encode_reaction, get_inbox_id_for_address,
inbox_owner::SigningError, FfiConsent, FfiConsentEntityType, FfiConsentState,
FfiContentType, FfiConversation, FfiConversationCallback, FfiConversationMessageKind,
FfiCreateGroupOptions, FfiDirection, FfiGroupPermissionsOptions, FfiInboxOwner,
FfiListConversationsOptions, FfiListMessagesOptions, FfiMessageWithReactions,
FfiMetadataField, FfiPermissionPolicy, FfiPermissionPolicySet, FfiPermissionUpdateType,
FfiReaction, FfiReactionAction, FfiReactionSchema, FfiSubscribeError,
FfiCreateGroupOptions, FfiDirection, FfiGroupMessageExpirationSettings,
FfiGroupPermissionsOptions, FfiInboxOwner, FfiListConversationsOptions,
FfiListMessagesOptions, FfiMessageWithReactions, FfiMetadataField, FfiPermissionPolicy,
FfiPermissionPolicySet, FfiPermissionUpdateType, FfiReaction, FfiReactionAction,
FfiReactionSchema, FfiSubscribeError,
};
use ethers::utils::hex;
use prost::Message;
Expand Down Expand Up @@ -2587,6 +2646,8 @@ mod tests {
}

use xmtp_cryptography::utils::generate_local_wallet;
use xmtp_mls::groups::group_mutable_metadata::{GroupMessageExpirationSettings, MetadataField};
use xmtp_mls::groups::{GroupMetadataOptions, PreconfiguredPolicies};

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_can_add_wallet_to_inbox() {
Expand Down Expand Up @@ -2859,6 +2920,8 @@ mod tests {
let amal = new_test_client().await;
let bola = new_test_client().await;

let group_message_expiration_settings = FfiGroupMessageExpirationSettings::new(10, 100);

let group = amal
.conversations()
.create_group(
Expand All @@ -2870,8 +2933,10 @@ mod tests {
group_description: Some("group description".to_string()),
group_pinned_frame_url: Some("pinned frame".to_string()),
custom_permission_policy_set: None,
message_expiration_from_ms: None,
message_expiration_ms: None,
message_expiration_from_ms: Some(
group_message_expiration_settings.expire_from_ms,
),
message_expiration_ms: Some(group_message_expiration_settings.expire_in_ms),
},
)
.await
Expand All @@ -2883,8 +2948,26 @@ mod tests {
assert_eq!(group.group_image_url_square().unwrap(), "url");
assert_eq!(group.group_description().unwrap(), "group description");
assert_eq!(group.group_pinned_frame_url().unwrap(), "pinned frame");
assert_eq!(group.group_pinned_frame_url().unwrap(), "pinned frame");
assert_eq!(
group
.group_message_expiration_settings()
.unwrap()
.expire_from_ms,
group_message_expiration_settings.expire_from_ms
);
assert_eq!(
group
.group_message_expiration_settings()
.unwrap()
.expire_in_ms,
group_message_expiration_settings.expire_in_ms
);
}

//test to stream the metadata
//test the policy if someone can change the settings

// Looks like this test might be a separate issue
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_can_stream_group_messages_for_updates() {
Expand Down Expand Up @@ -4695,6 +4778,93 @@ mod tests {
assert_eq!(bola_conversations.len(), 1);
}

// #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
mchenani marked this conversation as resolved.
Show resolved Hide resolved
// async fn test_groups_with_expiration_settings() {
// // Step 1: Setup test clients
// let amal = new_test_client().await;
//
// // Step 2: Create policy set and groups
// let policy_set = Some(PreconfiguredPolicies::AdminsOnly.to_policy_set());
// let group1 = amal
// .inner_client
// .create_group(policy_set.clone(), GroupMetadataOptions::default())
// .expect("Failed to create group1");
// let group2 = amal
// .inner_client
// .create_group(policy_set.clone(), GroupMetadataOptions::default())
// .expect("Failed to create group2");
// let group3 = amal
// .inner_client
// .create_group(policy_set, GroupMetadataOptions::default())
// .expect("Failed to create group3");
//
// // Sync groups
// group1.sync().await.expect("Failed to sync group1");
// group2.sync().await.expect("Failed to sync group2");
// group3.sync().await.expect("Failed to sync group3");
//
// // Step 3: Verify metadata and set expiration for group1
// let group1_metadata = group1
// .mutable_metadata(&group1.mls_provider().expect("Missing MLS provider"))
// .expect("Failed to fetch metadata for group1");
// assert_eq!(group1_metadata.attributes.len(), 4);
// assert!(group1_metadata
// .attributes
// .get(&MetadataField::GroupName.to_string())
// .expect("Missing group name field")
// .is_empty());
//
// // Enable expiration settings for group1
// let expiration_settings = GroupMessageExpirationSettings {
// expire_from_ms: 1000,
// expire_in_ms: 1000,
// };
// group1
// .update_group_message_expiration_settings(expiration_settings)
// .await
// .expect("Failed to update expiration settings for group1");
//
// // Verify only group1 is returned
// let groups_with_expiration = amal
// .conversations()
// .get_groups_with_expiration_enabled()
// .expect("Failed to fetch groups with expiration enabled");
// assert_eq!(groups_with_expiration.len(), 1);
// assert!(groups_with_expiration.contains(&group1.group_id));
//
// // Step 4: Remove expiration settings from group1
// group1
// .remove_group_message_expiration_settings()
// .await
// .expect("Failed to remove expiration settings for group1");
//
// // Verify no groups are returned
// let groups_with_expiration = amal
// .conversations()
// .get_groups_with_expiration_enabled()
// .expect("Failed to fetch groups with expiration enabled");
// assert_eq!(groups_with_expiration.len(), 0);
//
// // Step 5: Enable expiration settings for group1 and group2
// group1
// .update_group_message_expiration_settings(expiration_settings)
// .await
// .expect("Failed to update expiration settings for group1");
// group2
// .update_group_message_expiration_settings(expiration_settings)
// .await
// .expect("Failed to update expiration settings for group2");
//
// // Verify group1 and group2 are returned
// let groups_with_expiration = amal
// .conversations()
// .get_groups_with_expiration_enabled()
// .expect("Failed to fetch groups with expiration enabled");
// assert_eq!(groups_with_expiration.len(), 2);
// assert!(groups_with_expiration.contains(&group1.group_id));
// assert!(groups_with_expiration.contains(&group2.group_id));
// }

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_dm_streaming() {
let alix = new_test_client().await;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE GROUPS DROP COLUMN message_expire_from_ms;
ALTER TABLE GROUPS DROP COLUMN message_expire_in_ms;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE GROUPS ADD COLUMN message_expire_from_ms BIGINT NOT NULL DEFAULT 0;
mchenani marked this conversation as resolved.
Show resolved Hide resolved
ALTER TABLE GROUPS ADD COLUMN message_expire_in_ms BIGINT NOT NULL DEFAULT 0;
85 changes: 85 additions & 0 deletions xmtp_mls/src/groups/device_sync.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::{GroupError, MlsGroup};
use crate::groups::group_mutable_metadata::GroupMessageExpirationSettings;
#[cfg(any(test, feature = "test-utils"))]
pub use crate::utils::WorkerHandle;
use crate::{
Expand Down Expand Up @@ -107,6 +108,8 @@ pub enum DeviceSyncError {
Bincode(#[from] bincode::Error),
}

#[derive(Debug, Error)]
pub enum ExpirationWorkerError {}
impl RetryableError for DeviceSyncError {
fn is_retryable(&self) -> bool {
true
Expand Down Expand Up @@ -138,6 +141,19 @@ where
self.set_sync_worker_handle(worker.handle.clone());
worker.spawn_worker();
}

#[instrument(level = "trace", skip_all)]
pub fn start_expired_messages_cleaner_worker(&self) {
let client = self.clone();
tracing::debug!(
inbox_id = client.inbox_id(),
installation_id = hex::encode(client.installation_public_key()),
"starting expired messages cleaner worker"
);

let worker = MessageExpirationWorker::new(client);
worker.spawn_worker();
}
}

pub struct SyncWorker<ApiClient, V> {
Expand Down Expand Up @@ -361,6 +377,75 @@ where
}
}

pub struct MessageExpirationWorker<ApiClient, V> {
mchenani marked this conversation as resolved.
Show resolved Hide resolved
client: Client<ApiClient, V>,
init: OnceCell<()>,
}
impl<ApiClient, V> MessageExpirationWorker<ApiClient, V>
where
ApiClient: XmtpApi + Send + Sync + 'static,
V: SmartContractSignatureVerifier + Send + Sync + 'static,
{
fn new(client: Client<ApiClient, V>) -> Self {
Self {
client,
init: OnceCell::new(),
}
}
fn spawn_worker(mut self) {
crate::spawn(None, async move {
let inbox_id = self.client.inbox_id().to_string();
let installation_id = hex::encode(self.client.installation_public_key());
while let Err(err) = self.run().await {
tracing::info!("Running worker..");
match err {
DeviceSyncError::Client(ClientError::Storage(
StorageError::PoolNeedsConnection,
)) => {
tracing::warn!(
inbox_id,
installation_id,
"Pool disconnected. task will restart on reconnect"
);
break;
}
_ => {
tracing::error!(inbox_id, installation_id, "sync worker error {err}");
// Wait 2 seconds before restarting.
xmtp_common::time::sleep(Duration::from_secs(2)).await;
}
}
}
});
}
}

impl<ApiClient, V> MessageExpirationWorker<ApiClient, V>
where
ApiClient: XmtpApi + Send + Sync + 'static,
V: SmartContractSignatureVerifier + Send + Sync + 'static,
{
/// Iterate on the list of groups and delete expired messages
async fn delete_expired_messages(&mut self) -> Result<(), DeviceSyncError> {
let provider = self.client.mls_provider()?;
if let Err(e) = provider.conn_ref().delete_expired_messages() {
tracing::error!(
"Failed to delete expired messages for group: {:?}, error: {:?}",
e
);
}
Ok(())
}

async fn run(&mut self) -> Result<(), DeviceSyncError> {
// Call delete_expired_messages on every iteration
if let Err(err) = self.delete_expired_messages().await {
tracing::error!("Error during deletion of expired messages: {:?}", err);
}
Ok(())
}
}

impl<ApiClient, V> Client<ApiClient, V>
where
ApiClient: XmtpApi,
Expand Down
Loading
Loading