Skip to content

Commit

Permalink
chore: return unindex collab one by one (#1095)
Browse files Browse the repository at this point in the history
* chore: return unindex collab one by one

* chore: clippy
  • Loading branch information
appflowy authored Dec 20, 2024
1 parent a68dde0 commit 46f9c78
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 64 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 24 additions & 28 deletions libs/database/src/index/collab_embeddings_ops.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use collab_entity::CollabType;
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use pgvector::Vector;
use sqlx::postgres::{PgHasArrayType, PgTypeInfo};
use sqlx::{Error, Executor, Postgres, Transaction};
use sqlx::{Error, Executor, PgPool, Postgres, Transaction};
use std::ops::DerefMut;
use uuid::Uuid;

Expand Down Expand Up @@ -109,35 +111,29 @@ pub async fn upsert_collab_embeddings(
Ok(())
}

pub async fn get_collabs_without_embeddings<'a, E>(
executor: E,
) -> Result<Vec<CollabId>, sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
let oids = sqlx::query!(
pub fn get_collabs_without_embeddings(pg_pool: &PgPool) -> BoxStream<sqlx::Result<CollabId>> {
// atm. get only documents
sqlx::query!(
r#"
select c.workspace_id, c.oid, c.partition_key
from af_collab c
join af_workspace w on c.workspace_id = w.workspace_id
where not coalesce(w.settings['disable_search_indexding']::boolean, false)
and not exists (
select 1
from af_collab_embeddings em
where em.oid = c.oid and em.partition_key = 0)"# // atm. get only documents
)
.fetch_all(executor)
.await?;
Ok(
oids
.into_iter()
.map(|r| CollabId {
collab_type: CollabType::from(r.partition_key),
workspace_id: r.workspace_id,
object_id: r.oid,
})
.collect(),
select c.workspace_id, c.oid, c.partition_key
from af_collab c
join af_workspace w on c.workspace_id = w.workspace_id
where not coalesce(w.settings['disable_search_indexding']::boolean, false)
and not exists (
select 1 from af_collab_embeddings em
where em.oid = c.oid and em.partition_key = 0
)
"#
)
.fetch(pg_pool)
.map(|row| {
row.map(|r| CollabId {
collab_type: CollabType::from(r.partition_key),
workspace_id: r.workspace_id,
object_id: r.oid,
})
})
.boxed()
}

#[derive(Debug, Clone)]
Expand Down
70 changes: 36 additions & 34 deletions services/appflowy-collaborate/src/indexer/indexer_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ use crate::indexer::vector::embedder::Embedder;
use crate::indexer::vector::open_ai;
use crate::indexer::{Indexer, IndexerProvider};
use crate::thread_pool_no_abort::{ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
use actix::dev::Stream;
use anyhow::anyhow;
use app_error::AppError;
use appflowy_ai_client::dto::{EmbeddingRequest, OpenAIEmbeddingResponse};
use async_stream::try_stream;
use bytes::Bytes;
use collab::core::collab::DataSource;
use collab::core::origin::CollabOrigin;
Expand All @@ -21,10 +19,10 @@ use database::collab::{CollabStorage, GetCollabOrigin};
use database::index::{get_collabs_without_embeddings, upsert_collab_embeddings};
use database::workspace::select_workspace_settings;
use database_entity::dto::{AFCollabEmbeddedChunk, CollabParams};
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use rayon::prelude::*;
use sqlx::PgPool;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
Expand Down Expand Up @@ -403,7 +401,7 @@ async fn handle_unindexed_collabs(scheduler: Arc<IndexerScheduler>) {
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;

let mut i = 0;
let mut stream = get_unindexed_collabs(&scheduler.pg_pool, scheduler.storage.clone());
let mut stream = get_unindexed_collabs(&scheduler.pg_pool, scheduler.storage.clone()).await;
let record_tx = scheduler.schedule_tx.clone();
let start = Instant::now();
while let Some(result) = stream.next().await {
Expand Down Expand Up @@ -442,39 +440,43 @@ async fn handle_unindexed_collabs(scheduler: Arc<IndexerScheduler>) {
)
}

fn get_unindexed_collabs(
pub async fn get_unindexed_collabs(
pg_pool: &PgPool,
storage: Arc<dyn CollabStorage>,
) -> Pin<Box<dyn Stream<Item = Result<UnindexedCollab, anyhow::Error>> + Send>> {
let db = pg_pool.clone();
Box::pin(try_stream! {
let collabs = get_collabs_without_embeddings(&db).await?;
if !collabs.is_empty() {
info!("found {} unindexed collabs", collabs.len());
}
for cid in collabs {
match &cid.collab_type {
CollabType::Document => {
let collab = storage
.get_encode_collab(GetCollabOrigin::Server, cid.clone().into(), false)
.await?;

yield UnindexedCollab {
workspace_id: cid.workspace_id,
object_id: cid.object_id,
collab_type: cid.collab_type,
collab,
};
},
CollabType::Database
| CollabType::WorkspaceDatabase
| CollabType::Folder
| CollabType::DatabaseRow
| CollabType::UserAwareness
| CollabType::Unknown => { /* atm. only document types are supported */ },
) -> BoxStream<Result<UnindexedCollab, anyhow::Error>> {
let cloned_storage = storage.clone();
get_collabs_without_embeddings(pg_pool)
.map(move |result| {
let storage = cloned_storage.clone();
async move {
match result {
Ok(cid) => match cid.collab_type {
CollabType::Document => {
let collab = storage
.get_encode_collab(GetCollabOrigin::Server, cid.clone().into(), false)
.await?;

Ok(Some(UnindexedCollab {
workspace_id: cid.workspace_id,
object_id: cid.object_id,
collab_type: cid.collab_type,
collab,
}))
},
_ => Ok::<_, anyhow::Error>(None),
},
Err(e) => Err(e.into()),
}
}
}
})
})
.filter_map(|future| async {
match future.await {
Ok(Some(unindexed_collab)) => Some(Ok(unindexed_collab)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
})
.boxed()
}

async fn index_unindexd_collab(
Expand Down

0 comments on commit 46f9c78

Please sign in to comment.