Skip to content

Commit

Permalink
feat: support fetching encoded collab in batch from memory
Browse files Browse the repository at this point in the history
  • Loading branch information
khorshuheng committed Sep 24, 2024
1 parent e499c4c commit 6f02b5c
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 9 deletions.
7 changes: 6 additions & 1 deletion libs/database/src/collab/collab_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ pub trait CollabStorage: Send + Sync + 'static {
&self,
uid: &i64,
queries: Vec<QueryCollab>,
from_editing_collab: bool,
) -> HashMap<String, QueryCollabResult>;

/// Deletes a collaboration from the storage.
Expand Down Expand Up @@ -249,8 +250,12 @@ where
&self,
uid: &i64,
queries: Vec<QueryCollab>,
from_editing_collab: bool,
) -> HashMap<String, QueryCollabResult> {
self.as_ref().batch_get_collab(uid, queries).await
self
.as_ref()
.batch_get_collab(uid, queries, from_editing_collab)
.await
}

async fn delete_collab(&self, workspace_id: &str, uid: &i64, object_id: &str) -> AppResult<()> {
Expand Down
70 changes: 69 additions & 1 deletion services/appflowy-collaborate/src/collab/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,40 @@ where
}
}

async fn batch_get_encode_collab_from_editing(
&self,
object_ids: Vec<String>,
) -> HashMap<String, EncodedCollab> {
let (ret, rx) = tokio::sync::oneshot::channel();
let timeout_duration = Duration::from_secs(10);

// Attempt to send the command to the realtime server
if let Err(err) = self
.rt_cmd_sender
.send(CollaborationCommand::BatchGetEncodeCollab { object_ids, ret })
.await
{
error!(
"Failed to send get encode collab command to realtime server: {}",
err
);
return HashMap::new();
}

// Await the response from the realtime server with a timeout
match timeout(timeout_duration, rx).await {
Ok(Ok(batch_encoded_collab)) => batch_encoded_collab,
Ok(Err(err)) => {
error!("Failed to get encode collab from realtime server: {}", err);
HashMap::new()
},
Err(_) => {
error!("Timeout waiting for encode collab from realtime server");
HashMap::new()
},
}
}

async fn queue_insert_collab(
&self,
workspace_id: &str,
Expand Down Expand Up @@ -326,6 +360,7 @@ where
&self,
_uid: &i64,
queries: Vec<QueryCollab>,
from_editing_collab: bool,
) -> HashMap<String, QueryCollabResult> {
// Partition queries based on validation into valid queries and errors (with associated error messages).
let (valid_queries, mut results): (Vec<_>, HashMap<_, _>) =
Expand All @@ -340,8 +375,41 @@ where
},
)),
});
let cache_queries = if from_editing_collab {
let editing_queries = valid_queries.clone();
let editing_results = self
.batch_get_encode_collab_from_editing(
editing_queries
.iter()
.map(|q| q.object_id.clone())
.collect(),
)
.await;
let editing_query_collab_results: HashMap<String, QueryCollabResult> =
tokio::task::spawn_blocking(move || {
editing_results
.into_iter()
.map(|(object_id, encoded_collab)| {
let query_collab_result = QueryCollabResult::Success {
encode_collab_v1: encoded_collab.encode_to_bytes().unwrap(),
};
(object_id.clone(), query_collab_result)
})
.collect()
})
.await
.unwrap();
let editing_object_ids: Vec<String> = editing_query_collab_results.keys().cloned().collect();
results.extend(editing_query_collab_results);
valid_queries
.into_iter()
.filter(|q| !editing_object_ids.contains(&q.object_id))
.collect()
} else {
valid_queries
};

results.extend(self.cache.batch_get_encode_collab(valid_queries).await);
results.extend(self.cache.batch_get_encode_collab(cache_queries).await);
results
}

Expand Down
55 changes: 51 additions & 4 deletions services/appflowy-collaborate/src/command.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,51 @@
use crate::{
error::RealtimeError,
group::cmd::{GroupCommand, GroupCommandSender},
group::{
cmd::{GroupCommand, GroupCommandSender},
manager::GroupManager,
},
};
use access_control::collab::RealtimeAccessControl;
use collab::entity::EncodedCollab;
use collab_rt_entity::ClientCollabMessage;
use dashmap::DashMap;
use std::sync::Arc;
use database::collab::CollabStorage;
use futures::StreamExt;
use std::{
collections::HashMap,
sync::{Arc, Weak},
};
use tracing::error;

