chore: add metrics for write embedding to pg #1084

Merged · 1 commit · Dec 18, 2024
36 changes: 30 additions & 6 deletions services/appflowy-collaborate/src/indexer/indexer_scheduler.rs
@@ -26,8 +26,9 @@ use rayon::prelude::*;
 use sqlx::PgPool;
 use std::pin::Pin;
 use std::sync::Arc;
-use std::time::Instant;
+use std::time::{Duration, Instant};
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
+use tokio::time::timeout;
 use tracing::{debug, error, info, trace, warn};
 use uuid::Uuid;

@@ -101,8 +102,13 @@ impl IndexerScheduler {
       this.index_enabled(),
       num_thread
     );
+
     if this.index_enabled() {
-      tokio::spawn(spawn_write_indexing(rx, this.pg_pool.clone()));
+      tokio::spawn(spawn_write_indexing(
+        rx,
+        this.pg_pool.clone(),
+        this.metrics.clone(),
+      ));
       tokio::spawn(handle_unindexed_collabs(this.clone()));
     }

@@ -301,7 +307,7 @@ impl IndexerScheduler {
         indexer.embed(&embedder, chunks)
       });
       let duration = start.elapsed();
-      metrics.record_processing_time(duration.as_millis());
+      metrics.record_generate_embedding_time(duration.as_millis());

       match result {
         Ok(embed_result) => match embed_result {
@@ -494,7 +500,11 @@ async fn index_unindexd_collab(
 }

 const EMBEDDING_RECORD_BUFFER_SIZE: usize = 5;
-async fn spawn_write_indexing(mut rx: UnboundedReceiver<EmbeddingRecord>, pg_pool: PgPool) {
+async fn spawn_write_indexing(
+  mut rx: UnboundedReceiver<EmbeddingRecord>,
+  pg_pool: PgPool,
+  metrics: Arc<EmbeddingMetrics>,
+) {
   let mut buf = Vec::with_capacity(EMBEDDING_RECORD_BUFFER_SIZE);
   loop {
     let n = rx.recv_many(&mut buf, EMBEDDING_RECORD_BUFFER_SIZE).await;
@@ -503,14 +513,28 @@ async fn spawn_write_indexing(mut rx: UnboundedReceiver<EmbeddingRecord>, pg_pool: PgPool) {
       break;
     }

+    let start = Instant::now();
     let records = buf.drain(..n).collect::<Vec<_>>();
     for record in records.iter() {
       info!(
         "[Embedding] generate collab:{} embeddings, tokens used: {}",
         record.object_id, record.tokens_used
       );
     }
-    match batch_insert_records(&pg_pool, records).await {
+
+    let result = timeout(
+      Duration::from_secs(20),
+      batch_insert_records(&pg_pool, records),
+    )
+    .await
+    .unwrap_or_else(|_| {
+      Err(AppError::Internal(anyhow!(
+        "timeout when writing embeddings"
+      )))
+    });
+
+    metrics.record_write_embedding_time(start.elapsed().as_millis());
+    match result {
       Ok(_) => trace!("[Embedding] save {} embeddings to disk", n),
       Err(err) => error!("Failed to write collab embedding to disk:{}", err),
     }
@@ -567,7 +591,7 @@ fn process_collab(
   let chunks = indexer.create_embedded_chunks(&collab, embdder.model())?;
   let result = indexer.embed(embdder, chunks);
   let duration = start_time.elapsed();
-  metrics.record_processing_time(duration.as_millis());
+  metrics.record_generate_embedding_time(duration.as_millis());

   match result {
     Ok(Some(embeddings)) => Ok(Some((embeddings.tokens_consumed, embeddings.params))),
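For reference, the write path above wraps the Postgres insert in tokio::time::timeout and records the elapsed milliseconds whether the write succeeds, fails, or hits the deadline. Below is a minimal standalone sketch of that pattern; slow_op and the plain String error are illustrative stand-ins, not the PR's batch_insert_records or AppError.

use std::time::{Duration, Instant};
use tokio::time::timeout;

// Stand-in for batch_insert_records(&pg_pool, records).
async fn slow_op() -> Result<(), String> {
  tokio::time::sleep(Duration::from_millis(50)).await;
  Ok(())
}

#[tokio::main]
async fn main() {
  let start = Instant::now();
  // Bound the write with a 20s deadline and fold a timeout into the same
  // Result the caller already matches on.
  let result = timeout(Duration::from_secs(20), slow_op())
    .await
    .unwrap_or_else(|_| Err("timeout when writing embeddings".to_string()));
  // The elapsed time is observed regardless of the outcome.
  println!("write took {}ms, ok: {}", start.elapsed().as_millis(), result.is_ok());
}
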
18 changes: 15 additions & 3 deletions services/appflowy-collaborate/src/indexer/metrics.rs
@@ -5,14 +5,16 @@ pub struct EmbeddingMetrics {
   total_embed_count: Counter,
   failed_embed_count: Counter,
   processing_time_histogram: Histogram,
+  write_embedding_time_histogram: Histogram,
 }

 impl EmbeddingMetrics {
   fn init() -> Self {
     Self {
       total_embed_count: Counter::default(),
       failed_embed_count: Counter::default(),
-      processing_time_histogram: Histogram::new([100.0, 300.0, 800.0, 2000.0, 5000.0].into_iter()),
+      processing_time_histogram: Histogram::new([500.0, 1000.0, 5000.0, 8000.0].into_iter()),
+      write_embedding_time_histogram: Histogram::new([500.0, 1000.0, 5000.0, 8000.0].into_iter()),
     }
   }

@@ -36,6 +38,11 @@ impl EmbeddingMetrics {
       "Histogram of embedding processing times",
       metrics.processing_time_histogram.clone(),
     );
+    realtime_registry.register(
+      "write_embedding_time_seconds",
+      "Histogram of embedding write times",
+      metrics.write_embedding_time_histogram.clone(),
+    );

     metrics
   }
@@ -48,8 +55,13 @@ impl EmbeddingMetrics {
     self.failed_embed_count.inc_by(count);
   }

-  pub fn record_processing_time(&self, millis: u128) {
-    tracing::trace!("[Embedding]: processing time: {}ms", millis);
+  pub fn record_generate_embedding_time(&self, millis: u128) {
+    tracing::trace!("[Embedding]: generate embeddings cost: {}ms", millis);
     self.processing_time_histogram.observe(millis as f64);
   }
+
+  pub fn record_write_embedding_time(&self, millis: u128) {
+    tracing::trace!("[Embedding]: write embedding time cost: {}ms", millis);
+    self.write_embedding_time_histogram.observe(millis as f64);
+  }
 }
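The metrics change follows the registration pattern already used in this file: build a Histogram with explicit buckets, register a clone against the realtime registry, and observe elapsed milliseconds on each write. A small sketch of that wiring, under the assumption that the Counter/Histogram/register calls here come from the prometheus-client crate (bucket values copied from the diff):

use prometheus_client::metrics::histogram::Histogram;
use prometheus_client::registry::Registry;

fn main() {
  let mut registry = Registry::default();
  // Buckets mirror the ones chosen for write_embedding_time_histogram above.
  let write_embedding_time_histogram =
    Histogram::new([500.0, 1000.0, 5000.0, 8000.0].into_iter());
  // Register a clone; the retained handle keeps recording observations.
  registry.register(
    "write_embedding_time_seconds",
    "Histogram of embedding write times",
    write_embedding_time_histogram.clone(),
  );
  // Each batch write would record its elapsed milliseconds as one observation.
  write_embedding_time_histogram.observe(1234.0);
}

One reviewer-level note: the registered name ends in _seconds while record_write_embedding_time observes milliseconds, so the unit in the metric name does not match the recorded value.
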
2 changes: 1 addition & 1 deletion services/appflowy-worker/src/config.rs
@@ -56,7 +56,7 @@ impl Config {
       // Adapted from: https://github.com/AppFlowy-IO/AppFlowy-Cloud/issues/984
       smtp_username: get_env_var("APPFLOWY_MAILER_SMTP_USERNAME", "sender@example.com"),
       smtp_password: get_env_var("APPFLOWY_MAILER_SMTP_PASSWORD", "password").into(),
-      smtp_tls_kind: get_env_var("APPFLOWY_MAILER_SMTP_TLS_KIND", "wrapper").into(),
+      smtp_tls_kind: get_env_var("APPFLOWY_MAILER_SMTP_TLS_KIND", "wrapper"),
     },
   })
 }
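The config.rs tweak only drops a redundant .into(), since smtp_tls_kind is presumably consumed as a plain String. A hedged sketch of the helper pattern implied here; the real get_env_var is defined elsewhere in the worker crate, so this signature is an assumption:

use std::env;

// Assumed shape of the helper: fall back to a default when the variable is unset.
fn get_env_var(key: &str, default: &str) -> String {
  env::var(key).unwrap_or_else(|_| default.to_string())
}

fn main() {
  // smtp_tls_kind takes the String directly, so no extra .into() is needed;
  // smtp_password keeps its .into() because that field expects a converted type.
  let smtp_tls_kind = get_env_var("APPFLOWY_MAILER_SMTP_TLS_KIND", "wrapper");
  println!("tls kind: {smtp_tls_kind}");
}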