Skip to content

Commit

Permalink
feat(groups): add a worker to delete expired messages (#1503)
Browse files Browse the repository at this point in the history
  • Loading branch information
mchenani authored Jan 27, 2025
1 parent 6de768d commit 869a414
Show file tree
Hide file tree
Showing 20 changed files with 813 additions and 118 deletions.
268 changes: 231 additions & 37 deletions bindings_ffi/src/mls.rs

Large diffs are not rendered by default.

29 changes: 28 additions & 1 deletion bindings_node/src/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use crate::{
streams::StreamCloser,
ErrorWrapper,
};

use prost::Message as ProstMessage;
use xmtp_mls::groups::group_mutable_metadata::MessageDisappearingSettings as XmtpConversationMessageDisappearingSettings;

use napi_derive::napi;

Expand All @@ -38,6 +38,33 @@ pub struct GroupMetadata {
inner: XmtpGroupMetadata,
}

/// Settings for disappearing messages in a conversation.
///
/// # Fields
///
/// * `from_ns` - The timestamp (in nanoseconds) from when messages should be tracked for deletion.
/// * `in_ns` - The duration (in nanoseconds) after which tracked messages will be deleted.
#[napi(object)]
#[derive(Clone)]
pub struct MessageDisappearingSettings {
pub from_ns: i64,
pub in_ns: i64,
}

#[napi]
impl MessageDisappearingSettings {
#[napi]
pub fn new(from_ns: i64, in_ns: i64) -> Self {
Self { from_ns, in_ns }
}
}

impl From<MessageDisappearingSettings> for XmtpConversationMessageDisappearingSettings {
fn from(value: MessageDisappearingSettings) -> Self {
XmtpConversationMessageDisappearingSettings::new(value.from_ns, value.in_ns)
}
}

#[napi]
impl GroupMetadata {
#[napi]
Expand Down
12 changes: 6 additions & 6 deletions bindings_node/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use xmtp_mls::storage::group::ConversationType as XmtpConversationType;
use xmtp_mls::storage::group::GroupMembershipState as XmtpGroupMembershipState;
use xmtp_mls::storage::group::GroupQueryArgs;

use crate::conversation::MessageDisappearingSettings;
use crate::message::Message;
use crate::permissions::{GroupPermissionsOptions, PermissionPolicySet};
use crate::ErrorWrapper;
Expand Down Expand Up @@ -122,8 +123,7 @@ pub struct CreateGroupOptions {
pub group_description: Option<String>,
pub group_pinned_frame_url: Option<String>,
pub custom_permission_policy_set: Option<PermissionPolicySet>,
pub message_expiration_from_ms: Option<i64>,
pub message_expiration_ms: Option<i64>,
pub message_disappearing_settings: Option<MessageDisappearingSettings>,
}

impl CreateGroupOptions {
Expand All @@ -133,8 +133,9 @@ impl CreateGroupOptions {
image_url_square: self.group_image_url_square,
description: self.group_description,
pinned_frame_url: self.group_pinned_frame_url,
message_expiration_from_ms: self.message_expiration_from_ms,
message_expiration_ms: self.message_expiration_ms,
message_disappearing_settings: self
.message_disappearing_settings
.map(|settings| settings.into()),
}
}
}
Expand Down Expand Up @@ -163,8 +164,7 @@ impl Conversations {
group_description: None,
group_pinned_frame_url: None,
custom_permission_policy_set: None,
message_expiration_from_ms: None,
message_expiration_ms: None,
message_disappearing_settings: None,
});

