diff --git a/.sqlx/query-26f293af01281f6f4a99fd69d3a4acc1a556dd3628873fa3ce3e8eaa18ccda1b.json b/.sqlx/query-ad216288cbbe83aba35b5d04705ee5964f1da4f3839c4725a6784c13f2245379.json similarity index 50% rename from .sqlx/query-26f293af01281f6f4a99fd69d3a4acc1a556dd3628873fa3ce3e8eaa18ccda1b.json rename to .sqlx/query-ad216288cbbe83aba35b5d04705ee5964f1da4f3839c4725a6784c13f2245379.json index d6bba3f11..9bc39f3e9 100644 --- a/.sqlx/query-26f293af01281f6f4a99fd69d3a4acc1a556dd3628873fa3ce3e8eaa18ccda1b.json +++ b/.sqlx/query-ad216288cbbe83aba35b5d04705ee5964f1da4f3839c4725a6784c13f2245379.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n select c.workspace_id, c.oid, c.partition_key\n from af_collab c\n join af_workspace w on c.workspace_id = w.workspace_id\n where not coalesce(w.settings['disable_search_indexding']::boolean, false)\n and not exists (\n select 1\n from af_collab_embeddings em\n where em.oid = c.oid and em.partition_key = 0)", + "query": "\n select c.workspace_id, c.oid, c.partition_key\n from af_collab c\n join af_workspace w on c.workspace_id = w.workspace_id\n where not coalesce(w.settings['disable_search_indexding']::boolean, false)\n and not exists (\n select 1 from af_collab_embeddings em\n where em.oid = c.oid and em.partition_key = 0\n )\n ", "describe": { "columns": [ { @@ -28,5 +28,5 @@ false ] }, - "hash": "26f293af01281f6f4a99fd69d3a4acc1a556dd3628873fa3ce3e8eaa18ccda1b" + "hash": "ad216288cbbe83aba35b5d04705ee5964f1da4f3839c4725a6784c13f2245379" } diff --git a/libs/database/src/index/collab_embeddings_ops.rs b/libs/database/src/index/collab_embeddings_ops.rs index 4063699ec..c1db97390 100644 --- a/libs/database/src/index/collab_embeddings_ops.rs +++ b/libs/database/src/index/collab_embeddings_ops.rs @@ -1,7 +1,9 @@ use collab_entity::CollabType; +use futures_util::stream::BoxStream; +use futures_util::StreamExt; use pgvector::Vector; use sqlx::postgres::{PgHasArrayType, PgTypeInfo}; -use sqlx::{Error, Executor, Postgres, Transaction}; +use sqlx::{Error, Executor, PgPool, Postgres, Transaction}; use std::ops::DerefMut; use uuid::Uuid; @@ -109,35 +111,29 @@ pub async fn upsert_collab_embeddings( Ok(()) } -pub async fn get_collabs_without_embeddings<'a, E>( - executor: E, -) -> Result, sqlx::Error> -where - E: Executor<'a, Database = Postgres>, -{ - let oids = sqlx::query!( +pub fn get_collabs_without_embeddings(pg_pool: &PgPool) -> BoxStream> { + // atm. get only documents + sqlx::query!( r#" - select c.workspace_id, c.oid, c.partition_key - from af_collab c - join af_workspace w on c.workspace_id = w.workspace_id - where not coalesce(w.settings['disable_search_indexding']::boolean, false) - and not exists ( - select 1 - from af_collab_embeddings em - where em.oid = c.oid and em.partition_key = 0)"# // atm. get only documents - ) - .fetch_all(executor) - .await?; - Ok( - oids - .into_iter() - .map(|r| CollabId { - collab_type: CollabType::from(r.partition_key), - workspace_id: r.workspace_id, - object_id: r.oid, - }) - .collect(), + select c.workspace_id, c.oid, c.partition_key + from af_collab c + join af_workspace w on c.workspace_id = w.workspace_id + where not coalesce(w.settings['disable_search_indexding']::boolean, false) + and not exists ( + select 1 from af_collab_embeddings em + where em.oid = c.oid and em.partition_key = 0 + ) + "# ) + .fetch(pg_pool) + .map(|row| { + row.map(|r| CollabId { + collab_type: CollabType::from(r.partition_key), + workspace_id: r.workspace_id, + object_id: r.oid, + }) + }) + .boxed() } #[derive(Debug, Clone)] diff --git a/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs b/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs index f6aa53e26..efadc0152 100644 --- a/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs +++ b/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs @@ -4,11 +4,9 @@ use crate::indexer::vector::embedder::Embedder; use crate::indexer::vector::open_ai; use crate::indexer::{Indexer, IndexerProvider}; use crate::thread_pool_no_abort::{ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; -use actix::dev::Stream; use anyhow::anyhow; use app_error::AppError; use appflowy_ai_client::dto::{EmbeddingRequest, OpenAIEmbeddingResponse}; -use async_stream::try_stream; use bytes::Bytes; use collab::core::collab::DataSource; use collab::core::origin::CollabOrigin; @@ -21,10 +19,10 @@ use database::collab::{CollabStorage, GetCollabOrigin}; use database::index::{get_collabs_without_embeddings, upsert_collab_embeddings}; use database::workspace::select_workspace_settings; use database_entity::dto::{AFCollabEmbeddedChunk, CollabParams}; +use futures_util::stream::BoxStream; use futures_util::StreamExt; use rayon::prelude::*; use sqlx::PgPool; -use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; @@ -403,7 +401,7 @@ async fn handle_unindexed_collabs(scheduler: Arc) { tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; let mut i = 0; - let mut stream = get_unindexed_collabs(&scheduler.pg_pool, scheduler.storage.clone()); + let mut stream = get_unindexed_collabs(&scheduler.pg_pool, scheduler.storage.clone()).await; let record_tx = scheduler.schedule_tx.clone(); let start = Instant::now(); while let Some(result) = stream.next().await { @@ -442,39 +440,43 @@ async fn handle_unindexed_collabs(scheduler: Arc) { ) } -fn get_unindexed_collabs( +pub async fn get_unindexed_collabs( pg_pool: &PgPool, storage: Arc, -) -> Pin> + Send>> { - let db = pg_pool.clone(); - Box::pin(try_stream! { - let collabs = get_collabs_without_embeddings(&db).await?; - if !collabs.is_empty() { - info!("found {} unindexed collabs", collabs.len()); - } - for cid in collabs { - match &cid.collab_type { - CollabType::Document => { - let collab = storage - .get_encode_collab(GetCollabOrigin::Server, cid.clone().into(), false) - .await?; - - yield UnindexedCollab { - workspace_id: cid.workspace_id, - object_id: cid.object_id, - collab_type: cid.collab_type, - collab, - }; - }, - CollabType::Database - | CollabType::WorkspaceDatabase - | CollabType::Folder - | CollabType::DatabaseRow - | CollabType::UserAwareness - | CollabType::Unknown => { /* atm. only document types are supported */ }, +) -> BoxStream> { + let cloned_storage = storage.clone(); + get_collabs_without_embeddings(pg_pool) + .map(move |result| { + let storage = cloned_storage.clone(); + async move { + match result { + Ok(cid) => match cid.collab_type { + CollabType::Document => { + let collab = storage + .get_encode_collab(GetCollabOrigin::Server, cid.clone().into(), false) + .await?; + + Ok(Some(UnindexedCollab { + workspace_id: cid.workspace_id, + object_id: cid.object_id, + collab_type: cid.collab_type, + collab, + })) + }, + _ => Ok::<_, anyhow::Error>(None), + }, + Err(e) => Err(e.into()), + } } - } - }) + }) + .filter_map(|future| async { + match future.await { + Ok(Some(unindexed_collab)) => Some(Ok(unindexed_collab)), + Ok(None) => None, + Err(e) => Some(Err(e)), + } + }) + .boxed() } async fn index_unindexd_collab(