diff --git a/Cargo.lock b/Cargo.lock index 823cb7bf3..074bd1431 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -723,6 +723,7 @@ dependencies = [ "prometheus-client", "prost", "rand 0.8.5", + "rayon", "redis 0.25.4", "secrecy", "semver", diff --git a/Cargo.toml b/Cargo.toml index ac37e817a..cd10988b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -147,7 +147,7 @@ lettre = { version = "0.11.7", features = ["tokio1", "tokio1-native-tls"] } handlebars = "5.1.2" pin-project = "1.1.5" byteorder = "1.5.0" -rayon = "1.10.0" +rayon.workspace = true [dev-dependencies] @@ -247,6 +247,7 @@ actix-web = { version = "4.5.1", default-features = false, features = [ actix-http = { version = "3.6.0", default-features = false } tokio = { version = "1.36.0", features = ["sync"] } tokio-stream = "0.1.14" +rayon = "1.10.0" futures-util = "0.3.30" bincode = "1.3.3" client-websocket = { path = "libs/client-websocket" } diff --git a/libs/database/src/collab/collab_storage.rs b/libs/database/src/collab/collab_storage.rs index d64ef27ac..b54b69cf1 100644 --- a/libs/database/src/collab/collab_storage.rs +++ b/libs/database/src/collab/collab_storage.rs @@ -138,6 +138,7 @@ pub trait CollabStorage: Send + Sync + 'static { &self, uid: &i64, queries: Vec, + from_editing_collab: bool, ) -> HashMap; /// Deletes a collaboration from the storage. @@ -249,8 +250,12 @@ where &self, uid: &i64, queries: Vec, + from_editing_collab: bool, ) -> HashMap { - self.as_ref().batch_get_collab(uid, queries).await + self + .as_ref() + .batch_get_collab(uid, queries, from_editing_collab) + .await } async fn delete_collab(&self, workspace_id: &str, uid: &i64, object_id: &str) -> AppResult<()> { diff --git a/services/appflowy-collaborate/Cargo.toml b/services/appflowy-collaborate/Cargo.toml index f9aac2dff..a05cfa3b2 100644 --- a/services/appflowy-collaborate/Cargo.toml +++ b/services/appflowy-collaborate/Cargo.toml @@ -15,9 +15,19 @@ path = "src/lib.rs" access-control.workspace = true actix.workspace = true actix-web.workspace = true -actix-http = { workspace = true, default-features = false, features = ["openssl", "compress-brotli", "compress-gzip"] } +actix-http = { workspace = true, default-features = false, features = [ + "openssl", + "compress-brotli", + "compress-gzip", +] } actix-web-actors = { version = "4.3" } -app-error = { workspace = true, features = ["sqlx_error", "actix_web_error", "tokio_error", "bincode_error", "appflowy_ai_error"] } +app-error = { workspace = true, features = [ + "sqlx_error", + "actix_web_error", + "tokio_error", + "bincode_error", + "appflowy_ai_error", +] } authentication.workspace = true brotli.workspace = true dashmap.workspace = true @@ -29,13 +39,24 @@ tracing = "0.1.40" futures-util = "0.3.30" tokio-util = { version = "0.7", features = ["codec"] } tokio-stream = { version = "0.1.14", features = ["sync"] } -tokio = { workspace = true, features = ["net", "sync", "macros", "rt-multi-thread"] } +tokio = { workspace = true, features = [ + "net", + "sync", + "macros", + "rt-multi-thread", +] } async-trait = "0.1.77" prost.workspace = true serde.workspace = true serde_json.workspace = true serde_repr.workspace = true -sqlx = { workspace = true, default-features = false, features = ["runtime-tokio-rustls", "macros", "postgres", "uuid", "chrono"] } +sqlx = { workspace = true, default-features = false, features = [ + "runtime-tokio-rustls", + "macros", + "postgres", + "uuid", + "chrono", +] } thiserror = "1.0.56" tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } anyhow = "1" @@ -66,6 +87,7 @@ lazy_static = "1.4.0" itertools = "0.12.0" validator = "0.16.1" workspace-access.workspace = true +rayon.workspace = true [dev-dependencies] rand = "0.8.5" diff --git a/services/appflowy-collaborate/src/collab/storage.rs b/services/appflowy-collaborate/src/collab/storage.rs index 3c534d60b..002faaa8e 100644 --- a/services/appflowy-collaborate/src/collab/storage.rs +++ b/services/appflowy-collaborate/src/collab/storage.rs @@ -7,6 +7,7 @@ use collab::entity::EncodedCollab; use collab_entity::CollabType; use collab_rt_entity::ClientCollabMessage; use itertools::{Either, Itertools}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use sqlx::Transaction; use tokio::time::timeout; use tracing::warn; @@ -158,6 +159,40 @@ where } } + async fn batch_get_encode_collab_from_editing( + &self, + object_ids: Vec, + ) -> HashMap { + let (ret, rx) = tokio::sync::oneshot::channel(); + let timeout_duration = Duration::from_secs(10); + + // Attempt to send the command to the realtime server + if let Err(err) = self + .rt_cmd_sender + .send(CollaborationCommand::BatchGetEncodeCollab { object_ids, ret }) + .await + { + error!( + "Failed to send get encode collab command to realtime server: {}", + err + ); + return HashMap::new(); + } + + // Await the response from the realtime server with a timeout + match timeout(timeout_duration, rx).await { + Ok(Ok(batch_encoded_collab)) => batch_encoded_collab, + Ok(Err(err)) => { + error!("Failed to get encode collab from realtime server: {}", err); + HashMap::new() + }, + Err(_) => { + error!("Timeout waiting for encode collab from realtime server"); + HashMap::new() + }, + } + } + async fn queue_insert_collab( &self, workspace_id: &str, @@ -326,6 +361,7 @@ where &self, _uid: &i64, queries: Vec, + from_editing_collab: bool, ) -> HashMap { // Partition queries based on validation into valid queries and errors (with associated error messages). let (valid_queries, mut results): (Vec<_>, HashMap<_, _>) = @@ -340,8 +376,48 @@ where }, )), }); + let cache_queries = if from_editing_collab { + let editing_queries = valid_queries.clone(); + let editing_results = self + .batch_get_encode_collab_from_editing( + editing_queries + .iter() + .map(|q| q.object_id.clone()) + .collect(), + ) + .await; + let editing_query_collab_results: HashMap = + tokio::task::spawn_blocking(move || { + let par_iter = editing_results.into_par_iter(); + par_iter + .map(|(object_id, encoded_collab)| { + let encoding_result = encoded_collab.encode_to_bytes(); + let query_collab_result = match encoding_result { + Ok(encoded_collab_bytes) => QueryCollabResult::Success { + encode_collab_v1: encoded_collab_bytes, + }, + Err(err) => QueryCollabResult::Failed { + error: err.to_string(), + }, + }; + + (object_id.clone(), query_collab_result) + }) + .collect() + }) + .await + .unwrap(); + let editing_object_ids: Vec = editing_query_collab_results.keys().cloned().collect(); + results.extend(editing_query_collab_results); + valid_queries + .into_iter() + .filter(|q| !editing_object_ids.contains(&q.object_id)) + .collect() + } else { + valid_queries + }; - results.extend(self.cache.batch_get_encode_collab(valid_queries).await); + results.extend(self.cache.batch_get_encode_collab(cache_queries).await); results } diff --git a/services/appflowy-collaborate/src/command.rs b/services/appflowy-collaborate/src/command.rs index 4478f606c..eb9909b38 100644 --- a/services/appflowy-collaborate/src/command.rs +++ b/services/appflowy-collaborate/src/command.rs @@ -1,22 +1,36 @@ use crate::{ error::RealtimeError, - group::cmd::{GroupCommand, GroupCommandSender}, + group::{ + cmd::{GroupCommand, GroupCommandSender}, + manager::GroupManager, + }, }; +use access_control::collab::RealtimeAccessControl; use collab::entity::EncodedCollab; use collab_rt_entity::ClientCollabMessage; use dashmap::DashMap; -use std::sync::Arc; +use database::collab::CollabStorage; +use futures::StreamExt; +use std::{ + collections::HashMap, + sync::{Arc, Weak}, +}; use tracing::error; pub type CLCommandSender = tokio::sync::mpsc::Sender; pub type CLCommandReceiver = tokio::sync::mpsc::Receiver; pub type EncodeCollabSender = tokio::sync::oneshot::Sender>; +pub type BatchEncodeCollabSender = tokio::sync::oneshot::Sender>; pub enum CollaborationCommand { GetEncodeCollab { object_id: String, ret: EncodeCollabSender, }, + BatchGetEncodeCollab { + object_ids: Vec, + ret: BatchEncodeCollabSender, + }, ServerSendCollabMessage { object_id: String, collab_messages: Vec, @@ -24,10 +38,14 @@ pub enum CollaborationCommand { }, } -pub(crate) fn spawn_collaboration_command( +pub(crate) fn spawn_collaboration_command( mut command_recv: CLCommandReceiver, group_sender_by_object_id: &Arc>, -) { + weak_groups: Weak>, +) where + S: CollabStorage, + AC: RealtimeAccessControl, +{ let group_sender_by_object_id = group_sender_by_object_id.clone(); tokio::spawn(async move { while let Some(cmd) = command_recv.recv().await { @@ -50,6 +68,35 @@ pub(crate) fn spawn_collaboration_command( }, } }, + CollaborationCommand::BatchGetEncodeCollab { object_ids, ret } => { + if let Some(group_manager) = weak_groups.upgrade() { + let tasks = futures::stream::iter(object_ids) + .map(|object_id| { + let cloned_group_manager = group_manager.clone(); + tokio::task::spawn(async move { + let group = cloned_group_manager.get_group(&object_id).await; + if let Some(group) = group { + (object_id, group.encode_collab().await.ok()) + } else { + (object_id, None) + } + }) + }) + .collect::>() + .await; + + let mut outputs: HashMap = HashMap::new(); + for task in tasks { + let result = task.await; + if let Ok((object_id, Some(encoded_collab))) = result { + outputs.insert(object_id, encoded_collab); + } + } + let _ = ret.send(outputs); + } else { + let _ = ret.send(HashMap::new()); + } + }, CollaborationCommand::ServerSendCollabMessage { object_id, collab_messages, diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index ca1f09f1a..f973d6a56 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -89,7 +89,11 @@ where spawn_period_check_inactive_group(Arc::downgrade(&group_manager), &group_sender_by_object_id); - spawn_collaboration_command(command_recv, &group_sender_by_object_id); + spawn_collaboration_command( + command_recv, + &group_sender_by_object_id, + Arc::downgrade(&group_manager), + ); spawn_metrics(metrics.clone(), storage.clone()); diff --git a/src/api/workspace.rs b/src/api/workspace.rs index d7e371a08..7094f76e5 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -885,7 +885,7 @@ async fn batch_get_collab_handler( let result = BatchQueryCollabResult( state .collab_access_control_storage - .batch_get_collab(&uid, payload.into_inner().0) + .batch_get_collab(&uid, payload.into_inner().0, false) .await, ); Ok(Json(AppResponse::Ok().with_data(result))) diff --git a/src/biz/workspace/page_view.rs b/src/biz/workspace/page_view.rs index 9f980d410..b999a17d2 100644 --- a/src/biz/workspace/page_view.rs +++ b/src/biz/workspace/page_view.rs @@ -189,7 +189,7 @@ async fn get_page_collab_data_for_database( }) .collect(); let row_query_collab_results = collab_access_control_storage - .batch_get_collab(&uid, queries) + .batch_get_collab(&uid, queries, true) .await; let row_data = tokio::task::spawn_blocking(move || { let row_collabs: HashMap> = row_query_collab_results