From 27a2837e14947b923a84107d87cb239e98a95c8b Mon Sep 17 00:00:00 2001
From: nathan
Date: Tue, 24 Dec 2024 00:03:13 +0800
Subject: [PATCH] chore: background index

---
 Cargo.lock                                    |   3 +
 deploy.env                                    |   2 +-
 dev.env                                       |   2 +-
 libs/app-error/src/lib.rs                     |   1 +
 .../collab-rt-protocol/src/data_validation.rs |  23 +--
 .../src/collab_indexer/document_indexer.rs    |  15 +-
 libs/indexer/src/collab_indexer/provider.rs   |   9 +-
 libs/indexer/src/lib.rs                       |   4 +-
 libs/indexer/src/queue.rs                     |  36 +---
 libs/indexer/src/scheduler.rs                 | 180 +++++++++--------
 libs/indexer/src/unindexed_workspace.rs       |   2 +-
 .../appflowy-collaborate/src/application.rs   |   6 -
 .../src/collab/validator.rs                   |  12 +-
 .../src/group/persistence.rs                  |  38 ++--
 services/appflowy-worker/Cargo.toml           |   7 +-
 services/appflowy-worker/src/application.rs   |  25 +++
 services/appflowy-worker/src/indexer/mod.rs   |   3 -
 .../appflowy-worker/src/indexer/worker.rs     |  55 ------
 .../appflowy-worker/src/indexer_worker/mod.rs |   2 +
 .../src/indexer_worker/worker.rs              | 182 ++++++++++++++++++
 services/appflowy-worker/src/lib.rs           |   2 +-
 src/api/util.rs                               |  12 +-
 src/api/workspace.rs                          | 150 ++++++++++++---
 src/application.rs                            |  10 +-
 tests/collab/collab_curd_test.rs              |  13 +-
 tests/sql_test/mod.rs                         |   1 -
 tests/sql_test/upsert_embedding_test.rs       |  14 --
 27 files changed, 508 insertions(+), 301 deletions(-)
 delete mode 100644 services/appflowy-worker/src/indexer/mod.rs
 delete mode 100644 services/appflowy-worker/src/indexer/worker.rs
 create mode 100644 services/appflowy-worker/src/indexer_worker/mod.rs
 create mode 100644 services/appflowy-worker/src/indexer_worker/worker.rs
 delete mode 100644 tests/sql_test/upsert_embedding_test.rs

diff --git a/Cargo.lock b/Cargo.lock
index 9e0eb1062..15305ed3d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -777,6 +777,8 @@ name = "appflowy-worker"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "app-error",
+ "appflowy-collaborate",
  "async_zip",
  "aws-config",
  "aws-sdk-s3",
@@ -798,6 +800,7 @@ dependencies = [
  "md5",
  "mime_guess",
  "prometheus-client",
+ "rayon",
  "redis 0.25.4",
  "reqwest",
  "secrecy",
diff --git a/deploy.env b/deploy.env
index da6b5bc73..59de9edd2 100644
--- a/deploy.env
+++ b/deploy.env
@@ -157,7 +157,7 @@ APPFLOWY_LOCAL_AI_TEST_ENABLED=false
 APPFLOWY_INDEXER_ENABLED=true
 APPFLOWY_INDEXER_DATABASE_URL=postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}
 APPFLOWY_INDEXER_REDIS_URL=redis://${REDIS_HOST}:${REDIS_PORT}
-APPFLOWY_INDEXER_EMBEDDING_BUFFER_SIZE=2000
+APPFLOWY_INDEXER_EMBEDDING_BUFFER_SIZE=5000
 
 # AppFlowy Collaborate
 APPFLOWY_COLLABORATE_MULTI_THREAD=false
diff --git a/dev.env b/dev.env
index ce62f8a8b..34737c958 100644
--- a/dev.env
+++ b/dev.env
@@ -124,7 +124,7 @@ APPFLOWY_LOCAL_AI_TEST_ENABLED=false
 APPFLOWY_INDEXER_ENABLED=true
 APPFLOWY_INDEXER_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres
 APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379
-APPFLOWY_INDEXER_EMBEDDING_BUFFER_SIZE=2000
+APPFLOWY_INDEXER_EMBEDDING_BUFFER_SIZE=5000
 
 # AppFlowy Collaborate
 APPFLOWY_COLLABORATE_MULTI_THREAD=false
diff --git a/libs/app-error/src/lib.rs b/libs/app-error/src/lib.rs
index 4c67a4747..d0c2613e9 100644
--- a/libs/app-error/src/lib.rs
+++ b/libs/app-error/src/lib.rs
@@ -320,6 +320,7 @@ impl From<sqlx::Error> for AppError {
       sqlx::Error::RowNotFound => {
         AppError::RecordNotFound(format!("Record not exist in db. {})", msg))
       },
+      sqlx::Error::PoolTimedOut => AppError::ActionTimeout(value.to_string()),
       _ => AppError::SqlxError(msg),
     }
   }
diff --git a/libs/collab-rt-protocol/src/data_validation.rs b/libs/collab-rt-protocol/src/data_validation.rs
index b84f6fd60..365fc724d 100644
--- a/libs/collab-rt-protocol/src/data_validation.rs
+++ b/libs/collab-rt-protocol/src/data_validation.rs
@@ -6,14 +6,8 @@ use collab::preclude::Collab;
 use collab_entity::CollabType;
 use tracing::instrument;
 
-#[instrument(level = "trace", skip(data), fields(len = %data.len()))]
 #[inline]
-pub async fn spawn_blocking_validate_encode_collab(
-  object_id: &str,
-  data: &[u8],
-  collab_type: &CollabType,
-) -> Result<(), Error> {
-  let collab_type = collab_type.clone();
+pub async fn collab_from_encode_collab(object_id: &str, data: &[u8]) -> Result<Collab, Error> {
   let object_id = object_id.to_string();
   let data = data.to_vec();
 
@@ -27,28 +21,19 @@ pub async fn spawn_blocking_validate_encode_collab(
       false,
     )?;
 
-    collab_type.validate_require_data(&collab)?;
-    Ok::<(), Error>(())
+    Ok::<_, Error>(collab)
   })
   .await?
 }
 
 #[instrument(level = "trace", skip(data), fields(len = %data.len()))]
 #[inline]
-pub fn validate_encode_collab(
+pub async fn validate_encode_collab(
   object_id: &str,
   data: &[u8],
   collab_type: &CollabType,
 ) -> Result<(), Error> {
-  let encoded_collab = EncodedCollab::decode_from_bytes(data)?;
-  let collab = Collab::new_with_source(
-    CollabOrigin::Empty,
-    object_id,
-    DataSource::DocStateV1(encoded_collab.doc_state.to_vec()),
-    vec![],
-    false,
-  )?;
-
+  let collab = collab_from_encode_collab(object_id, data).await?;
   collab_type.validate_require_data(&collab)?;
   Ok::<(), Error>(())
 }
diff --git a/libs/indexer/src/collab_indexer/document_indexer.rs b/libs/indexer/src/collab_indexer/document_indexer.rs
index 093bf58d6..b0409864b 100644
--- a/libs/indexer/src/collab_indexer/document_indexer.rs
+++ b/libs/indexer/src/collab_indexer/document_indexer.rs
@@ -20,7 +20,7 @@ pub struct DocumentIndexer;
 
 #[async_trait]
 impl Indexer for DocumentIndexer {
-  fn create_embedded_chunks(
+  fn create_embedded_chunks_from_collab(
     &self,
     collab: &Collab,
     embedding_model: EmbeddingModel,
@@ -35,9 +35,7 @@ impl Indexer for DocumentIndexer {
 
     let result = document.to_plain_text(collab.transact(), false, true);
     match result {
-      Ok(content) => {
-        split_text_into_chunks(object_id, content, CollabType::Document, &embedding_model)
-      },
+      Ok(content) => self.create_embedded_chunks_from_text(object_id, content, embedding_model),
       Err(err) => {
         if matches!(err, DocumentError::NoRequiredData) {
           Ok(vec![])
@@ -48,6 +46,15 @@ impl Indexer for DocumentIndexer {
     }
   }
 
+  fn create_embedded_chunks_from_text(
+    &self,
+    object_id: String,
+    text: String,
+    model: EmbeddingModel,
+  ) -> Result<Vec<AFCollabEmbeddedChunk>, AppError> {
+    split_text_into_chunks(object_id, text, CollabType::Document, &model)
+  }
+
   fn embed(
     &self,
     embedder: &Embedder,
diff --git a/libs/indexer/src/collab_indexer/provider.rs b/libs/indexer/src/collab_indexer/provider.rs
index edf684b06..6968a234a 100644
--- a/libs/indexer/src/collab_indexer/provider.rs
+++ b/libs/indexer/src/collab_indexer/provider.rs
@@ -11,12 +11,19 @@ use std::sync::Arc;
 use tracing::info;
 
 pub trait Indexer: Send + Sync {
-  fn create_embedded_chunks(
+  fn create_embedded_chunks_from_collab(
     &self,
     collab: &Collab,
     model: EmbeddingModel,
   ) -> Result<Vec<AFCollabEmbeddedChunk>, AppError>;
 
+  fn create_embedded_chunks_from_text(
+    &self,
+    object_id: String,
+    text: String,
+    model: EmbeddingModel,
+  ) -> Result<Vec<AFCollabEmbeddedChunk>, AppError>;
+
   fn embed(
     &self,
     embedder: &Embedder,
diff --git 
a/libs/indexer/src/lib.rs b/libs/indexer/src/lib.rs
index abf3b797c..21b19bc5d 100644
--- a/libs/indexer/src/lib.rs
+++ b/libs/indexer/src/lib.rs
@@ -1,9 +1,9 @@
 pub mod collab_indexer;
-mod entity;
+pub mod entity;
 mod error;
 pub mod metrics;
 pub mod queue;
 pub mod scheduler;
 pub mod thread_pool;
 mod unindexed_workspace;
-mod vector;
+pub mod vector;
diff --git a/libs/indexer/src/queue.rs b/libs/indexer/src/queue.rs
index 68a102a90..5595a03a0 100644
--- a/libs/indexer/src/queue.rs
+++ b/libs/indexer/src/queue.rs
@@ -1,12 +1,10 @@
 use crate::error::IndexerError;
-use crate::scheduler::PendingUnindexedCollab;
+use crate::scheduler::UnindexedCollabTask;
 use anyhow::anyhow;
 use app_error::AppError;
-use collab_entity::CollabType;
 use redis::aio::ConnectionManager;
 use redis::streams::{StreamId, StreamReadOptions, StreamReadReply};
 use redis::{AsyncCommands, RedisResult, Value};
-use serde::{Deserialize, Serialize};
 use serde_json::from_str;
 use tracing::error;
 
@@ -14,27 +12,7 @@ pub const INDEX_TASK_STREAM_NAME: &str = "index_collab_task_stream";
 const INDEXER_WORKER_GROUP_NAME: &str = "indexer_worker_group";
 const INDEXER_CONSUMER_NAME: &str = "appflowy_worker";
 
-#[derive(Debug, Serialize, Deserialize)]
-pub enum EmbedderTask {
-  CollabId {
-    object_id: String,
-    collab_type: CollabType,
-    /// Timestamp that indicates when the task was created.
-    created_at: i64,
-  },
-}
-
-impl From<PendingUnindexedCollab> for EmbedderTask {
-  fn from(pending_collab: PendingUnindexedCollab) -> Self {
-    EmbedderTask::CollabId {
-      object_id: pending_collab.object_id,
-      collab_type: pending_collab.collab_type,
-      created_at: chrono::Utc::now().timestamp(),
-    }
-  }
-}
-
-impl TryFrom<&StreamId> for EmbedderTask {
+impl TryFrom<&StreamId> for UnindexedCollabTask {
   type Error = IndexerError;
 
   fn try_from(stream_id: &StreamId) -> Result<Self, Self::Error> {
@@ -57,7 +35,7 @@ impl TryFrom<&StreamId> for EmbedderTask {
       },
     };
 
-    from_str::<EmbedderTask>(&task_str).map_err(|err| IndexerError::Internal(err.into()))
+    from_str::<UnindexedCollabTask>(&task_str).map_err(|err| IndexerError::Internal(err.into()))
   }
 }
 
@@ -68,7 +46,7 @@ impl TryFrom<&StreamId> for EmbedderTask {
 ///
 pub async fn add_background_embed_task(
   redis_client: ConnectionManager,
-  tasks: Vec<EmbedderTask>,
+  tasks: Vec<UnindexedCollabTask>,
 ) -> Result<(), AppError> {
   let items = tasks
     .into_iter()
@@ -132,14 +110,14 @@ pub async fn read_background_embed_tasks(
 /// If `false`, the task remains in the stream after acknowledgment.
 pub async fn ack_task(
   redis_client: &mut ConnectionManager,
-  stream_entity_id: &str,
+  stream_entity_ids: Vec<String>,
   delete_task: bool,
 ) -> Result<(), IndexerError> {
   let _: () = redis_client
     .xack(
       INDEX_TASK_STREAM_NAME,
       INDEXER_WORKER_GROUP_NAME,
-      &[stream_entity_id],
+      &stream_entity_ids,
     )
     .await
     .map_err(|err| {
@@ -149,7 +127,7 @@ pub async fn ack_task(
 
   if delete_task {
     let _: () = redis_client
-      .xdel(INDEX_TASK_STREAM_NAME, &[stream_entity_id])
+      .xdel(INDEX_TASK_STREAM_NAME, &stream_entity_ids)
       .await
       .map_err(|err| {
         error!("Failed to delete task: {:?}", err);
diff --git a/libs/indexer/src/scheduler.rs b/libs/indexer/src/scheduler.rs
index a76289d8b..807a0c73a 100644
--- a/libs/indexer/src/scheduler.rs
+++ b/libs/indexer/src/scheduler.rs
@@ -6,23 +6,20 @@ use crate::queue::add_background_embed_task;
 use crate::thread_pool::{ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
 use crate::vector::embedder::Embedder;
 use crate::vector::open_ai;
-use anyhow::anyhow;
 use app_error::AppError;
 use appflowy_ai_client::dto::{EmbeddingRequest, OpenAIEmbeddingResponse};
-use bytes::Bytes;
-use collab::core::collab::DataSource;
-use collab::core::origin::CollabOrigin;
-use collab::entity::EncodedCollab;
 use collab::lock::RwLock;
 use collab::preclude::Collab;
+use collab_document::document::DocumentBody;
 use collab_entity::CollabType;
 use database::collab::CollabStorage;
 use database::index::upsert_collab_embeddings;
 use database::workspace::select_workspace_settings;
-use database_entity::dto::{AFCollabEmbeddedChunk, CollabParams};
+use database_entity::dto::AFCollabEmbeddedChunk;
 use infra::env_util::get_env_var;
 use rayon::prelude::*;
 use redis::aio::ConnectionManager;
+use serde::{Deserialize, Serialize};
 use sqlx::PgPool;
 use std::cmp::max;
 use std::collections::HashSet;
@@ -31,6 +28,7 @@ use std::time::{Duration, Instant};
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::error::TrySendError;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
+use tokio::sync::RwLock as TokioRwLock;
 use tokio::time::timeout;
 use tracing::{debug, error, info, instrument, trace, warn};
 use uuid::Uuid;
@@ -43,7 +41,7 @@ pub struct IndexerScheduler {
   #[allow(dead_code)]
   pub(crate) metrics: Arc<EmbeddingMetrics>,
   write_embedding_tx: UnboundedSender<EmbeddingRecord>,
-  gen_embedding_tx: mpsc::Sender<PendingUnindexedCollab>,
+  gen_embedding_tx: mpsc::Sender<UnindexedCollabTask>,
   config: IndexerConfiguration,
   redis_client: ConnectionManager,
 }
@@ -52,7 +50,6 @@ pub struct IndexerScheduler {
 pub struct IndexerConfiguration {
   pub enable: bool,
   pub openai_api_key: String,
-  pub enable_background_indexing: bool,
   /// High watermark for the number of embeddings that can be buffered before being written to the database.
   pub embedding_buffer_size: usize,
 }
@@ -63,7 +60,7 @@ impl IndexerScheduler {
     pg_pool: PgPool,
     storage: Arc<dyn CollabStorage>,
     metrics: Arc<EmbeddingMetrics>,
-    mut config: IndexerConfiguration,
+    config: IndexerConfiguration,
     redis_client: ConnectionManager,
   ) -> Arc<Self> {
     // Since threads often block while waiting for I/O, you can use more threads than CPU cores to improve concurrency.
@@ -75,22 +72,14 @@ impl IndexerScheduler {
       5,
     );
 
-    if num_thread > config.embedding_buffer_size {
-      warn!(
-        "Number of threads {} is greater than embedding_buffer_size {}, set to {}",
-        num_thread, config.embedding_buffer_size, num_thread
-      );
-      config.embedding_buffer_size = num_thread;
-    }
-
     info!("Indexer scheduler config: {:?}", config);
     let (write_embedding_tx, write_embedding_rx) = unbounded_channel::<EmbeddingRecord>();
     let (gen_embedding_tx, gen_embedding_rx) =
-      mpsc::channel::<PendingUnindexedCollab>(config.embedding_buffer_size);
+      mpsc::channel::<UnindexedCollabTask>(config.embedding_buffer_size);
     let threads = Arc::new(
       ThreadPoolNoAbortBuilder::new()
         .num_threads(num_thread)
-        .thread_name(|index| format!("embedding-request-{index}"))
+        .thread_name(|index| format!("create-embedding-thread-{index}"))
         .build()
         .unwrap(),
     );
@@ -113,17 +102,20 @@ impl IndexerScheduler {
       num_thread
     );
 
+    let latest_write_embedding_err = Arc::new(TokioRwLock::new(None));
     if this.index_enabled() {
       tokio::spawn(spawn_rayon_generate_embeddings(
         gen_embedding_rx,
         Arc::downgrade(&this),
         num_thread,
+        latest_write_embedding_err.clone(),
       ));
 
       tokio::spawn(spawn_pg_write_embeddings(
         write_embedding_rx,
         this.pg_pool.clone(),
         this.metrics.clone(),
+        latest_write_embedding_err,
       ));
     }
 
@@ -169,27 +161,20 @@ impl IndexerScheduler {
     Ok(embeddings)
   }
 
-  pub fn embed_in_background(
-    &self,
-    pending_collab: PendingUnindexedCollab,
-  ) -> Result<(), AppError> {
+  pub fn embed_in_background(&self, pending_collab: UnindexedCollabTask) -> Result<(), AppError> {
     if !self.index_enabled() {
       return Ok(());
     }
 
-    if !self.config.enable_background_indexing {
-      return Ok(());
-    }
-
     let redis_client = self.redis_client.clone();
     tokio::spawn(add_background_embed_task(
       redis_client,
-      vec![pending_collab.into()],
+      vec![pending_collab],
     ));
     Ok(())
   }
 
-  pub fn embed_immediately(&self, pending_collab: PendingUnindexedCollab) -> Result<(), AppError> {
+  pub fn embed_immediately(&self, pending_collab: UnindexedCollabTask) -> Result<(), AppError> {
     if !self.index_enabled() {
       return Ok(());
     }
@@ -210,8 +195,8 @@ impl IndexerScheduler {
 
   pub fn index_pending_collab_one(
     &self,
-    pending_collab: PendingUnindexedCollab,
-    _background: bool,
+    pending_collab: UnindexedCollabTask,
+    background: bool,
   ) -> Result<(), AppError> {
     if !self.index_enabled() {
       return Ok(());
@@ -224,14 +209,17 @@ impl IndexerScheduler {
       return Ok(());
     }
 
-    let _ = self.embed_immediately(pending_collab);
+    if background {
+      let _ = self.embed_in_background(pending_collab);
+    } else {
+      let _ = self.embed_immediately(pending_collab);
+    }
     Ok(())
   }
 
   pub fn index_pending_collabs(
     &self,
-    mut pending_collabs: Vec<PendingUnindexedCollab>,
-    _background: bool,
+    mut pending_collabs: Vec<UnindexedCollabTask>,
   ) -> Result<(), AppError> {
     if !self.index_enabled() {
       return Ok(());
@@ -265,32 +253,31 @@ impl IndexerScheduler {
       return Ok(());
     }
 
-    let indexer = self
-      .indexer_provider
-      .indexer_for(collab_type)
-      .ok_or_else(|| {
-        AppError::Internal(anyhow!(
-          "No indexer found for collab type {:?}",
-          collab_type
-        ))
-      })?;
-    let embedder = self.create_embedder()?;
-
-    let lock = collab.read().await;
-    let chunks = indexer.create_embedded_chunks(&lock, embedder.model())?;
-    drop(lock); // release the read lock ASAP
-
-    if chunks.is_empty() {
-      return Ok(());
+    match collab_type {
+      CollabType::Document => {
+        let lock = collab.read().await;
+        let txn = lock.transact();
+        let text = DocumentBody::from_collab(&lock)
+          .and_then(|body| body.to_plain_text(txn, false, true).ok());
+        drop(lock); // release the read lock ASAP
+
+        if let Some(text) = text {
+          if !text.is_empty() {
+            let pending = UnindexedCollabTask::new(
+              Uuid::parse_str(workspace_id)?,
+              object_id.to_string(),
+              collab_type.clone(),
+              UnindexedData::UnindexedText(text),
+            );
+            self.embed_immediately(pending)?;
+          }
+        }
+      },
+      _ => {
+        // TODO(nathan): support other collab types
+      },
     }
 
-    self.embed_immediately(PendingUnindexedCollab {
-      workspace_id: Uuid::parse_str(workspace_id)?,
-      object_id: object_id.to_string(),
-      collab_type: collab_type.clone(),
-      data: UnindexedData::UnindexedChunks(chunks),
-    })?;
-
     Ok(())
   }
 
@@ -309,12 +296,23 @@ impl IndexerScheduler {
 }
 
 async fn spawn_rayon_generate_embeddings(
-  mut rx: mpsc::Receiver<PendingUnindexedCollab>,
+  mut rx: mpsc::Receiver<UnindexedCollabTask>,
   scheduler: Weak<IndexerScheduler>,
   buffer_size: usize,
+  latest_write_embedding_err: Arc<TokioRwLock<Option<AppError>>>,
 ) {
   let mut buf = Vec::with_capacity(buffer_size);
   loop {
+    let latest_error = latest_write_embedding_err.write().await.take();
+    if let Some(err) = latest_error {
+      if matches!(err, AppError::ActionTimeout(_)) {
+        info!(
+          "[Embedding] last write embedding task failed with timeout, waiting for 30s before retrying..."
+        );
+        tokio::time::sleep(Duration::from_secs(30)).await;
+      }
+    }
+
     let n = rx.recv_many(&mut buf, buffer_size).await;
     let scheduler = match scheduler.upgrade() {
       Some(scheduler) => scheduler,
@@ -395,10 +393,11 @@ async fn spawn_rayon_generate_embeddings(
 }
 
 const EMBEDDING_RECORD_BUFFER_SIZE: usize = 10;
-async fn spawn_pg_write_embeddings(
+pub async fn spawn_pg_write_embeddings(
   mut rx: UnboundedReceiver<EmbeddingRecord>,
   pg_pool: PgPool,
   metrics: Arc<EmbeddingMetrics>,
+  latest_write_embedding_error: Arc<TokioRwLock<Option<AppError>>>,
 ) {
   let mut buf = Vec::with_capacity(EMBEDDING_RECORD_BUFFER_SIZE);
   loop {
@@ -424,15 +423,18 @@ async fn spawn_pg_write_embeddings(
     )
     .await
     .unwrap_or_else(|_| {
-      Err(AppError::Internal(anyhow!(
-        "timeout when writing embeddings"
-      )))
+      Err(AppError::ActionTimeout(
+        "timeout when writing embeddings".to_string(),
+      ))
     });
 
     metrics.record_write_embedding_time(start.elapsed().as_millis());
     match result {
       Ok(_) => trace!("[Embedding] save {} embeddings to disk", n),
-      Err(err) => error!("Failed to write collab embedding to disk:{}", err),
+      Err(err) => {
+        error!("Failed to write collab embedding to disk:{}", err);
+        latest_write_embedding_error.write().await.replace(err);
+      },
     }
   }
 }
@@ -469,7 +471,7 @@ pub(crate) async fn batch_insert_records(
 
 /// This function must be called within the rayon thread pool.
 fn process_collab(
-  embdder: &Embedder,
+  embedder: &Embedder,
   indexer: Option<Arc<dyn Indexer>>,
   object_id: &str,
   data: UnindexedData,
@@ -480,26 +482,14 @@ fn process_collab(
     let start_time = Instant::now();
 
     let chunks = match data {
-      UnindexedData::UnindexedEncodeCollab(encoded_collab) => {
-        let encode_collab = EncodedCollab::decode_from_bytes(&encoded_collab)
-          .map_err(|err| AppError::Internal(anyhow!("Failed to decode encoded collab: {}", err)))?;
-        let collab = Collab::new_with_source(
-          CollabOrigin::Empty,
-          object_id,
-          DataSource::DocStateV1(encode_collab.doc_state.into()),
-          vec![],
-          false,
-        )
-        .map_err(|err| AppError::Internal(err.into()))?;
-        indexer.create_embedded_chunks(&collab, embdder.model())?
+      UnindexedData::UnindexedText(text) => {
+        indexer.create_embedded_chunks_from_text(object_id.to_string(), text, embedder.model())?
      },
-      UnindexedData::UnindexedChunks(chunks) => chunks,
    };
 
-    let result = indexer.embed(embdder, chunks);
+    let result = indexer.embed(embedder, chunks);
     let duration = start_time.elapsed();
     metrics.record_generate_embedding_time(duration.as_millis());
-
     match result {
       Ok(Some(embeddings)) => Ok(Some((embeddings.tokens_consumed, embeddings.params))),
       Ok(None) => Ok(None),
@@ -513,25 +503,33 @@ fn process_collab(
   }
 }
 
-pub struct PendingUnindexedCollab {
+#[derive(Debug, Serialize, Deserialize)]
+pub struct UnindexedCollabTask {
   pub workspace_id: Uuid,
   pub object_id: String,
   pub collab_type: CollabType,
   pub data: UnindexedData,
+  pub created_at: i64,
 }
 
-pub enum UnindexedData {
-  UnindexedEncodeCollab(Bytes),
-  UnindexedChunks(Vec<AFCollabEmbeddedChunk>),
-}
-
-impl From<(Uuid, &CollabParams)> for PendingUnindexedCollab {
-  fn from((workspace_id, params): (Uuid, &CollabParams)) -> Self {
+impl UnindexedCollabTask {
+  pub fn new(
+    workspace_id: Uuid,
+    object_id: String,
+    collab_type: CollabType,
+    data: UnindexedData,
+  ) -> Self {
     Self {
       workspace_id,
-      object_id: params.object_id.clone(),
-      collab_type: params.collab_type.clone(),
-      data: UnindexedData::UnindexedEncodeCollab(params.encoded_collab_v1.clone()),
+      object_id,
+      collab_type,
+      data,
+      created_at: chrono::Utc::now().timestamp(),
     }
   }
 }
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum UnindexedData {
+  UnindexedText(String),
+}
diff --git a/libs/indexer/src/unindexed_workspace.rs b/libs/indexer/src/unindexed_workspace.rs
index 24ec01ebe..c2d0752a1 100644
--- a/libs/indexer/src/unindexed_workspace.rs
+++ b/libs/indexer/src/unindexed_workspace.rs
@@ -181,7 +181,7 @@ async fn create_embeddings(
     .ok()?;
 
   let chunks = indexer
-    .create_embedded_chunks(&collab, embedder.model())
+    .create_embedded_chunks_from_collab(&collab, embedder.model())
     .ok()?;
 
   if chunks.is_empty() {
     trace!("[Embedding] {} has no embeddings", unindexed.object_id,);
diff --git a/services/appflowy-collaborate/src/application.rs b/services/appflowy-collaborate/src/application.rs
index d7ff5bd7d..4875155e1 100644
--- a/services/appflowy-collaborate/src/application.rs
+++ b/services/appflowy-collaborate/src/application.rs
@@ -155,12 +155,6 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<AppState, Error>
       .parse::<bool>()
       .unwrap_or(true),
     openai_api_key: get_env_var("APPFLOWY_AI_OPENAI_API_KEY", ""),
-    enable_background_indexing: get_env_var(
-      "APPFLOWY_INDEXER_BACKGROUND_INDEXING_ENABLED",
-      "false",
-    )
-    .parse::<bool>()
-    .unwrap_or(false),
     embedding_buffer_size: get_env_var("APPFLOWY_INDEXER_EMBEDDING_BUFFER_SIZE", "2000")
       .parse::<usize>()
       .unwrap_or(2000),
diff --git a/services/appflowy-collaborate/src/collab/validator.rs b/services/appflowy-collaborate/src/collab/validator.rs
index 6a84dee68..ff8f32b19 100644
--- a/services/appflowy-collaborate/src/collab/validator.rs
+++ b/services/appflowy-collaborate/src/collab/validator.rs
@@ -1,6 +1,6 @@
 use app_error::AppError;
 use async_trait::async_trait;
-use collab_rt_protocol::spawn_blocking_validate_encode_collab;
+use collab_rt_protocol::validate_encode_collab;
 use database_entity::dto::CollabParams;
 
 #[async_trait]
@@ -11,12 +11,8 @@ pub trait CollabValidator {
 #[async_trait]
 impl CollabValidator for CollabParams {
   async fn check_encode_collab(&self) -> Result<(), AppError> {
-    spawn_blocking_validate_encode_collab(
-      &self.object_id,
-      &self.encoded_collab_v1,
-      &self.collab_type,
-    )
-    .await
-    .map_err(|err| AppError::NoRequiredData(err.to_string()))
+    validate_encode_collab(&self.object_id, &self.encoded_collab_v1, &self.collab_type)
+      .await
+      .map_err(|err| AppError::NoRequiredData(err.to_string()))
   }
 }
diff --git a/services/appflowy-collaborate/src/group/persistence.rs b/services/appflowy-collaborate/src/group/persistence.rs
index 353335583..2bce0308c 100644
--- a/services/appflowy-collaborate/src/group/persistence.rs
+++ b/services/appflowy-collaborate/src/group/persistence.rs
@@ -6,10 +6,11 @@ use anyhow::anyhow;
 use app_error::AppError;
 use collab::lock::RwLock;
 use collab::preclude::Collab;
+use collab_document::document::DocumentBody;
 use collab_entity::{validate_data_for_folder, CollabType};
 use database::collab::CollabStorage;
 use database_entity::dto::CollabParams;
-use indexer::scheduler::{IndexerScheduler, PendingUnindexedCollab, UnindexedData};
+use indexer::scheduler::{IndexerScheduler, UnindexedCollabTask, UnindexedData};
 use tokio::time::interval;
 use tokio_util::sync::CancellationToken;
 use tracing::{trace, warn};
@@ -123,23 +124,38 @@ where
     let collab_type = self.collab_type.clone();
     let cloned_collab = self.collab.clone();
 
+    let indexer_scheduler = self.indexer_scheduler.clone();
     let params = tokio::task::spawn_blocking(move || {
       let collab = cloned_collab.blocking_read();
       let params = get_encode_collab(&workspace_id, &object_id, &collab, &collab_type)?;
+      match collab_type {
+        CollabType::Document => {
+          let txn = collab.transact();
+          let text = DocumentBody::from_collab(&collab)
+            .and_then(|doc| doc.to_plain_text(txn, false, true).ok());
+
+          if let Some(text) = text {
+            let pending = UnindexedCollabTask::new(
+              Uuid::parse_str(&workspace_id)?,
+              object_id.clone(),
+              collab_type,
+              UnindexedData::UnindexedText(text),
+            );
+            if let Err(err) = indexer_scheduler.index_pending_collab_one(pending, true) {
+              warn!("failed to index collab: {}:{}", object_id, err);
+            }
+          }
+        },
+        _ => {
+          // TODO(nathan): support other collab types
+        },
+      }
+
       Ok::<_, AppError>(params)
     })
     .await??;
 
-    let pending = PendingUnindexedCollab {
-      workspace_id: Uuid::parse_str(&self.workspace_id)?,
-      object_id: self.object_id.clone(),
-      collab_type: self.collab_type.clone(),
-      data: UnindexedData::UnindexedEncodeCollab(params.encoded_collab_v1.clone()),
-    };
-    self
-      .indexer_scheduler
-      .index_pending_collab_one(pending, true)?;
-
     self
       .storage
       .queue_insert_or_update_collab(&self.workspace_id, &self.uid, params, flush_to_disk)
diff --git a/services/appflowy-worker/Cargo.toml b/services/appflowy-worker/Cargo.toml
index 9bb8622d6..a448000b0 100644
--- a/services/appflowy-worker/Cargo.toml
+++ b/services/appflowy-worker/Cargo.toml
@@ -63,4 +63,9 @@ base64.workspace = true
 prometheus-client = "0.22.3"
 reqwest.workspace = true
 zstd.workspace = true
-indexer.workspace = true
\ No newline at end of file
+indexer.workspace = true
+appflowy-collaborate = { path = "../appflowy-collaborate" }
+rayon = "1.10.0"
+app-error = { workspace = true, features = [
+    "sqlx_error",
+] }
diff --git a/services/appflowy-worker/src/application.rs b/services/appflowy-worker/src/application.rs
index b4f2642cc..e02bf27df 100644
--- a/services/appflowy-worker/src/application.rs
+++ b/services/appflowy-worker/src/application.rs
@@ -15,10 +15,13 @@ use secrecy::ExposeSecret;
 
 use crate::mailer::AFWorkerMailer;
 use crate::metric::ImportMetrics;
+use appflowy_worker::indexer_worker::{run_background_indexer, BackgroundIndexerConfig};
 use axum::extract::State;
 use axum::http::StatusCode;
 use axum::response::IntoResponse;
 use axum::routing::get;
+use indexer::metrics::EmbeddingMetrics;
+use indexer::thread_pool::ThreadPoolNoAbortBuilder;
 use infra::env_util::get_env_var;
 use mailer::sender::Mailer;
 use std::sync::{Arc, Once};
@@ -124,6 +127,25 @@ pub async fn create_app(listener: TcpListener, config: Config) -> Result<(), Err
     maximum_import_file_size,
   ));
 
+  let threads = Arc::new(
+    ThreadPoolNoAbortBuilder::new()
+      .num_threads(20)
+      .thread_name(|index| format!("background-embedding-thread-{index}"))
+      .build()
+      .unwrap(),
+  );
+
+  tokio::spawn(run_background_indexer(
+    state.pg_pool.clone(),
+    state.redis_client.clone(),
+    state.metrics.embedder_metrics.clone(),
+    threads.clone(),
+    BackgroundIndexerConfig {
+      open_api_key: appflowy_collaborate::config::get_env_var("APPFLOWY_AI_OPENAI_API_KEY", ""),
+      tick_interval_secs: 10,
+    },
+  ));
+
   let app = Router::new()
     .route("/metrics", get(metrics_handler))
     .with_state(Arc::new(state));
@@ -212,15 +234,18 @@ pub struct AppMetrics {
   #[allow(dead_code)]
   registry: Arc<prometheus_client::registry::Registry>,
   import_metrics: Arc<ImportMetrics>,
+  embedder_metrics: Arc<EmbeddingMetrics>,
 }
 
 impl AppMetrics {
   pub fn new() -> Self {
     let mut registry = prometheus_client::registry::Registry::default();
     let import_metrics = Arc::new(ImportMetrics::register(&mut registry));
+    let embedder_metrics = Arc::new(EmbeddingMetrics::register(&mut registry));
     Self {
       registry: Arc::new(registry),
       import_metrics,
+      embedder_metrics,
     }
   }
 }
diff --git a/services/appflowy-worker/src/indexer/mod.rs b/services/appflowy-worker/src/indexer/mod.rs
deleted file mode 100644
index 30cbf0955..000000000
--- a/services/appflowy-worker/src/indexer/mod.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-// mod worker;
-
-// pub use worker::*;
diff --git a/services/appflowy-worker/src/indexer/worker.rs b/services/appflowy-worker/src/indexer/worker.rs
deleted file mode 100644
index 02fb3206e..000000000
--- a/services/appflowy-worker/src/indexer/worker.rs
+++ /dev/null
@@ -1,55 +0,0 @@
-use indexer::metrics::EmbeddingMetrics;
-use indexer::queue::{
-  default_indexer_group_option, ensure_indexer_consumer_group, read_background_embed_tasks,
-  EmbedderTask,
-};
-use redis::aio::ConnectionManager;
-use sqlx::PgPool;
-use std::sync::Arc;
-use std::time::Duration;
-use tokio::time::{interval, MissedTickBehavior};
-use tracing::{error, info};
-
-pub async fn run_background_indexer(
-  pg_pool: PgPool,
-  mut redis_client: ConnectionManager,
-  embed_metrics: Arc<EmbeddingMetrics>,
-  tick_interval_secs: u64,
-) {
-  info!("Starting background indexer...");
-  if let Err(err) = ensure_indexer_consumer_group(&mut redis_client).await {
-    error!("Failed to ensure indexer consumer group: {:?}", err);
-  }
-}
-
-async fn process_upcoming_tasks(
-  redis_client: &mut ConnectionManager,
-  pg_pool: PgPool,
-  metrics: Arc<EmbeddingMetrics>,
-  tick_interval_secs: u64,
-) {
-  let options = default_indexer_group_option(10);
-  let mut interval = interval(Duration::from_secs(tick_interval_secs));
-  interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
-  interval.tick().await;
-
-  loop {
-    interval.tick().await;
-    if let Ok(replay) = read_background_embed_tasks(redis_client, &options).await {
-      for key in replay.keys {
-        for stream_id in key.ids {
-          match EmbedderTask::try_from(&stream_id) {
-            Ok(task) => {
-              //
-            },
-            Err(err) => {
-              error!("Failed to parse embedder task: {:?}", err);
-            },
-          }
-        }
-      }
-    }
-  }
-}
-
-async fn handle_task(pg_pool: PgPool, task: EmbedderTask) {}
diff --git a/services/appflowy-worker/src/indexer_worker/mod.rs b/services/appflowy-worker/src/indexer_worker/mod.rs
new file mode 100644
index 000000000..85d58e6b6
--- /dev/null
+++ b/services/appflowy-worker/src/indexer_worker/mod.rs
@@ -0,0 +1,2 @@
+mod worker;
+pub use worker::*;
diff --git 
a/services/appflowy-worker/src/indexer_worker/worker.rs b/services/appflowy-worker/src/indexer_worker/worker.rs
new file mode 100644
index 000000000..cd241e14c
--- /dev/null
+++ b/services/appflowy-worker/src/indexer_worker/worker.rs
@@ -0,0 +1,182 @@
+use app_error::AppError;
+use indexer::collab_indexer::{Indexer, IndexerProvider};
+use indexer::entity::EmbeddingRecord;
+use indexer::metrics::EmbeddingMetrics;
+use indexer::queue::{
+  ack_task, default_indexer_group_option, ensure_indexer_consumer_group,
+  read_background_embed_tasks,
+};
+use indexer::scheduler::{spawn_pg_write_embeddings, UnindexedCollabTask, UnindexedData};
+use indexer::thread_pool::ThreadPoolNoAbort;
+use indexer::vector::embedder::Embedder;
+use indexer::vector::open_ai;
+use rayon::prelude::*;
+use redis::aio::ConnectionManager;
+use sqlx::PgPool;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
+use tokio::sync::RwLock;
+use tokio::time::{interval, MissedTickBehavior};
+use tracing::{error, info, trace};
+
+#[derive(Debug)]
+pub struct BackgroundIndexerConfig {
+  pub open_api_key: String,
+  pub tick_interval_secs: u64,
+}
+
+pub async fn run_background_indexer(
+  pg_pool: PgPool,
+  mut redis_client: ConnectionManager,
+  embed_metrics: Arc<EmbeddingMetrics>,
+  threads: Arc<ThreadPoolNoAbort>,
+  config: BackgroundIndexerConfig,
+) {
+  if config.open_api_key.is_empty() {
+    error!("OpenAI API key is not set. Stop background indexer");
+    return;
+  }
+
+  let indexer_provider = IndexerProvider::new();
+  info!("Starting background indexer...");
+  if let Err(err) = ensure_indexer_consumer_group(&mut redis_client).await {
+    error!("Failed to ensure indexer consumer group: {:?}", err);
+  }
+
+  let latest_write_embedding_err = Arc::new(RwLock::new(None));
+  let (write_embedding_tx, write_embedding_rx) = unbounded_channel::<EmbeddingRecord>();
+  let write_embedding_task_fut = spawn_pg_write_embeddings(
+    write_embedding_rx,
+    pg_pool,
+    embed_metrics.clone(),
+    latest_write_embedding_err.clone(),
+  );
+
+  let process_tasks_task_fut = process_upcoming_tasks(
+    &mut redis_client,
+    embed_metrics,
+    indexer_provider,
+    threads,
+    config,
+    write_embedding_tx,
+    latest_write_embedding_err,
+  );
+
+  tokio::select! {
+    _ = write_embedding_task_fut => {
+      error!("[Background Embedding] Write embedding task stopped");
+    },
+    _ = process_tasks_task_fut => {
+      error!("[Background Embedding] Process tasks task stopped");
+    },
+  }
+}
+
+async fn process_upcoming_tasks(
+  redis_client: &mut ConnectionManager,
+  metrics: Arc<EmbeddingMetrics>,
+  indexer_provider: Arc<IndexerProvider>,
+  threads: Arc<ThreadPoolNoAbort>,
+  config: BackgroundIndexerConfig,
+  sender: UnboundedSender<EmbeddingRecord>,
+  latest_write_embedding_err: Arc<RwLock<Option<AppError>>>,
+) {
+  let options = default_indexer_group_option(threads.current_num_threads());
+  let mut interval = interval(Duration::from_secs(config.tick_interval_secs));
+  interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
+  interval.tick().await;
+
+  loop {
+    interval.tick().await;
+
+    let latest_error = latest_write_embedding_err.write().await.take();
+    if let Some(err) = latest_error {
+      if matches!(err, AppError::ActionTimeout(_)) {
+        info!(
+          "[Background Embedding] last write embedding task failed with timeout, waiting for 30s before retrying..."
+        );
+        tokio::time::sleep(Duration::from_secs(30)).await;
+      }
+    }
+
+    if let Ok(replay) = read_background_embed_tasks(redis_client, &options).await {
+      let all_keys: Vec<String> = replay
+        .keys
+        .iter()
+        .flat_map(|key| key.ids.iter().map(|stream_id| stream_id.id.clone()))
+        .collect();
+
+      for key in replay.keys {
+        info!("[Background Embedding] processing {} tasks", key.ids.len());
+        key.ids.into_par_iter().for_each(|stream_id| {
+          let result = threads.install(|| match UnindexedCollabTask::try_from(&stream_id) {
+            Ok(task) => {
+              if let Some(indexer) = indexer_provider.indexer_for(&task.collab_type) {
+                let start = Instant::now();
+                let embedder = create_embedder(&config);
+                let result = handle_task(embedder, indexer, task);
+                let cost = start.elapsed().as_millis();
+                metrics.record_write_embedding_time(cost);
+                if let Some(record) = result {
+                  let _ = sender.send(record);
+                }
+              }
+            },
+            Err(err) => {
+              error!(
+                "[Background Embedding] failed to parse embedder task: {:?}",
+                err
+              );
+            },
+          });
+          if let Err(err) = result {
+            error!(
+              "[Background Embedding] Failed to process embedder task: {:?}",
+              err
+            );
+          }
+        });
+      }
+
+      if !all_keys.is_empty() {
+        match ack_task(redis_client, all_keys, true).await {
+          Ok(_) => trace!("[Background embedding]: delete tasks from stream"),
+          Err(err) => {
+            error!("[Background Embedding] Failed to ack tasks: {:?}", err);
+          },
+        }
+      }
+    }
+  }
+}
+
+fn handle_task(
+  embedder: Embedder,
+  indexer: Arc<dyn Indexer>,
+  task: UnindexedCollabTask,
+) -> Option<EmbeddingRecord> {
+  trace!(
+    "[Background Embedding] processing task: {}, content:{:?}, collab_type: {}",
+    task.object_id,
+    task.data,
+    task.collab_type
+  );
+  let chunks = match task.data {
+    UnindexedData::UnindexedText(text) => indexer
+      .create_embedded_chunks_from_text(task.object_id.clone(), text, embedder.model())
+      .ok()?,
+  };
+  let embeddings = indexer.embed(&embedder, chunks).ok()?;
+  embeddings.map(|embeddings| EmbeddingRecord {
+    workspace_id: task.workspace_id,
+    object_id: task.object_id,
+    collab_type: task.collab_type,
+    tokens_used: embeddings.tokens_consumed,
+    contents: embeddings.params,
+  })
+}
+
+fn create_embedder(config: &BackgroundIndexerConfig) -> Embedder {
+  Embedder::OpenAI(open_ai::Embedder::new(config.open_api_key.clone()))
+}
diff --git a/services/appflowy-worker/src/lib.rs b/services/appflowy-worker/src/lib.rs
index 599bff96a..2dddf99e5 100644
--- a/services/appflowy-worker/src/lib.rs
+++ b/services/appflowy-worker/src/lib.rs
@@ -1,6 +1,6 @@
 pub mod error;
 pub mod import_worker;
-pub mod indexer;
+pub mod indexer_worker;
 mod mailer;
 pub mod metric;
 pub mod s3_client;
diff --git a/src/api/util.rs b/src/api/util.rs
index 3d48b1590..64bb6ea70 100644
--- a/src/api/util.rs
+++ b/src/api/util.rs
@@ -9,7 +9,7 @@ use async_trait::async_trait;
 use byteorder::{ByteOrder, LittleEndian};
 use chrono::Utc;
 use collab_rt_entity::user::RealtimeUser;
-use collab_rt_protocol::spawn_blocking_validate_encode_collab;
+use collab_rt_protocol::validate_encode_collab;
 use database_entity::dto::CollabParams;
 use std::str::FromStr;
 use tokio_stream::StreamExt;
@@ -119,13 +119,9 @@ pub trait CollabValidator {
 #[async_trait]
 impl CollabValidator for CollabParams {
   async fn check_encode_collab(&self) -> Result<(), AppError> {
-    spawn_blocking_validate_encode_collab(
-      &self.object_id,
-      &self.encoded_collab_v1,
-      &self.collab_type,
-    )
-    .await
-    .map_err(|err| AppError::NoRequiredData(err.to_string()))
+    validate_encode_collab(&self.object_id, &self.encoded_collab_v1, &self.collab_type)
+      .await
+      .map_err(|err| 
AppError::NoRequiredData(err.to_string())) } } diff --git a/src/api/workspace.rs b/src/api/workspace.rs index 05d4e65f3..73414db84 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -1,5 +1,5 @@ use crate::api::util::{client_version_from_headers, realtime_user_for_web_request, PayloadReader}; -use crate::api::util::{compress_type_from_header_value, device_id_from_headers, CollabValidator}; +use crate::api::util::{compress_type_from_header_value, device_id_from_headers}; use crate::api::ws::RealtimeServerAddr; use crate::biz; use crate::biz::collab::ops::{ @@ -35,20 +35,25 @@ use appflowy_collaborate::actix_ws::entities::{ClientHttpStreamMessage, ClientHt use authentication::jwt::{Authorization, OptionalUserUuid, UserUuid}; use bytes::BytesMut; use chrono::{DateTime, Duration, Utc}; +use collab::core::collab::DataSource; +use collab::core::origin::CollabOrigin; +use collab::entity::EncodedCollab; +use collab::preclude::Collab; use collab_database::entity::FieldType; +use collab_document::document::Document; use collab_entity::CollabType; use collab_folder::timestamp; use collab_rt_entity::collab_proto::{CollabDocStateParams, PayloadCompressionType}; use collab_rt_entity::realtime_proto::HttpRealtimeMessage; use collab_rt_entity::user::RealtimeUser; use collab_rt_entity::RealtimeMessage; -use collab_rt_protocol::validate_encode_collab; +use collab_rt_protocol::collab_from_encode_collab; use database::collab::{CollabStorage, GetCollabOrigin}; use database::user::select_uid_from_email; use database_entity::dto::PublishCollabItem; use database_entity::dto::PublishInfo; use database_entity::dto::*; -use indexer::scheduler::PendingUnindexedCollab; +use indexer::scheduler::{UnindexedCollabTask, UnindexedData}; use prost::Message as ProstMessage; use rayon::prelude::*; use sha2::{Digest, Sha256}; @@ -707,7 +712,16 @@ async fn create_collab_handler( ); } - if let Err(err) = params.check_encode_collab().await { + let collab = collab_from_encode_collab(¶ms.object_id, ¶ms.encoded_collab_v1) + .await + .map_err(|err| { + AppError::InvalidRequest(format!( + "Failed to create collab from encoded collab: {}", + err + )) + })?; + + if let Err(err) = params.collab_type.validate_require_data(&collab) { return Err( AppError::NoRequiredData(format!( "collab doc state is not correct:{},{}", @@ -722,12 +736,19 @@ async fn create_collab_handler( .can_index_workspace(&workspace_id) .await? 
{
-    let workspace_id_uuid =
-      Uuid::parse_str(&workspace_id).map_err(|err| AppError::Internal(err.into()))?;
-    state.indexer_scheduler.index_pending_collab_one(
-      PendingUnindexedCollab::from((workspace_id_uuid, &params)),
-      true,
-    )?;
+    if let Ok(text) = Document::open(collab).and_then(|doc| doc.to_plain_text(false, true)) {
+      let workspace_id_uuid =
+        Uuid::parse_str(&workspace_id).map_err(|err| AppError::Internal(err.into()))?;
+      let pending = UnindexedCollabTask::new(
+        workspace_id_uuid,
+        params.object_id.clone(),
+        params.collab_type.clone(),
+        UnindexedData::UnindexedText(text),
+      );
+      state
+        .indexer_scheduler
+        .index_pending_collab_one(pending, false)?;
+    }
   }
 
   let mut transaction = state
@@ -796,7 +817,7 @@ async fn batch_create_collab_handler(
     }
   }
   // Perform decompression and processing in a Rayon thread pool
-  let collab_params_list = tokio::task::spawn_blocking(move || match compress_type {
+  let mut collab_params_list = tokio::task::spawn_blocking(move || match compress_type {
     CompressionType::Brotli { buffer_size } => offset_len_list
       .into_par_iter()
       .filter_map(|(offset, len)| {
@@ -805,12 +826,31 @@ async fn batch_create_collab_handler(
           Ok(decompressed_data) => {
             let params = CollabParams::from_bytes(&decompressed_data).ok()?;
             if params.validate().is_ok() {
-              match validate_encode_collab(
+              let encoded_collab =
+                EncodedCollab::decode_from_bytes(&params.encoded_collab_v1).ok()?;
+              let collab = Collab::new_with_source(
+                CollabOrigin::Empty,
                 &params.object_id,
-                &params.encoded_collab_v1,
-                &params.collab_type,
-              ) {
-                Ok(_) => Some(params),
+                DataSource::DocStateV1(encoded_collab.doc_state.to_vec()),
+                vec![],
+                false,
+              )
+              .ok()?;
+
+              match params.collab_type.validate_require_data(&collab) {
+                Ok(_) => {
+                  match params.collab_type {
+                    CollabType::Document => {
+                      let index_text =
+                        Document::open(collab).and_then(|doc| doc.to_plain_text(false, true));
+                      Some((Some(index_text), params))
+                    },
+                    _ => {
+                      // TODO(nathan): support other types
+                      Some((None, params))
+                    },
+                  }
+                },
                 Err(_) => None,
               }
             } else {
@@ -834,7 +874,7 @@ async fn batch_create_collab_handler(
 
   let total_size = collab_params_list
     .iter()
-    .fold(0, |acc, x| acc + x.encoded_collab_v1.len());
+    .fold(0, |acc, x| acc + x.1.encoded_collab_v1.len());
   event!(
     tracing::Level::INFO,
     "decompressed {} collab objects in {:?}",
    collab_params_list.len(),
     start.elapsed()
   );
 
-  let mut indexed_collabs = vec![];
+  let mut pending_unindexed_collabs = vec![];
   if state
     .indexer_scheduler
     .can_index_workspace(&workspace_id)
     .await?
   {
-    indexed_collabs = collab_params_list
-      .iter()
-      .filter(|p| state.indexer_scheduler.is_indexing_enabled(&p.collab_type))
-      .map(|value| PendingUnindexedCollab::from((workspace_id_uuid, value)))
+    pending_unindexed_collabs = collab_params_list
+      .iter_mut()
+      .filter(|p| {
+        state
+          .indexer_scheduler
+          .is_indexing_enabled(&p.1.collab_type)
+      })
+      .flat_map(|value| match std::mem::take(&mut value.0) {
+        None => None,
+        Some(text) => text
+          .map(|text| {
+            UnindexedCollabTask::new(
+              workspace_id_uuid,
+              value.1.object_id.clone(),
+              value.1.collab_type.clone(),
+              UnindexedData::UnindexedText(text),
+            )
+          })
+          .ok(),
+      })
       .collect::<Vec<_>>();
   }
 
+  let collab_params_list = collab_params_list
+    .into_iter()
+    .map(|(_, params)| params)
+    .collect::<Vec<_>>();
+
   let start = Instant::now();
   state
     .collab_access_control_storage
@@ -869,10 +930,10 @@ async fn batch_create_collab_handler(
   );
 
   // Must after batch_insert_new_collab
-  if !indexed_collabs.is_empty() {
+  if !pending_unindexed_collabs.is_empty() {
     state
       .indexer_scheduler
-      .index_pending_collabs(indexed_collabs, true)?;
+      .index_pending_collabs(pending_unindexed_collabs)?;
   }
 
   Ok(Json(AppResponse::Ok()))
@@ -1375,10 +1436,43 @@ async fn update_collab_handler(
   {
     let workspace_id_uuid =
       Uuid::parse_str(&workspace_id).map_err(|err| AppError::Internal(err.into()))?;
-    state.indexer_scheduler.index_pending_collab_one(
-      PendingUnindexedCollab::from((workspace_id_uuid, &params)),
-      false,
-    )?;
+
+    match params.collab_type {
+      CollabType::Document => {
+        let collab = collab_from_encode_collab(&params.object_id, &params.encoded_collab_v1)
+          .await
+          .map_err(|err| {
+            AppError::InvalidRequest(format!(
+              "Failed to create collab from encoded collab: {}",
+              err
+            ))
+          })?;
+        params
+          .collab_type
+          .validate_require_data(&collab)
+          .map_err(|err| {
+            AppError::NoRequiredData(format!(
+              "collab doc state is not correct:{},{}",
+              params.object_id, err
+            ))
+          })?;
+
+        if let Ok(text) = Document::open(collab).and_then(|doc| doc.to_plain_text(false, true)) {
+          let pending = UnindexedCollabTask::new(
+            workspace_id_uuid,
+            params.object_id.clone(),
+            params.collab_type.clone(),
+            UnindexedData::UnindexedText(text),
+          );
+          state
+            .indexer_scheduler
+            .index_pending_collab_one(pending, false)?;
+        }
+      },
+      _ => {
+        // TODO(nathan): support other collab type
+      },
+    }
   }
 
   state
diff --git a/src/application.rs b/src/application.rs
index 6e67cc451..2f938b1eb 100644
--- a/src/application.rs
+++ b/src/application.rs
@@ -325,18 +325,12 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<AppState, Error>
       .parse::<bool>()
       .unwrap_or(true),
     openai_api_key: get_env_var("APPFLOWY_AI_OPENAI_API_KEY", ""),
-    enable_background_indexing: get_env_var(
-      "APPFLOWY_INDEXER_BACKGROUND_INDEXING_ENABLED",
-      "false",
-    )
-    .parse::<bool>()
-    .unwrap_or(false),
     embedding_buffer_size: appflowy_collaborate::config::get_env_var(
      "APPFLOWY_INDEXER_EMBEDDING_BUFFER_SIZE",
-      "2000",
+      "5000",
     )
     .parse::<usize>()
-    .unwrap_or(2000),
+    .unwrap_or(5000),
   };
   let indexer_scheduler = IndexerScheduler::new(
     IndexerProvider::new(),
diff --git a/tests/collab/collab_curd_test.rs b/tests/collab/collab_curd_test.rs
index 99cf17649..a0d334097 100644
--- a/tests/collab/collab_curd_test.rs
+++ b/tests/collab/collab_curd_test.rs
@@ -11,10 +11,7 @@ use reqwest::Method;
 use serde::Serialize;
 use serde_json::json;
 
-use crate::collab::util::{
-  alex_software_engineer_story, empty_document_editor, generate_random_string,
-  test_encode_collab_v1,
-};
+use crate::collab::util::{empty_document_editor, generate_random_string, test_encode_collab_v1};
 use 
client_api_test::TestClient; use shared_entity::response::AppResponse; use uuid::Uuid; @@ -155,15 +152,15 @@ async fn batch_insert_document_collab_test() { let mut test_client = TestClient::new_user().await; let workspace_id = test_client.workspace_id().await; - let num_collabs = 200; + let num_collabs = 100; let mut list = vec![]; for _ in 0..num_collabs { let object_id = Uuid::new_v4().to_string(); let mut editor = empty_document_editor(&object_id); let paragraphs = vec![ - generate_random_string(10), - generate_random_string(24), - generate_random_string(12), + generate_random_string(1), + generate_random_string(2), + generate_random_string(5), ]; editor.insert_paragraphs(paragraphs); list.push((object_id, editor.encode_collab())); diff --git a/tests/sql_test/mod.rs b/tests/sql_test/mod.rs index daac8ab12..f04949068 100644 --- a/tests/sql_test/mod.rs +++ b/tests/sql_test/mod.rs @@ -2,4 +2,3 @@ mod chat_test; mod history_test; pub(crate) mod util; mod workspace_test; -mod upsert_embedding_test; diff --git a/tests/sql_test/upsert_embedding_test.rs b/tests/sql_test/upsert_embedding_test.rs deleted file mode 100644 index dd2ba0dd9..000000000 --- a/tests/sql_test/upsert_embedding_test.rs +++ /dev/null @@ -1,14 +0,0 @@ -use crate::sql_test::util::{setup_db, test_create_user}; -use sqlx::PgPool; - -#[sqlx::test(migrations = false)] -async fn upsert_embeddings_test(pool: PgPool) { - setup_db(&pool).await.unwrap(); - - let user_uuid = uuid::Uuid::new_v4(); - let name = user_uuid.to_string(); - let email = format!("{}@appflowy.io", name); - let user = test_create_user(&pool, user_uuid, &email, &name) - .await - .unwrap(); -}
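--
A minimal usage sketch (not part of the patch itself) of how a caller hands a
document off to the new background indexing path, using the APIs this patch
adds: `UnindexedCollabTask::new`, `UnindexedData::UnindexedText`, and
`IndexerScheduler::index_pending_collab_one`. The `scheduler`, `workspace_id`,
`object_id`, and `plain_text` values are hypothetical stand-ins supplied by the
caller:

    use collab_entity::CollabType;
    use indexer::scheduler::{IndexerScheduler, UnindexedCollabTask, UnindexedData};
    use uuid::Uuid;

    fn schedule_background_index(
      scheduler: &IndexerScheduler,
      workspace_id: Uuid,
      object_id: String,
      plain_text: String,
    ) -> Result<(), app_error::AppError> {
      // Wrap the extracted document text in a serializable task;
      // `created_at` is stamped inside `UnindexedCollabTask::new`.
      let task = UnindexedCollabTask::new(
        workspace_id,
        object_id,
        CollabType::Document,
        UnindexedData::UnindexedText(plain_text),
      );
      // Passing `background = true` routes the task onto the
      // `index_collab_task_stream` Redis stream instead of embedding
      // immediately; the appflowy-worker consumer group
      // (`indexer_worker_group`) picks it up, generates the embeddings,
      // and writes them to Postgres.
      scheduler.index_pending_collab_one(task, true)
    }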