
Commit

chore: background index
appflowy committed Dec 23, 2024
1 parent ac82548 commit 27a2837
Showing 27 changed files with 508 additions and 301 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion deploy.env
@@ -157,7 +157,7 @@ APPFLOWY_LOCAL_AI_TEST_ENABLED=false
 APPFLOWY_INDEXER_ENABLED=true
 APPFLOWY_INDEXER_DATABASE_URL=postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}
 APPFLOWY_INDEXER_REDIS_URL=redis://${REDIS_HOST}:${REDIS_PORT}
-APPFLOWY_INDEXER_EMBEDDING_BUFFER_SIZE=2000
+APPFLOWY_INDEXER_EMBEDDING_BUFFER_SIZE=5000

 # AppFlowy Collaborate
 APPFLOWY_COLLABORATE_MULTI_THREAD=false
2 changes: 1 addition & 1 deletion dev.env
@@ -124,7 +124,7 @@ APPFLOWY_LOCAL_AI_TEST_ENABLED=false
 APPFLOWY_INDEXER_ENABLED=true
 APPFLOWY_INDEXER_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres
 APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379
-APPFLOWY_INDEXER_EMBEDDING_BUFFER_SIZE=2000
+APPFLOWY_INDEXER_EMBEDDING_BUFFER_SIZE=5000

 # AppFlowy Collaborate
 APPFLOWY_COLLABORATE_MULTI_THREAD=false
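
Both environment files raise APPFLOWY_INDEXER_EMBEDDING_BUFFER_SIZE from 2000 to 5000. As an illustration only (not part of this commit), a service could read the value along these lines; the fallback of 5000 simply mirrors the new file values and is an assumption, not the code's actual default:

  // Hypothetical sketch: parse the buffer size from the environment.
  let buffer_size: usize = std::env::var("APPFLOWY_INDEXER_EMBEDDING_BUFFER_SIZE")
    .ok()
    .and_then(|v| v.parse().ok())
    .unwrap_or(5000);
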
1 change: 1 addition & 0 deletions libs/app-error/src/lib.rs
@@ -320,6 +320,7 @@ impl From<sqlx::Error> for AppError {
       sqlx::Error::RowNotFound => {
         AppError::RecordNotFound(format!("Record not exist in db. {})", msg))
       },
+      sqlx::Error::PoolTimedOut => AppError::ActionTimeout(value.to_string()),
       _ => AppError::SqlxError(msg),
     }
   }
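
With this arm in place, a connection-pool timeout from sqlx maps to a dedicated timeout variant instead of the catch-all SqlxError. A minimal sketch of the effect, assuming only the variants shown above and that AppError implements Display:

  // Hypothetical usage: PoolTimedOut now surfaces as ActionTimeout.
  match AppError::from(sqlx::Error::PoolTimedOut) {
    AppError::ActionTimeout(msg) => eprintln!("database pool exhausted: {msg}"),
    other => eprintln!("other error: {other}"),
  }
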
23 changes: 4 additions & 19 deletions libs/collab-rt-protocol/src/data_validation.rs
@@ -6,14 +6,8 @@ use collab::preclude::Collab;
 use collab_entity::CollabType;
 use tracing::instrument;

-#[instrument(level = "trace", skip(data), fields(len = %data.len()))]
-#[inline]
-pub async fn spawn_blocking_validate_encode_collab(
-  object_id: &str,
-  data: &[u8],
-  collab_type: &CollabType,
-) -> Result<(), Error> {
-  let collab_type = collab_type.clone();
+pub async fn collab_from_encode_collab(object_id: &str, data: &[u8]) -> Result<Collab, Error> {
   let object_id = object_id.to_string();
   let data = data.to_vec();

@@ -27,28 +21,19 @@ pub async fn spawn_blocking_validate_encode_collab(
       false,
     )?;

-    collab_type.validate_require_data(&collab)?;
-    Ok::<(), Error>(())
+    Ok::<_, Error>(collab)
   })
   .await?
 }

-#[instrument(level = "trace", skip(data), fields(len = %data.len()))]
-#[inline]
-pub fn validate_encode_collab(
+pub async fn validate_encode_collab(
   object_id: &str,
   data: &[u8],
   collab_type: &CollabType,
 ) -> Result<(), Error> {
-  let encoded_collab = EncodedCollab::decode_from_bytes(data)?;
-  let collab = Collab::new_with_source(
-    CollabOrigin::Empty,
-    object_id,
-    DataSource::DocStateV1(encoded_collab.doc_state.to_vec()),
-    vec![],
-    false,
-  )?;
-
+  let collab = collab_from_encode_collab(object_id, data).await?;
   collab_type.validate_require_data(&collab)?;
   Ok::<(), Error>(())
 }
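
The net effect is that decoding is factored into collab_from_encode_collab, and validate_encode_collab becomes async and delegates to it, so existing callers now have to await it. A usage sketch based only on the signatures above (object_id, encoded_bytes and collab_type are placeholders):

  // Decode once and keep the Collab around:
  let collab = collab_from_encode_collab(&object_id, &encoded_bytes).await?;
  collab_type.validate_require_data(&collab)?;

  // Or use the one-shot validator (now async):
  validate_encode_collab(&object_id, &encoded_bytes, &collab_type).await?;
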
15 changes: 11 additions & 4 deletions libs/indexer/src/collab_indexer/document_indexer.rs
@@ -20,7 +20,7 @@ pub struct DocumentIndexer;

 #[async_trait]
 impl Indexer for DocumentIndexer {
-  fn create_embedded_chunks(
+  fn create_embedded_chunks_from_collab(
     &self,
     collab: &Collab,
     embedding_model: EmbeddingModel,
@@ -35,9 +35,7 @@ impl Indexer for DocumentIndexer {

     let result = document.to_plain_text(collab.transact(), false, true);
     match result {
-      Ok(content) => {
-        split_text_into_chunks(object_id, content, CollabType::Document, &embedding_model)
-      },
+      Ok(content) => self.create_embedded_chunks_from_text(object_id, content, embedding_model),
       Err(err) => {
         if matches!(err, DocumentError::NoRequiredData) {
           Ok(vec![])
@@ -48,6 +46,15 @@ impl Indexer for DocumentIndexer {
     }
   }

+  fn create_embedded_chunks_from_text(
+    &self,
+    object_id: String,
+    text: String,
+    model: EmbeddingModel,
+  ) -> Result<Vec<AFCollabEmbeddedChunk>, AppError> {
+    split_text_into_chunks(object_id, text, CollabType::Document, &model)
+  }
+
   fn embed(
     &self,
     embedder: &Embedder,
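
DocumentIndexer now exposes the chunking step for plain text directly, and the Collab path reuses it. An illustrative call, assuming the trait is in scope and that EmbeddingModel::TextEmbedding3Small is an available variant (the variant name is not shown in this diff):

  let indexer = DocumentIndexer;
  let chunks = indexer.create_embedded_chunks_from_text(
    object_id.to_string(),
    exported_text, // plain text obtained elsewhere
    EmbeddingModel::TextEmbedding3Small,
  )?;
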
9 changes: 8 additions & 1 deletion libs/indexer/src/collab_indexer/provider.rs
@@ -11,12 +11,19 @@ use std::sync::Arc;
 use tracing::info;

 pub trait Indexer: Send + Sync {
-  fn create_embedded_chunks(
+  fn create_embedded_chunks_from_collab(
     &self,
     collab: &Collab,
     model: EmbeddingModel,
   ) -> Result<Vec<AFCollabEmbeddedChunk>, AppError>;

+  fn create_embedded_chunks_from_text(
+    &self,
+    object_id: String,
+    text: String,
+    model: EmbeddingModel,
+  ) -> Result<Vec<AFCollabEmbeddedChunk>, AppError>;
+
   fn embed(
     &self,
     embedder: &Embedder,
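
Assuming the remaining methods keep this shape (the embed signature is cut off above), the trait stays object-safe, so a scheduler can keep resolving an indexer dynamically and feed it either a Collab or pre-extracted text. A sketch under that assumption:

  let indexer: std::sync::Arc<dyn Indexer> = std::sync::Arc::new(DocumentIndexer);
  let chunks = indexer.create_embedded_chunks_from_text(object_id, text, model)?;
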
4 changes: 2 additions & 2 deletions libs/indexer/src/lib.rs
@@ -1,9 +1,9 @@
 pub mod collab_indexer;
-mod entity;
+pub mod entity;
 mod error;
 pub mod metrics;
 pub mod queue;
 pub mod scheduler;
 pub mod thread_pool;
 mod unindexed_workspace;
-mod vector;
+pub mod vector;
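
Making entity and vector public lets other crates in the workspace reach items in those modules, for example (item names omitted since they are not part of this diff, and the crate name indexer is assumed from the package path):

  use indexer::entity::*;
  use indexer::vector::*;
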
36 changes: 7 additions & 29 deletions libs/indexer/src/queue.rs
@@ -1,40 +1,18 @@
 use crate::error::IndexerError;
-use crate::scheduler::PendingUnindexedCollab;
+use crate::scheduler::UnindexedCollabTask;
 use anyhow::anyhow;
 use app_error::AppError;
-use collab_entity::CollabType;
 use redis::aio::ConnectionManager;
 use redis::streams::{StreamId, StreamReadOptions, StreamReadReply};
 use redis::{AsyncCommands, RedisResult, Value};
-use serde::{Deserialize, Serialize};
 use serde_json::from_str;
 use tracing::error;

 pub const INDEX_TASK_STREAM_NAME: &str = "index_collab_task_stream";
 const INDEXER_WORKER_GROUP_NAME: &str = "indexer_worker_group";
 const INDEXER_CONSUMER_NAME: &str = "appflowy_worker";

-#[derive(Debug, Serialize, Deserialize)]
-pub enum EmbedderTask {
-  CollabId {
-    object_id: String,
-    collab_type: CollabType,
-    /// Timestamp that indicates when the task was created.
-    created_at: i64,
-  },
-}
-
-impl From<PendingUnindexedCollab> for EmbedderTask {
-  fn from(pending_collab: PendingUnindexedCollab) -> Self {
-    EmbedderTask::CollabId {
-      object_id: pending_collab.object_id,
-      collab_type: pending_collab.collab_type,
-      created_at: chrono::Utc::now().timestamp(),
-    }
-  }
-}
-
-impl TryFrom<&StreamId> for EmbedderTask {
+impl TryFrom<&StreamId> for UnindexedCollabTask {
   type Error = IndexerError;

   fn try_from(stream_id: &StreamId) -> Result<Self, Self::Error> {
@@ -57,7 +35,7 @@ impl TryFrom<&StreamId> for EmbedderTask {
       },
     };

-    from_str::<EmbedderTask>(&task_str).map_err(|err| IndexerError::Internal(err.into()))
+    from_str::<UnindexedCollabTask>(&task_str).map_err(|err| IndexerError::Internal(err.into()))
   }
 }

@@ -68,7 +46,7 @@ impl TryFrom<&StreamId> for EmbedderTask {
 ///
 pub async fn add_background_embed_task(
   redis_client: ConnectionManager,
-  tasks: Vec<EmbedderTask>,
+  tasks: Vec<UnindexedCollabTask>,
 ) -> Result<(), AppError> {
   let items = tasks
     .into_iter()
@@ -132,14 +110,14 @@ pub async fn read_background_embed_tasks(
 /// If `false`, the task remains in the stream after acknowledgment.
 pub async fn ack_task(
   redis_client: &mut ConnectionManager,
-  stream_entity_id: &str,
+  stream_entity_ids: Vec<String>,
   delete_task: bool,
 ) -> Result<(), IndexerError> {
   let _: () = redis_client
     .xack(
       INDEX_TASK_STREAM_NAME,
       INDEXER_WORKER_GROUP_NAME,
-      &[stream_entity_id],
+      &stream_entity_ids,
     )
     .await
     .map_err(|err| {
@@ -149,7 +127,7 @@ pub async fn ack_task(

   if delete_task {
     let _: () = redis_client
-      .xdel(INDEX_TASK_STREAM_NAME, &[stream_entity_id])
+      .xdel(INDEX_TASK_STREAM_NAME, &stream_entity_ids)
       .await
       .map_err(|err| {
         error!("Failed to delete task: {:?}", err);
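
queue.rs now (de)serializes the shared UnindexedCollabTask from the scheduler module instead of a local EmbedderTask, and ack_task acknowledges a whole batch of stream entry IDs in one XACK/XDEL round trip. A producer/consumer sketch using only the two signatures shown above (tasks and handled_ids are placeholders built elsewhere):

  // Producer: push pending collabs onto the Redis stream.
  add_background_embed_task(redis_client.clone(), tasks).await?;

  // Consumer: after processing a batch read from the stream, ack and delete it.
  ack_task(&mut redis_client, handled_ids, true).await?;
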
(Diffs for the remaining changed files are not shown.)
