Skip to content

Commit

Permalink
feat: support fetching encoded collab in batch from memory (#837)
Browse files Browse the repository at this point in the history
  • Loading branch information
khorshuheng authored Oct 2, 2024
1 parent 03fdcf4 commit 1173232
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 14 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ lettre = { version = "0.11.7", features = ["tokio1", "tokio1-native-tls"] }
handlebars = "5.1.2"
pin-project = "1.1.5"
byteorder = "1.5.0"
rayon = "1.10.0"
rayon.workspace = true


[dev-dependencies]
Expand Down Expand Up @@ -247,6 +247,7 @@ actix-web = { version = "4.5.1", default-features = false, features = [
actix-http = { version = "3.6.0", default-features = false }
tokio = { version = "1.36.0", features = ["sync"] }
tokio-stream = "0.1.14"
rayon = "1.10.0"
futures-util = "0.3.30"
bincode = "1.3.3"
client-websocket = { path = "libs/client-websocket" }
Expand Down
7 changes: 6 additions & 1 deletion libs/database/src/collab/collab_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ pub trait CollabStorage: Send + Sync + 'static {
&self,
uid: &i64,
queries: Vec<QueryCollab>,
from_editing_collab: bool,
) -> HashMap<String, QueryCollabResult>;

/// Deletes a collaboration from the storage.
Expand Down Expand Up @@ -249,8 +250,12 @@ where
&self,
uid: &i64,
queries: Vec<QueryCollab>,
from_editing_collab: bool,
) -> HashMap<String, QueryCollabResult> {
self.as_ref().batch_get_collab(uid, queries).await
self
.as_ref()
.batch_get_collab(uid, queries, from_editing_collab)
.await
}

async fn delete_collab(&self, workspace_id: &str, uid: &i64, object_id: &str) -> AppResult<()> {
Expand Down
30 changes: 26 additions & 4 deletions services/appflowy-collaborate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@ path = "src/lib.rs"
access-control.workspace = true
actix.workspace = true
actix-web.workspace = true
actix-http = { workspace = true, default-features = false, features = ["openssl", "compress-brotli", "compress-gzip"] }
actix-http = { workspace = true, default-features = false, features = [
"openssl",
"compress-brotli",
"compress-gzip",
] }
actix-web-actors = { version = "4.3" }
app-error = { workspace = true, features = ["sqlx_error", "actix_web_error", "tokio_error", "bincode_error", "appflowy_ai_error"] }
app-error = { workspace = true, features = [
"sqlx_error",
"actix_web_error",
"tokio_error",
"bincode_error",
"appflowy_ai_error",
] }
authentication.workspace = true
brotli.workspace = true
dashmap.workspace = true
Expand All @@ -29,13 +39,24 @@ tracing = "0.1.40"
futures-util = "0.3.30"
tokio-util = { version = "0.7", features = ["codec"] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
tokio = { workspace = true, features = ["net", "sync", "macros", "rt-multi-thread"] }
tokio = { workspace = true, features = [
"net",
"sync",
"macros",
"rt-multi-thread",
] }
async-trait = "0.1.77"
prost.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_repr.workspace = true
sqlx = { workspace = true, default-features = false, features = ["runtime-tokio-rustls", "macros", "postgres", "uuid", "chrono"] }
sqlx = { workspace = true, default-features = false, features = [
"runtime-tokio-rustls",
"macros",
"postgres",
"uuid",
"chrono",
] }
thiserror = "1.0.56"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
anyhow = "1"
Expand Down Expand Up @@ -66,6 +87,7 @@ lazy_static = "1.4.0"
itertools = "0.12.0"
validator = "0.16.1"
workspace-access.workspace = true
rayon.workspace = true

[dev-dependencies]
rand = "0.8.5"
Expand Down
78 changes: 77 additions & 1 deletion services/appflowy-collaborate/src/collab/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use collab::entity::EncodedCollab;
use collab_entity::CollabType;
use collab_rt_entity::ClientCollabMessage;
use itertools::{Either, Itertools};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use sqlx::Transaction;
use tokio::time::timeout;
use tracing::warn;
Expand Down Expand Up @@ -158,6 +159,40 @@ where
}
}

async fn batch_get_encode_collab_from_editing(
&self,
object_ids: Vec<String>,
) -> HashMap<String, EncodedCollab> {
let (ret, rx) = tokio::sync::oneshot::channel();
let timeout_duration = Duration::from_secs(10);

// Attempt to send the command to the realtime server
if let Err(err) = self
.rt_cmd_sender
.send(CollaborationCommand::BatchGetEncodeCollab { object_ids, ret })
.await
{
error!(
"Failed to send get encode collab command to realtime server: {}",
err
);
return HashMap::new();
}

// Await the response from the realtime server with a timeout
match timeout(timeout_duration, rx).await {
Ok(Ok(batch_encoded_collab)) => batch_encoded_collab,
Ok(Err(err)) => {
error!("Failed to get encode collab from realtime server: {}", err);
HashMap::new()
},
Err(_) => {
error!("Timeout waiting for encode collab from realtime server");
HashMap::new()
},
}
}

async fn queue_insert_collab(
&self,
workspace_id: &str,
Expand Down Expand Up @@ -326,6 +361,7 @@ where
&self,
_uid: &i64,
queries: Vec<QueryCollab>,
from_editing_collab: bool,
) -> HashMap<String, QueryCollabResult> {
// Partition queries based on validation into valid queries and errors (with associated error messages).
let (valid_queries, mut results): (Vec<_>, HashMap<_, _>) =
Expand All @@ -340,8 +376,48 @@ where
},
)),
});
let cache_queries = if from_editing_collab {
let editing_queries = valid_queries.clone();
let editing_results = self
.batch_get_encode_collab_from_editing(
editing_queries
.iter()
.map(|q| q.object_id.clone())
.collect(),
)
.await;
let editing_query_collab_results: HashMap<String, QueryCollabResult> =
tokio::task::spawn_blocking(move || {
let par_iter = editing_results.into_par_iter();
par_iter
.map(|(object_id, encoded_collab)| {
let encoding_result = encoded_collab.encode_to_bytes();
let query_collab_result = match encoding_result {
Ok(encoded_collab_bytes) => QueryCollabResult::Success {
encode_collab_v1: encoded_collab_bytes,
},
Err(err) => QueryCollabResult::Failed {
error: err.to_string(),
},
};

(object_id.clone(), query_collab_result)
})
.collect()
})
.await
.unwrap();
let editing_object_ids: Vec<String> = editing_query_collab_results.keys().cloned().collect();
results.extend(editing_query_collab_results);
valid_queries
.into_iter()
.filter(|q| !editing_object_ids.contains(&q.object_id))
.collect()
} else {
valid_queries
};

