Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(groups): add a worker to delete expired messages #1503

Merged
merged 31 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
843faee
wip
mchenani Jan 14, 2025
8bd7422
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 14, 2025
9e84a95
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 14, 2025
5a994fd
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 16, 2025
b9be121
add group message expiration setting to db
mchenani Jan 16, 2025
255c6f8
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 16, 2025
22e3eff
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 17, 2025
ecd3b92
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 17, 2025
8e7835b
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 20, 2025
1dcb898
rename to ConversationMessageDisappearingSettings
mchenani Jan 20, 2025
d62cac0
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 21, 2025
354b7e6
wip
mchenani Jan 21, 2025
1b11016
extract disappearing messages worker to a separate file
mchenani Jan 21, 2025
508c792
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 21, 2025
8951e87
store metadatas into db
mchenani Jan 23, 2025
de91ebf
remove comments
mchenani Jan 23, 2025
62e2aae
wip
mchenani Jan 23, 2025
c3a3f91
add test
mchenani Jan 24, 2025
6f54733
fix tests
mchenani Jan 24, 2025
0825f6b
fix tests
mchenani Jan 24, 2025
3911e8f
address comments
mchenani Jan 24, 2025
736fc06
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 24, 2025
6bf97ed
fix clippy
mchenani Jan 27, 2025
bf2b894
fix clippy
mchenani Jan 27, 2025
e6136cb
fix clippy
mchenani Jan 27, 2025
c783def
Merge branch 'main' into mc/disappearning-messages-worker
mchenani Jan 27, 2025
1a527a9
fix napi and address comments
mchenani Jan 27, 2025
fbdd11f
add comments to messageDisappearingSettings
mchenani Jan 27, 2025
aeccd04
fix missing mappings
mchenani Jan 27, 2025
9ec0034
fix bindings_node
mchenani Jan 27, 2025
94f5b34
fix bindings_node
mchenani Jan 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 223 additions & 35 deletions bindings_ffi/src/mls.rs

Large diffs are not rendered by default.

24 changes: 23 additions & 1 deletion bindings_node/src/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use crate::{
streams::StreamCloser,
ErrorWrapper,
};

use prost::Message as ProstMessage;
use xmtp_mls::groups::group_mutable_metadata::ConversationMessageDisappearingSettings as XmtpConversationMessageDisappearingSettings;

use napi_derive::napi;

Expand All @@ -38,6 +38,28 @@ pub struct GroupMetadata {
inner: XmtpGroupMetadata,
}

#[napi(object)]
#[derive(Clone)]
pub struct ConversationMessageDisappearingSettings {
#[napi]
pub inner: XmtpConversationMessageDisappearingSettings,
}

#[napi]
impl ConversationMessageDisappearingSettings {
#[napi]
pub fn new(from_ns: i64, in_ns: i64) -> Self {
let inner = XmtpConversationMessageDisappearingSettings { from_ns, in_ns };
Self { inner }
}
}

impl From<ConversationMessageDisappearingSettings> for XmtpConversationMessageDisappearingSettings {
fn from(value: ConversationMessageDisappearingSettings) -> Self {
XmtpConversationMessageDisappearingSettings::new(value.inner.from_ns, value.inner.in_ns)
}
}

#[napi]
impl GroupMetadata {
#[napi]
Expand Down
12 changes: 6 additions & 6 deletions bindings_node/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use xmtp_mls::storage::group::ConversationType as XmtpConversationType;
use xmtp_mls::storage::group::GroupMembershipState as XmtpGroupMembershipState;
use xmtp_mls::storage::group::GroupQueryArgs;

use crate::conversation::ConversationMessageDisappearingSettings;
use crate::message::Message;
use crate::permissions::{GroupPermissionsOptions, PermissionPolicySet};
use crate::ErrorWrapper;
Expand Down Expand Up @@ -122,8 +123,7 @@ pub struct CreateGroupOptions {
pub group_description: Option<String>,
pub group_pinned_frame_url: Option<String>,
pub custom_permission_policy_set: Option<PermissionPolicySet>,
pub message_expiration_from_ms: Option<i64>,
pub message_expiration_ms: Option<i64>,
pub message_disappearing_settings: Option<ConversationMessageDisappearingSettings>,
}

impl CreateGroupOptions {
Expand All @@ -133,8 +133,9 @@ impl CreateGroupOptions {
image_url_square: self.group_image_url_square,
description: self.group_description,
pinned_frame_url: self.group_pinned_frame_url,
message_expiration_from_ms: self.message_expiration_from_ms,
message_expiration_ms: self.message_expiration_ms,
message_disappearing_settings: self
.message_disappearing_settings
.map(|settings| settings.into()),
}
}
}
Expand Down Expand Up @@ -163,8 +164,7 @@ impl Conversations {
group_description: None,
group_pinned_frame_url: None,
custom_permission_policy_set: None,
message_expiration_from_ms: None,
message_expiration_ms: None,
message_disappearing_settings: None,
});