pub type CLCommandSender = tokio::sync::mpsc::Sender<CollaborationCommand>;
pub type CLCommandReceiver = tokio::sync::mpsc::Receiver<CollaborationCommand>;

pub type EncodeCollabSender = tokio::sync::oneshot::Sender<Option<EncodedCollab>>;
pub type BatchEncodeCollabSender = tokio::sync::oneshot::Sender<HashMap<String, EncodedCollab>>;
pub enum CollaborationCommand {
GetEncodeCollab {
object_id: String,
ret: EncodeCollabSender,
},
BatchGetEncodeCollab {
object_ids: Vec<String>,
ret: BatchEncodeCollabSender,
},
ServerSendCollabMessage {
object_id: String,
collab_messages: Vec<ClientCollabMessage>,
ret: tokio::sync::oneshot::Sender<Result<(), RealtimeError>>,
},
}

pub(crate) fn spawn_collaboration_command(
pub(crate) fn spawn_collaboration_command<S, AC>(
mut command_recv: CLCommandReceiver,
group_sender_by_object_id: &Arc<DashMap<String, GroupCommandSender>>,
) {
weak_groups: Weak<GroupManager<S, AC>>,
) where
S: CollabStorage,
AC: RealtimeAccessControl,
{
let group_sender_by_object_id = group_sender_by_object_id.clone();
tokio::spawn(async move {
while let Some(cmd) = command_recv.recv().await {
Expand All @@ -50,6 +68,35 @@ pub(crate) fn spawn_collaboration_command(
},
}
},
CollaborationCommand::BatchGetEncodeCollab { object_ids, ret } => {
if let Some(group_manager) = weak_groups.upgrade() {
let tasks = futures::stream::iter(object_ids)
.map(|object_id| {
let cloned_group_manager = group_manager.clone();
tokio::task::spawn(async move {
let group = cloned_group_manager.get_group(&object_id).await;
if let Some(group) = group {
(object_id, group.encode_collab().await.ok())
} else {
(object_id, None)
}
})
})
.collect::<Vec<_>>()
.await;

let mut outputs: HashMap<String, EncodedCollab> = HashMap::new();
for task in tasks {
let result = task.await;
if let Ok((object_id, Some(encoded_collab))) = result {
outputs.insert(object_id, encoded_collab);
}
}
let _ = ret.send(outputs);
} else {
let _ = ret.send(HashMap::new());
}
},
CollaborationCommand::ServerSendCollabMessage {
object_id,
collab_messages,
Expand Down
6 changes: 5 additions & 1 deletion services/appflowy-collaborate/src/rt_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ where

spawn_period_check_inactive_group(Arc::downgrade(&group_manager), &group_sender_by_object_id);

spawn_collaboration_command(command_recv, &group_sender_by_object_id);
spawn_collaboration_command(
command_recv,
&group_sender_by_object_id,
Arc::downgrade(&group_manager),
);

spawn_metrics(metrics.clone(), storage.clone());

Expand Down
2 changes: 1 addition & 1 deletion src/api/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ async fn batch_get_collab_handler(
let result = BatchQueryCollabResult(
state
.collab_access_control_storage
.batch_get_collab(&uid, payload.into_inner().0)
.batch_get_collab(&uid, payload.into_inner().0, false)
.await,
);
Ok(Json(AppResponse::Ok().with_data(result)))
Expand Down
2 changes: 1 addition & 1 deletion src/biz/workspace/page_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ async fn get_page_collab_data_for_database(
})
.collect();
let row_query_collab_results = collab_access_control_storage
.batch_get_collab(&uid, queries)
.batch_get_collab(&uid, queries, true)
.await;
let row_data = tokio::task::spawn_blocking(move || {
let row_collabs: HashMap<String, Vec<u8>> = row_query_collab_results
Expand Down

0 comments on commit 6f02b5c

Please sign in to comment.