results.extend(self.cache.batch_get_encode_collab(valid_queries).await);
results.extend(self.cache.batch_get_encode_collab(cache_queries).await);
results
}

Expand Down
55 changes: 51 additions & 4 deletions services/appflowy-collaborate/src/command.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,51 @@
use crate::{
error::RealtimeError,
group::cmd::{GroupCommand, GroupCommandSender},
group::{
cmd::{GroupCommand, GroupCommandSender},
manager::GroupManager,
},
};
use access_control::collab::RealtimeAccessControl;
use collab::entity::EncodedCollab;
use collab_rt_entity::ClientCollabMessage;
use dashmap::DashMap;
use std::sync::Arc;
use database::collab::CollabStorage;
use futures::StreamExt;
use std::{
collections::HashMap,
sync::{Arc, Weak},
};
use tracing::error;

pub type CLCommandSender = tokio::sync::mpsc::Sender<CollaborationCommand>;
pub type CLCommandReceiver = tokio::sync::mpsc::Receiver<CollaborationCommand>;

pub type EncodeCollabSender = tokio::sync::oneshot::Sender<Option<EncodedCollab>>;
pub type BatchEncodeCollabSender = tokio::sync::oneshot::Sender<HashMap<String, EncodedCollab>>;
pub enum CollaborationCommand {
GetEncodeCollab {
object_id: String,
ret: EncodeCollabSender,
},
BatchGetEncodeCollab {
object_ids: Vec<String>,
ret: BatchEncodeCollabSender,
},
ServerSendCollabMessage {
object_id: String,
collab_messages: Vec<ClientCollabMessage>,
ret: tokio::sync::oneshot::Sender<Result<(), RealtimeError>>,
},
}

pub(crate) fn spawn_collaboration_command(
pub(crate) fn spawn_collaboration_command<S, AC>(
mut command_recv: CLCommandReceiver,
group_sender_by_object_id: &Arc<DashMap<String, GroupCommandSender>>,
) {
weak_groups: Weak<GroupManager<S, AC>>,
) where
S: CollabStorage,
AC: RealtimeAccessControl,
{
let group_sender_by_object_id = group_sender_by_object_id.clone();
tokio::spawn(async move {
while let Some(cmd) = command_recv.recv().await {
Expand All @@ -50,6 +68,35 @@ pub(crate) fn spawn_collaboration_command(
},
}
},
CollaborationCommand::BatchGetEncodeCollab { object_ids, ret } => {
if let Some(group_manager) = weak_groups.upgrade() {
let tasks = futures::stream::iter(object_ids)
.map(|object_id| {
let cloned_group_manager = group_manager.clone();
tokio::task::spawn(async move {
let group = cloned_group_manager.get_group(&object_id).await;
if let Some(group) = group {
(object_id, group.encode_collab().await.ok())
} else {
(object_id, None)
}
})
})
.collect::<Vec<_>>()
.await;

let mut outputs: HashMap<String, EncodedCollab> = HashMap::new();
for task in tasks {
let result = task.await;
if let Ok((object_id, Some(encoded_collab))) = result {
outputs.insert(object_id, encoded_collab);
}
}
let _ = ret.send(outputs);
} else {
let _ = ret.send(HashMap::new());
}
},
CollaborationCommand::ServerSendCollabMessage {
object_id,
collab_messages,
Expand Down
6 changes: 5 additions & 1 deletion services/appflowy-collaborate/src/rt_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ where

spawn_period_check_inactive_group(Arc::downgrade(&group_manager), &group_sender_by_object_id);

spawn_collaboration_command(command_recv, &group_sender_by_object_id);
spawn_collaboration_command(
command_recv,
&group_sender_by_object_id,
Arc::downgrade(&group_manager),
);

spawn_metrics(metrics.clone(), storage.clone());

Expand Down
2 changes: 1 addition & 1 deletion src/api/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ async fn batch_get_collab_handler(
let result = BatchQueryCollabResult(
state
.collab_access_control_storage
.batch_get_collab(&uid, payload.into_inner().0)
.batch_get_collab(&uid, payload.into_inner().0, false)
.await,
);
Ok(Json(AppResponse::Ok().with_data(result)))
Expand Down
2 changes: 1 addition & 1 deletion src/biz/workspace/page_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ async fn get_page_collab_data_for_database(
})
.collect();
let row_query_collab_results = collab_access_control_storage
.batch_get_collab(&uid, queries)
.batch_get_collab(&uid, queries, true)
.await;
let row_data = tokio::task::spawn_blocking(move || {
let row_collabs: HashMap<String, Vec<u8>> = row_query_collab_results
Expand Down

0 comments on commit 1173232

Please sign in to comment.