if let Some(GroupPermissionsOptions::CustomPolicy) = options.permissions {
Expand Down
10 changes: 5 additions & 5 deletions bindings_node/src/permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ pub struct PermissionPolicySet {
pub update_group_description_policy: PermissionPolicy,
pub update_group_image_url_square_policy: PermissionPolicy,
pub update_group_pinned_frame_url_policy: PermissionPolicy,
pub update_message_expiration_ms_policy: PermissionPolicy,
pub update_message_disappearing_policy: PermissionPolicy,
}

impl From<PreconfiguredPolicies> for GroupPermissionsOptions {
Expand Down Expand Up @@ -216,8 +216,8 @@ impl GroupPermissions {
update_group_pinned_frame_url_policy: get_policy(
XmtpMetadataField::GroupPinnedFrameUrl.as_str(),
),
update_message_expiration_ms_policy: get_policy(
XmtpMetadataField::MessageExpirationMillis.as_str(),
update_message_disappearing_policy: get_policy(
XmtpMetadataField::MessageDisappearInNS.as_str(),
),
})
}
Expand Down Expand Up @@ -246,8 +246,8 @@ impl TryFrom<PermissionPolicySet> for PolicySet {
policy_set.update_group_pinned_frame_url_policy.try_into()?,
);
metadata_permissions_map.insert(
XmtpMetadataField::MessageExpirationMillis.to_string(),
policy_set.update_message_expiration_ms_policy.try_into()?,
XmtpMetadataField::MessageDisappearInNS.to_string(),
policy_set.update_message_disappearing_policy.try_into()?,
);

Ok(PolicySet {
Expand Down
16 changes: 8 additions & 8 deletions bindings_node/test/Conversations.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ describe('Conversations', () => {
updateGroupDescriptionPolicy: 0,
updateGroupImageUrlSquarePolicy: 0,
updateGroupPinnedFrameUrlPolicy: 0,
updateMessageExpirationMsPolicy: 2,
updateMessageDisappearingPolicy: 2,
})
expect(group.addedByInboxId()).toBe(client1.inboxId())
expect((await group.findMessages()).length).toBe(1)
Expand Down Expand Up @@ -104,7 +104,7 @@ describe('Conversations', () => {
updateGroupDescriptionPolicy: 1,
updateGroupImageUrlSquarePolicy: 0,
updateGroupPinnedFrameUrlPolicy: 3,
updateMessageExpirationMsPolicy: 2,
updateMessageDisappearingPolicy: 2,
},
})
expect(group).toBeDefined()
Expand All @@ -120,7 +120,7 @@ describe('Conversations', () => {
updateGroupDescriptionPolicy: 1,
updateGroupImageUrlSquarePolicy: 0,
updateGroupPinnedFrameUrlPolicy: 3,
updateMessageExpirationMsPolicy: 2,
updateMessageDisappearingPolicy: 2,
})
})

Expand All @@ -142,7 +142,7 @@ describe('Conversations', () => {
updateGroupDescriptionPolicy: 0,
updateGroupImageUrlSquarePolicy: 0,
updateGroupPinnedFrameUrlPolicy: 0,
updateMessageExpirationMsPolicy: 2,
updateMessageDisappearingPolicy: 2,
})

await group.updatePermissionPolicy(
Expand All @@ -159,7 +159,7 @@ describe('Conversations', () => {
updateGroupDescriptionPolicy: 0,
updateGroupImageUrlSquarePolicy: 0,
updateGroupPinnedFrameUrlPolicy: 0,
updateMessageExpirationMsPolicy: 2,
updateMessageDisappearingPolicy: 2,
})

await group.updatePermissionPolicy(
Expand All @@ -177,7 +177,7 @@ describe('Conversations', () => {
updateGroupDescriptionPolicy: 0,
updateGroupImageUrlSquarePolicy: 0,
updateGroupPinnedFrameUrlPolicy: 0,
updateMessageExpirationMsPolicy: 2,
updateMessageDisappearingPolicy: 2,
})
})

Expand All @@ -204,7 +204,7 @@ describe('Conversations', () => {
updateGroupImageUrlSquarePolicy: 0,
updateGroupNamePolicy: 0,
updateGroupPinnedFrameUrlPolicy: 0,
updateMessageExpirationMsPolicy: 0,
updateMessageDisappearingPolicy: 0,
})
expect(group.addedByInboxId()).toBe(client1.inboxId())
expect((await group.findMessages()).length).toBe(0)
Expand Down Expand Up @@ -342,7 +342,7 @@ describe('Conversations', () => {
updateGroupDescriptionPolicy: 2,
updateGroupImageUrlSquarePolicy: 2,
updateGroupPinnedFrameUrlPolicy: 2,
updateMessageExpirationMsPolicy: 2,
updateMessageDisappearingPolicy: 2,
})