if let Some(GroupPermissionsOptions::CustomPolicy) = options.permissions {
Expand Down
10 changes: 5 additions & 5 deletions bindings_node/src/permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ pub struct PermissionPolicySet {
pub update_group_description_policy: PermissionPolicy,
pub update_group_image_url_square_policy: PermissionPolicy,
pub update_group_pinned_frame_url_policy: PermissionPolicy,
pub update_message_expiration_ms_policy: PermissionPolicy,
pub update_message_disappearing_policy: PermissionPolicy,
}

impl From<PreconfiguredPolicies> for GroupPermissionsOptions {
Expand Down Expand Up @@ -216,8 +216,8 @@ impl GroupPermissions {
update_group_pinned_frame_url_policy: get_policy(
XmtpMetadataField::GroupPinnedFrameUrl.as_str(),
),
update_message_expiration_ms_policy: get_policy(
XmtpMetadataField::MessageExpirationMillis.as_str(),
update_message_disappearing_policy: get_policy(
XmtpMetadataField::MessageDisappearInNS.as_str(),
),
})
}
Expand Down Expand Up @@ -246,8 +246,8 @@ impl TryFrom<PermissionPolicySet> for PolicySet {
policy_set.update_group_pinned_frame_url_policy.try_into()?,
);
metadata_permissions_map.insert(
XmtpMetadataField::MessageExpirationMillis.to_string(),
policy_set.update_message_expiration_ms_policy.try_into()?,
XmtpMetadataField::MessageDisappearInNS.to_string(),
policy_set.update_message_disappearing_policy.try_into()?,
);

Ok(PolicySet {
Expand Down
7 changes: 7 additions & 0 deletions bindings_wasm/src/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@ use xmtp_mls::storage::group_message::{GroupMessageKind as XmtpGroupMessageKind,
use xmtp_proto::xmtp::mls::message_contents::EncodedContent as XmtpEncodedContent;

use prost::Message as ProstMessage;
use xmtp_mls::groups::group_mutable_metadata::ConversationMessageDisappearingSettings as XmtpGroupMessageDisappearingSettings;

#[wasm_bindgen]
pub struct GroupMetadata {
inner: XmtpGroupMetadata,
}

#[wasm_bindgen]
#[derive(Clone)]
pub struct ConversationMessageDisappearingSettings {
inner: XmtpGroupMessageDisappearingSettings,
}

#[wasm_bindgen]
impl GroupMetadata {
#[wasm_bindgen(js_name = creatorInboxId)]
Expand Down
19 changes: 7 additions & 12 deletions bindings_wasm/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use xmtp_mls::storage::group::ConversationType as XmtpConversationType;
use xmtp_mls::storage::group::GroupMembershipState as XmtpGroupMembershipState;
use xmtp_mls::storage::group::GroupQueryArgs;

use crate::conversation::ConversationMessageDisappearingSettings;
use crate::messages::Message;
use crate::permissions::{GroupPermissionsOptions, PermissionPolicySet};
use crate::{client::RustXmtpClient, conversation::Conversation};
Expand Down Expand Up @@ -130,10 +131,8 @@ pub struct CreateGroupOptions {
pub group_pinned_frame_url: Option<String>,
#[wasm_bindgen(js_name = customPermissionPolicySet)]
pub custom_permission_policy_set: Option<PermissionPolicySet>,
#[wasm_bindgen(js_name = messageExpirationFromMillis)]
pub message_expiration_from_ms: Option<i64>,
#[wasm_bindgen(js_name = messageExpirationMillis)]
pub message_expiration_ms: Option<i64>,
#[wasm_bindgen(js_name = messageDisappearingSettings)]
pub message_disappearing_settings: Option<ConversationMessageDisappearingSettings>,
}

#[wasm_bindgen]
Expand All @@ -147,8 +146,7 @@ impl CreateGroupOptions {
group_description: Option<String>,
group_pinned_frame_url: Option<String>,
custom_permission_policy_set: Option<PermissionPolicySet>,
message_expiration_from_ms: Option<i64>,
message_expiration_ms: Option<i64>,
message_disappearing_settings: Option<ConversationMessageDisappearingSettings>,
) -> Self {
Self {
permissions,
Expand All @@ -157,8 +155,7 @@ impl CreateGroupOptions {
group_description,
group_pinned_frame_url,
custom_permission_policy_set,
message_expiration_from_ms,
message_expiration_ms,
message_disappearing_settings,
}
}
}
Expand All @@ -170,8 +167,7 @@ impl CreateGroupOptions {
image_url_square: self.group_image_url_square,
description: self.group_description,
pinned_frame_url: self.group_pinned_frame_url,
message_expiration_from_ms: self.message_expiration_from_ms,
message_expiration_ms: self.message_expiration_ms,
message_disappearing_settings: None, // todo: fix mapping,
mchenani marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -218,8 +214,7 @@ impl Conversations {
group_description: None,
group_pinned_frame_url: None,
custom_permission_policy_set: None,
message_expiration_from_ms: None,
message_expiration_ms: None,
message_disappearing_settings: None,
});

if let Some(GroupPermissionsOptions::CustomPolicy) = options.permissions {
Expand Down
20 changes: 12 additions & 8 deletions bindings_wasm/src/permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ pub struct PermissionPolicySet {
pub update_group_image_url_square_policy: PermissionPolicy,
#[wasm_bindgen(js_name = updateGroupPinnedFrameUrlPolicy)]
pub update_group_pinned_frame_url_policy: PermissionPolicy,
#[wasm_bindgen(js_name = updateMessageExpirationPolicy)]
pub update_message_expiration_ms_policy: PermissionPolicy,
#[wasm_bindgen(js_name = updateMessageDisappearingPolicy)]
pub update_message_disappearing_policy: PermissionPolicy,
}

#[wasm_bindgen]
Expand All @@ -187,7 +187,7 @@ impl PermissionPolicySet {
update_group_description_policy: PermissionPolicy,
update_group_image_url_square_policy: PermissionPolicy,
update_group_pinned_frame_url_policy: PermissionPolicy,
update_message_expiration_ms_policy: PermissionPolicy,
update_message_disappearing_policy: PermissionPolicy,
) -> Self {
Self {
add_member_policy,
Expand All @@ -198,7 +198,7 @@ impl PermissionPolicySet {
update_group_description_policy,
update_group_image_url_square_policy,
update_group_pinned_frame_url_policy,
update_message_expiration_ms_policy,
update_message_disappearing_policy,
}
}
}
Expand Down Expand Up @@ -257,8 +257,8 @@ impl GroupPermissions {
update_group_pinned_frame_url_policy: get_policy(
XmtpMetadataField::GroupPinnedFrameUrl.as_str(),
),
update_message_expiration_ms_policy: get_policy(
XmtpMetadataField::MessageExpirationMillis.as_str(),
update_message_disappearing_policy: get_policy(
XmtpMetadataField::MessageDisappearInNS.as_str(),
),
})
}
Expand All @@ -285,8 +285,8 @@ impl TryFrom<PermissionPolicySet> for PolicySet {
policy_set.update_group_pinned_frame_url_policy.try_into()?,
);
metadata_permissions_map.insert(
XmtpMetadataField::MessageExpirationMillis.to_string(),
policy_set.update_message_expiration_ms_policy.try_into()?,
XmtpMetadataField::MessageDisappearInNS.to_string(),
policy_set.update_message_disappearing_policy.try_into()?,
);

Ok(PolicySet {
Expand All @@ -306,6 +306,8 @@ pub enum MetadataField {
Description,
ImageUrlSquare,
PinnedFrameUrl,
MessageExpirationFromMS,
MessageExpirationMS,
}

impl From<&MetadataField> for XmtpMetadataField {
Expand All @@ -315,6 +317,8 @@ impl From<&MetadataField> for XmtpMetadataField {
MetadataField::Description => XmtpMetadataField::Description,
MetadataField::ImageUrlSquare => XmtpMetadataField::GroupImageUrlSquare,
MetadataField::PinnedFrameUrl => XmtpMetadataField::GroupPinnedFrameUrl,
MetadataField::MessageExpirationFromMS => XmtpMetadataField::MessageDisappearFromNS,
MetadataField::MessageExpirationMS => XmtpMetadataField::MessageDisappearInNS,
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE GROUPS DROP COLUMN message_disappear_from_ns;
ALTER TABLE GROUPS DROP COLUMN message_disappear_in_ns;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE GROUPS ADD COLUMN message_disappear_from_ns BIGINT;
ALTER TABLE GROUPS ADD COLUMN message_disappear_in_ns BIGINT;
14 changes: 14 additions & 0 deletions xmtp_mls/src/groups/device_sync.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::{GroupError, MlsGroup};
use crate::groups::disappearing_messages::DisappearingMessagesCleanerWorker;
#[cfg(any(test, feature = "test-utils"))]
pub use crate::utils::WorkerHandle;
use crate::{
Expand Down Expand Up @@ -138,6 +139,19 @@ where
self.set_sync_worker_handle(worker.handle.clone());
worker.spawn_worker();
}

#[instrument(level = "trace", skip_all)]
pub fn start_disappearing_messages_cleaner_worker(&self) {
let client = self.clone();
tracing::trace!(
inbox_id = client.inbox_id(),
installation_id = hex::encode(client.installation_public_key()),
"starting expired messages cleaner worker"
);

let worker = DisappearingMessagesCleanerWorker::new(client);
worker.spawn_worker();
}
}

pub struct SyncWorker<ApiClient, V> {
Expand Down
85 changes: 85 additions & 0 deletions xmtp_mls/src/groups/disappearing_messages.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use crate::client::ClientError;
use crate::storage::StorageError;
use crate::Client;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::OnceCell;
use xmtp_id::scw_verifier::SmartContractSignatureVerifier;
use xmtp_proto::api_client::trait_impls::XmtpApi;

#[derive(Debug, Error)]
pub enum DisappearingMessagesCleanerError {
#[error("storage error: {0}")]
Storage(#[from] StorageError),
#[error("client error: {0}")]
Client(#[from] ClientError),
}

pub struct DisappearingMessagesCleanerWorker<ApiClient, V> {
client: Client<ApiClient, V>,
init: OnceCell<()>,
}
impl<ApiClient, V> DisappearingMessagesCleanerWorker<ApiClient, V>
where
ApiClient: XmtpApi + Send + Sync + 'static,
V: SmartContractSignatureVerifier + Send + Sync + 'static,
{
pub fn new(client: Client<ApiClient, V>) -> Self {
Self {
client,
init: OnceCell::new(),
}
}
pub(crate) fn spawn_worker(mut self) {
crate::spawn(None, async move {
let inbox_id = self.client.inbox_id().to_string();
let installation_id = hex::encode(self.client.installation_public_key());
while let Err(err) = self.run().await {
tracing::info!("Running worker..");
match err {
DisappearingMessagesCleanerError::Client(ClientError::Storage(
StorageError::PoolNeedsConnection,
)) => {
tracing::warn!(
inbox_id,
installation_id,
"Pool disconnected. task will restart on reconnect"
);
break;
}
_ => {
tracing::error!(inbox_id, installation_id, "sync worker error {err}");
// Wait 2 seconds before restarting.
xmtp_common::time::sleep(Duration::from_secs(2)).await;
mchenani marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
});
}
}

impl<ApiClient, V> DisappearingMessagesCleanerWorker<ApiClient, V>
where
ApiClient: XmtpApi + Send + Sync + 'static,
V: SmartContractSignatureVerifier + Send + Sync + 'static,
{
/// Iterate on the list of groups and delete expired messages
async fn delete_expired_messages(&mut self) -> Result<(), DisappearingMessagesCleanerError> {
let provider = self.client.mls_provider()?;
match provider.conn_ref().delete_expired_messages() {
Ok(deleted_count) => {
tracing::info!("Successfully deleted {} expired messages", deleted_count);
}
Err(e) => {
tracing::error!("Failed to delete expired messages, error: {:?}", e);
}
}
Ok(())
}
async fn run(&mut self) -> Result<(), DisappearingMessagesCleanerError> {
if let Err(err) = self.delete_expired_messages().await {
tracing::error!("Error during deletion of expired messages: {:?}", err);
}
Ok(())
}
}
Loading
Loading