From b67a138e2e92815b2e8020d29a213951d645b391 Mon Sep 17 00:00:00 2001 From: nathan Date: Wed, 18 Dec 2024 16:15:35 +0800 Subject: [PATCH] chore: add metrics for write embedding to pg --- .../src/indexer/indexer_scheduler.rs | 36 +++++++++++++++---- .../src/indexer/metrics.rs | 18 ++++++++-- services/appflowy-worker/src/config.rs | 2 +- 3 files changed, 46 insertions(+), 10 deletions(-) diff --git a/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs b/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs index adb7791a2..8e02e32b2 100644 --- a/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs +++ b/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs @@ -26,8 +26,9 @@ use rayon::prelude::*; use sqlx::PgPool; use std::pin::Pin; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::time::timeout; use tracing::{debug, error, info, trace, warn}; use uuid::Uuid; @@ -101,8 +102,13 @@ impl IndexerScheduler { this.index_enabled(), num_thread ); + if this.index_enabled() { - tokio::spawn(spawn_write_indexing(rx, this.pg_pool.clone())); + tokio::spawn(spawn_write_indexing( + rx, + this.pg_pool.clone(), + this.metrics.clone(), + )); tokio::spawn(handle_unindexed_collabs(this.clone())); } @@ -301,7 +307,7 @@ impl IndexerScheduler { indexer.embed(&embedder, chunks) }); let duration = start.elapsed(); - metrics.record_processing_time(duration.as_millis()); + metrics.record_generate_embedding_time(duration.as_millis()); match result { Ok(embed_result) => match embed_result { @@ -494,7 +500,11 @@ async fn index_unindexd_collab( } const EMBEDDING_RECORD_BUFFER_SIZE: usize = 5; -async fn spawn_write_indexing(mut rx: UnboundedReceiver, pg_pool: PgPool) { +async fn spawn_write_indexing( + mut rx: UnboundedReceiver, + pg_pool: PgPool, + metrics: Arc, +) { let mut buf = Vec::with_capacity(EMBEDDING_RECORD_BUFFER_SIZE); loop { 
let n = rx.recv_many(&mut buf, EMBEDDING_RECORD_BUFFER_SIZE).await; @@ -503,6 +513,7 @@ async fn spawn_write_indexing(mut rx: UnboundedReceiver, pg_poo break; } + let start = Instant::now(); let records = buf.drain(..n).collect::>(); for record in records.iter() { info!( @@ -510,7 +521,20 @@ async fn spawn_write_indexing(mut rx: UnboundedReceiver, pg_poo record.object_id, record.tokens_used ); } - match batch_insert_records(&pg_pool, records).await { + + let result = timeout( + Duration::from_secs(20), + batch_insert_records(&pg_pool, records), + ) + .await + .unwrap_or_else(|_| { + Err(AppError::Internal(anyhow!( + "timeout when writing embeddings" + ))) + }); + + metrics.record_write_embedding_time(start.elapsed().as_millis()); + match result { Ok(_) => trace!("[Embedding] save {} embeddings to disk", n), Err(err) => error!("Failed to write collab embedding to disk:{}", err), } @@ -567,7 +591,7 @@ fn process_collab( let chunks = indexer.create_embedded_chunks(&collab, embdder.model())?; let result = indexer.embed(embdder, chunks); let duration = start_time.elapsed(); - metrics.record_processing_time(duration.as_millis()); + metrics.record_generate_embedding_time(duration.as_millis()); match result { Ok(Some(embeddings)) => Ok(Some((embeddings.tokens_consumed, embeddings.params))), diff --git a/services/appflowy-collaborate/src/indexer/metrics.rs b/services/appflowy-collaborate/src/indexer/metrics.rs index e410b5086..6825bdb8d 100644 --- a/services/appflowy-collaborate/src/indexer/metrics.rs +++ b/services/appflowy-collaborate/src/indexer/metrics.rs @@ -5,6 +5,7 @@ pub struct EmbeddingMetrics { total_embed_count: Counter, failed_embed_count: Counter, processing_time_histogram: Histogram, + write_embedding_time_histogram: Histogram, } impl EmbeddingMetrics { @@ -12,7 +13,8 @@ impl EmbeddingMetrics { Self { total_embed_count: Counter::default(), failed_embed_count: Counter::default(), - processing_time_histogram: Histogram::new([100.0, 300.0, 800.0, 2000.0, 
5000.0].into_iter()), + processing_time_histogram: Histogram::new([500.0, 1000.0, 5000.0, 8000.0].into_iter()), + write_embedding_time_histogram: Histogram::new([500.0, 1000.0, 5000.0, 8000.0].into_iter()), } } @@ -36,6 +38,11 @@ impl EmbeddingMetrics { "Histogram of embedding processing times", metrics.processing_time_histogram.clone(), ); + realtime_registry.register( + "write_embedding_time_seconds", + "Histogram of embedding write times", + metrics.write_embedding_time_histogram.clone(), + ); metrics } @@ -48,8 +55,13 @@ impl EmbeddingMetrics { self.failed_embed_count.inc_by(count); } - pub fn record_processing_time(&self, millis: u128) { - tracing::trace!("[Embedding]: processing time: {}ms", millis); + pub fn record_generate_embedding_time(&self, millis: u128) { + tracing::trace!("[Embedding]: generate embeddings cost: {}ms", millis); self.processing_time_histogram.observe(millis as f64); } + + pub fn record_write_embedding_time(&self, millis: u128) { + tracing::trace!("[Embedding]: write embedding time cost: {}ms", millis); + self.write_embedding_time_histogram.observe(millis as f64); + } } diff --git a/services/appflowy-worker/src/config.rs b/services/appflowy-worker/src/config.rs index f8354c6e1..d5bb99af0 100644 --- a/services/appflowy-worker/src/config.rs +++ b/services/appflowy-worker/src/config.rs @@ -56,7 +56,7 @@ impl Config { // Adapted from: https://github.com/AppFlowy-IO/AppFlowy-Cloud/issues/984 smtp_username: get_env_var("APPFLOWY_MAILER_SMTP_USERNAME", "sender@example.com"), smtp_password: get_env_var("APPFLOWY_MAILER_SMTP_PASSWORD", "password").into(), - smtp_tls_kind: get_env_var("APPFLOWY_MAILER_SMTP_TLS_KIND", "wrapper").into(), + smtp_tls_kind: get_env_var("APPFLOWY_MAILER_SMTP_TLS_KIND", "wrapper"), }, }) }