const groupWithDescription = await client1
Expand Down
17 changes: 17 additions & 0 deletions bindings_wasm/src/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,29 @@ use xmtp_mls::storage::group_message::{GroupMessageKind as XmtpGroupMessageKind,
use xmtp_proto::xmtp::mls::message_contents::EncodedContent as XmtpEncodedContent;

use prost::Message as ProstMessage;
use xmtp_mls::groups::group_mutable_metadata::MessageDisappearingSettings as XmtpMessageDisappearingSettings;

#[wasm_bindgen]
pub struct GroupMetadata {
inner: XmtpGroupMetadata,
}

#[wasm_bindgen]
#[derive(Clone)]
pub struct MessageDisappearingSettings {
#[allow(dead_code)]
inner: XmtpMessageDisappearingSettings,
}

impl From<MessageDisappearingSettings> for XmtpMessageDisappearingSettings {
fn from(value: MessageDisappearingSettings) -> Self {
Self {
from_ns: value.inner.from_ns,
in_ns: value.inner.in_ns,
}
}
}

#[wasm_bindgen]
impl GroupMetadata {
#[wasm_bindgen(js_name = creatorInboxId)]
Expand Down
21 changes: 9 additions & 12 deletions bindings_wasm/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use xmtp_mls::storage::group::ConversationType as XmtpConversationType;
use xmtp_mls::storage::group::GroupMembershipState as XmtpGroupMembershipState;
use xmtp_mls::storage::group::GroupQueryArgs;

use crate::conversation::MessageDisappearingSettings;
use crate::messages::Message;
use crate::permissions::{GroupPermissionsOptions, PermissionPolicySet};
use crate::{client::RustXmtpClient, conversation::Conversation};
Expand Down Expand Up @@ -130,10 +131,8 @@ pub struct CreateGroupOptions {
pub group_pinned_frame_url: Option<String>,
#[wasm_bindgen(js_name = customPermissionPolicySet)]
pub custom_permission_policy_set: Option<PermissionPolicySet>,
#[wasm_bindgen(js_name = messageExpirationFromMillis)]
pub message_expiration_from_ms: Option<i64>,
#[wasm_bindgen(js_name = messageExpirationMillis)]
pub message_expiration_ms: Option<i64>,
#[wasm_bindgen(js_name = messageDisappearingSettings)]
pub message_disappearing_settings: Option<MessageDisappearingSettings>,
}

#[wasm_bindgen]
Expand All @@ -147,8 +146,7 @@ impl CreateGroupOptions {
group_description: Option<String>,
group_pinned_frame_url: Option<String>,
custom_permission_policy_set: Option<PermissionPolicySet>,
message_expiration_from_ms: Option<i64>,
message_expiration_ms: Option<i64>,
message_disappearing_settings: Option<MessageDisappearingSettings>,
) -> Self {
Self {
permissions,
Expand All @@ -157,8 +155,7 @@ impl CreateGroupOptions {
group_description,
group_pinned_frame_url,
custom_permission_policy_set,
message_expiration_from_ms,
message_expiration_ms,
message_disappearing_settings,
}
}
}
Expand All @@ -170,8 +167,9 @@ impl CreateGroupOptions {
image_url_square: self.group_image_url_square,
description: self.group_description,
pinned_frame_url: self.group_pinned_frame_url,
message_expiration_from_ms: self.message_expiration_from_ms,
message_expiration_ms: self.message_expiration_ms,
message_disappearing_settings: self
.message_disappearing_settings
.map(|settings| settings.into()),
}
}
}
Expand Down Expand Up @@ -218,8 +216,7 @@ impl Conversations {
group_description: None,
group_pinned_frame_url: None,
custom_permission_policy_set: None,
message_expiration_from_ms: None,
message_expiration_ms: None,
message_disappearing_settings: None,
});

if let Some(GroupPermissionsOptions::CustomPolicy) = options.permissions {
Expand Down
20 changes: 12 additions & 8 deletions bindings_wasm/src/permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ pub struct PermissionPolicySet {
pub update_group_image_url_square_policy: PermissionPolicy,
#[wasm_bindgen(js_name = updateGroupPinnedFrameUrlPolicy)]
pub update_group_pinned_frame_url_policy: PermissionPolicy,
#[wasm_bindgen(js_name = updateMessageExpirationPolicy)]
pub update_message_expiration_ms_policy: PermissionPolicy,
#[wasm_bindgen(js_name = updateMessageDisappearingPolicy)]
pub update_message_disappearing_policy: PermissionPolicy,
}

#[wasm_bindgen]
Expand All @@ -187,7 +187,7 @@ impl PermissionPolicySet {
update_group_description_policy: PermissionPolicy,
update_group_image_url_square_policy: PermissionPolicy,
update_group_pinned_frame_url_policy: PermissionPolicy,
update_message_expiration_ms_policy: PermissionPolicy,
update_message_disappearing_policy: PermissionPolicy,
) -> Self {
Self {
add_member_policy,
Expand All @@ -198,7 +198,7 @@ impl PermissionPolicySet {
update_group_description_policy,
update_group_image_url_square_policy,
update_group_pinned_frame_url_policy,
update_message_expiration_ms_policy,
update_message_disappearing_policy,
}
}
}
Expand Down Expand Up @@ -257,8 +257,8 @@ impl GroupPermissions {
update_group_pinned_frame_url_policy: get_policy(
XmtpMetadataField::GroupPinnedFrameUrl.as_str(),
),
update_message_expiration_ms_policy: get_policy(
XmtpMetadataField::MessageExpirationMillis.as_str(),
update_message_disappearing_policy: get_policy(
XmtpMetadataField::MessageDisappearInNS.as_str(),
),
})
}
Expand All @@ -285,8 +285,8 @@ impl TryFrom<PermissionPolicySet> for PolicySet {
policy_set.update_group_pinned_frame_url_policy.try_into()?,
);
metadata_permissions_map.insert(
XmtpMetadataField::MessageExpirationMillis.to_string(),
policy_set.update_message_expiration_ms_policy.try_into()?,
XmtpMetadataField::MessageDisappearInNS.to_string(),
policy_set.update_message_disappearing_policy.try_into()?,
);

Ok(PolicySet {
Expand All @@ -306,6 +306,8 @@ pub enum MetadataField {
Description,
ImageUrlSquare,
PinnedFrameUrl,
MessageExpirationFromMS,
MessageExpirationMS,
}

impl From<&MetadataField> for XmtpMetadataField {
Expand All @@ -315,6 +317,8 @@ impl From<&MetadataField> for XmtpMetadataField {
MetadataField::Description => XmtpMetadataField::Description,
MetadataField::ImageUrlSquare => XmtpMetadataField::GroupImageUrlSquare,
MetadataField::PinnedFrameUrl => XmtpMetadataField::GroupPinnedFrameUrl,
MetadataField::MessageExpirationFromMS => XmtpMetadataField::MessageDisappearFromNS,
MetadataField::MessageExpirationMS => XmtpMetadataField::MessageDisappearInNS,
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE GROUPS DROP COLUMN message_disappear_from_ns;
ALTER TABLE GROUPS DROP COLUMN message_disappear_in_ns;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE GROUPS ADD COLUMN message_disappear_from_ns BIGINT;
ALTER TABLE GROUPS ADD COLUMN message_disappear_in_ns BIGINT;
14 changes: 14 additions & 0 deletions xmtp_mls/src/groups/device_sync.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::{GroupError, MlsGroup};
use crate::groups::disappearing_messages::DisappearingMessagesCleanerWorker;
#[cfg(any(test, feature = "test-utils"))]
pub use crate::utils::WorkerHandle;
use crate::{
Expand Down Expand Up @@ -138,6 +139,19 @@ where
self.set_sync_worker_handle(worker.handle.clone());
worker.spawn_worker();
}

#[instrument(level = "trace", skip_all)]
pub fn start_disappearing_messages_cleaner_worker(&self) {
let client = self.clone();
tracing::trace!(
inbox_id = client.inbox_id(),
installation_id = hex::encode(client.installation_public_key()),
"starting expired messages cleaner worker"
);

let worker = DisappearingMessagesCleanerWorker::new(client);
worker.spawn_worker();
}
}

pub struct SyncWorker<ApiClient, V> {
Expand Down
Loading

0 comments on commit 869a414

Please sign in to comment.