From fab7ab1585ea64eba21a3261cf811b4e3bf04c58 Mon Sep 17 00:00:00 2001
From: anant
Date: Thu, 27 Mar 2025 18:28:56 +0530
Subject: [PATCH] updates for prism home api

---
 src/correlation.rs         |   6 +-
 src/prism/home/mod.rs      | 294 +++++++++++++++++++++++++------------
 src/prism/logstream/mod.rs |  17 ++-
 3 files changed, 211 insertions(+), 106 deletions(-)

diff --git a/src/correlation.rs b/src/correlation.rs
index 150644902..94aa0a15f 100644
--- a/src/correlation.rs
+++ b/src/correlation.rs
@@ -56,6 +56,8 @@ impl Correlations {
         let store = PARSEABLE.storage.get_object_store();
         let all_correlations = store.get_all_correlations().await.unwrap_or_default();
 
+        let mut guard = self.write().await;
+
         for correlations_bytes in all_correlations.values().flatten() {
             let correlation = match serde_json::from_slice::<CorrelationConfig>(correlations_bytes)
             {
@@ -66,9 +68,7 @@ impl Correlations {
                 }
             };
 
-            self.write()
-                .await
-                .insert(correlation.id.to_owned(), correlation);
+            guard.insert(correlation.id.to_owned(), correlation);
         }
 
         Ok(())
diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs
index 854727a14..39bd5c4e3 100644
--- a/src/prism/home/mod.rs
+++ b/src/prism/home/mod.rs
@@ -41,6 +41,8 @@ use crate::{
     users::{dashboards::DASHBOARDS, filters::FILTERS},
 };
 
+type StreamMetadataResponse = Result<(String, Vec<ObjectStoreFormat>, DataSetType), PrismHomeError>;
+
 #[derive(Debug, Serialize, Default)]
 struct StreamInfo {
     // stream_count: u32,
@@ -89,7 +91,109 @@ pub struct HomeResponse {
 }
 
 pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
-    // get all stream titles
+    // Execute these operations concurrently
+    let (
+        stream_titles_result,
+        alert_titles_result,
+        correlation_titles_result,
+        dashboards_result,
+        filters_result,
+        alerts_info_result,
+    ) = tokio::join!(
+        get_stream_titles(key),
+        get_alert_titles(key),
+        get_correlation_titles(key),
+        get_dashboard_titles(key),
+        get_filter_titles(key),
+        get_alerts_info()
+    );
+
+    let stream_titles = stream_titles_result?;
+    let alert_titles = alert_titles_result?;
+    let correlation_titles = correlation_titles_result?;
+    let dashboard_titles = dashboards_result?;
+    let filter_titles = filters_result?;
+    let alerts_info = alerts_info_result?;
+
+    // Generate dates for date-wise stats
+    let mut dates = (0..7)
+        .map(|i| {
+            Utc::now()
+                .checked_sub_signed(chrono::Duration::days(i))
+                .ok_or_else(|| anyhow::Error::msg("Date conversion failed"))
+                .unwrap()
+        })
+        .map(|date| date.format("%Y-%m-%d").to_string())
+        .collect_vec();
+    dates.reverse();
+
+    // Process stream metadata concurrently
+    let stream_metadata_futures = stream_titles
+        .iter()
+        .map(|stream| get_stream_metadata(stream.clone()));
+    let stream_metadata_results: Vec<StreamMetadataResponse> =
+        futures::future::join_all(stream_metadata_futures).await;
+
+    let mut stream_wise_stream_json = HashMap::new();
+    let mut datasets = Vec::new();
+
+    for result in stream_metadata_results {
+        match result {
+            Ok((stream, metadata, dataset_type)) => {
+                stream_wise_stream_json.insert(stream.clone(), metadata);
+                datasets.push(DataSet {
+                    title: stream,
+                    dataset_type,
+                });
+            }
+            Err(e) => {
+                error!("Failed to process stream metadata: {:?}", e);
+                // Continue with other streams instead of failing entirely
+            }
+        }
+    }
+
+    // Process stats for all dates concurrently
+    let stats_futures = dates
+        .iter()
+        .map(|date| stats_for_date(date.clone(), stream_wise_stream_json.clone()));
+    let stats_results: Vec<Result<DatedStats, PrismHomeError>> =
+        futures::future::join_all(stats_futures).await;
+
+    let mut stream_details = Vec::new();
+    let mut summary = StreamInfo::default();
+
+    for result in stats_results {
+        match result {
+            Ok(dated_stats) => {
+                summary.stats_summary.events += dated_stats.events;
+                summary.stats_summary.ingestion += dated_stats.ingestion_size;
+                summary.stats_summary.storage += dated_stats.storage_size;
+                stream_details.push(dated_stats);
+            }
+            Err(e) => {
+                error!("Failed to process stats for date: {:?}", e);
+                // Continue with other dates instead of failing entirely
+            }
+        }
+    }
+
+    Ok(HomeResponse {
+        stream_info: summary,
+        stats_details: stream_details,
+        stream_titles,
+        datasets,
+        alert_titles,
+        correlation_titles,
+        dashboard_titles,
+        filter_titles,
+        alerts_info,
+    })
+}
+
+// Helper functions to split the work
+
+async fn get_stream_titles(key: &SessionKey) -> Result<Vec<String>, PrismHomeError> {
     let stream_titles: Vec<String> = PARSEABLE
         .storage
         .get_object_store()
@@ -104,8 +208,10 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
+    Ok(stream_titles)
+}
+
+async fn get_alert_titles(key: &SessionKey) -> Result<Vec<String>, PrismHomeError> {
     let alert_titles = ALERTS
         .list_alerts_for_user(key.clone())
         .await?
@@ -116,7 +222,10 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
+    Ok(alert_titles)
+}
+
+async fn get_correlation_titles(key: &SessionKey) -> Result<Vec<String>, PrismHomeError> {
     let correlation_titles = CORRELATIONS
         .list_correlations(key)
         .await?
@@ -127,7 +236,10 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
+    Ok(correlation_titles)
+}
+
+async fn get_dashboard_titles(key: &SessionKey) -> Result<Vec<String>, PrismHomeError> {
     let dashboard_titles = DASHBOARDS
         .list_dashboards(key)
         .await
@@ -143,7 +255,10 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
+    Ok(dashboard_titles)
+}
+
+async fn get_filter_titles(key: &SessionKey) -> Result<Vec<String>, PrismHomeError> {
     let filter_titles = FILTERS
         .list_filters(key)
         .await
@@ -159,117 +274,106 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
-                Ok(d) => d,
-                Err(e) => {
-                    error!("Failed to parse stream metadata: {:?}", e);
-                    continue;
-                }
-            };
-            stream_jsons.push(stream_metadata);
-        }
-        stream_wise_stream_json.insert(stream.clone(), stream_jsons.clone());
-
-        let log_source = PARSEABLE
-            .get_stream(&stream)
-            .map_err(|e| PrismHomeError::Anyhow(e.into()))?
-            .get_log_source();
-
-        // if log_source_format is otel-metrics, set DataSetType to metrics
-        //if log_source_format is otel-traces, set DataSetType to traces
-        //else set DataSetType to logs
-
-        let dataset_type = match log_source[0].log_source_format {
-            LogSource::OtelMetrics => DataSetType::Metrics,
-            LogSource::OtelTraces => DataSetType::Traces,
-            _ => DataSetType::Logs,
-        };
+    Ok(filter_titles)
+}
 
-        let dataset = DataSet {
-            title: stream.clone(),
-            dataset_type,
+async fn get_stream_metadata(
+    stream: String,
+) -> Result<(String, Vec<ObjectStoreFormat>, DataSetType), PrismHomeError> {
+    let path = RelativePathBuf::from_iter([&stream, STREAM_ROOT_DIRECTORY]);
+    let obs = PARSEABLE
+        .storage
+        .get_object_store()
+        .get_objects(
+            Some(&path),
+            Box::new(|file_name| file_name.ends_with("stream.json")),
+        )
+        .await?;
+
+    let mut stream_jsons = Vec::new();
+    for ob in obs {
+        let stream_metadata: ObjectStoreFormat = match serde_json::from_slice(&ob) {
+            Ok(d) => d,
+            Err(e) => {
+                error!("Failed to parse stream metadata: {:?}", e);
+                continue;
+            }
         };
-        stream_jsons.push(stream_metadata);
+        stream_jsons.push(stream_metadata);
     }
 
-    for date in dates.into_iter() {
-        let dated_stats = stats_for_date(date, stream_wise_stream_json.clone()).await?;
-        summary.stats_summary.events += dated_stats.events;
-        summary.stats_summary.ingestion += dated_stats.ingestion_size;
-        summary.stats_summary.storage += dated_stats.storage_size;
-
-        stream_details.push(dated_stats);
+    if stream_jsons.is_empty() {
+        return Err(PrismHomeError::Anyhow(anyhow::Error::msg(
+            "No stream metadata found",
+        )));
     }
 
-    Ok(HomeResponse {
-        stream_info: summary,
-        stats_details: stream_details,
-        stream_titles,
-        datasets,
-        alert_titles,
-        correlation_titles,
-        dashboard_titles,
-        filter_titles,
-        alerts_info,
-    })
+    // let log_source = &stream_jsons[0].clone().log_source;
+    let log_source_format = stream_jsons
+        .iter()
+        .find(|sj| !sj.log_source.is_empty())
+        .map(|sj| sj.log_source[0].log_source_format.clone())
+        .unwrap_or_default();
+
+    let dataset_type = match log_source_format {
+        LogSource::OtelMetrics => DataSetType::Metrics,
+        LogSource::OtelTraces => DataSetType::Traces,
+        _ => DataSetType::Logs,
+    };
+
+    Ok((stream, stream_jsons, dataset_type))
 }
 
 async fn stats_for_date(
     date: String,
     stream_wise_meta: HashMap<String, Vec<ObjectStoreFormat>>,
 ) -> Result<DatedStats, PrismHomeError> {
-    // collect stats for all the streams for the given date
+    // Initialize result structure
     let mut details = DatedStats {
         date: date.clone(),
         ..Default::default()
     };
 
-    for (stream, meta) in stream_wise_meta {
-        let querier_stats = get_stats_date(&stream, &date).await?;
-        let ingestor_stats = fetch_daily_stats_from_ingestors(&date, &meta)?;
-        // collect date-wise stats for all streams
-        details.events += querier_stats.events + ingestor_stats.events;
-        details.ingestion_size += querier_stats.ingestion + ingestor_stats.ingestion;
-        details.storage_size += querier_stats.storage + ingestor_stats.storage;
+    // Process each stream concurrently
+    let stream_stats_futures = stream_wise_meta.iter().map(|(stream, meta)| {
+        get_stream_stats_for_date(stream.clone(), date.clone(), meta.clone())
+    });
+
+    let stream_stats_results = futures::future::join_all(stream_stats_futures).await;
+
+    // Aggregate results
+    for result in stream_stats_results {
+        match result {
+            Ok((events, ingestion, storage)) => {
+                details.events += events;
+                details.ingestion_size += ingestion;
+                details.storage_size += storage;
+            }
+            Err(e) => {
+                error!("Failed to get stats for stream: {:?}", e);
+                // Continue with other streams
+            }
+        }
     }
 
     Ok(details)
 }
 
+async fn get_stream_stats_for_date(
+    stream: String,
+    date: String,
+    meta: Vec<ObjectStoreFormat>,
+) -> Result<(u64, u64, u64), PrismHomeError> {
+    let querier_stats = get_stats_date(&stream, &date).await?;
+    let ingestor_stats = fetch_daily_stats_from_ingestors(&date, &meta)?;
+
+    Ok((
+        querier_stats.events + ingestor_stats.events,
+        querier_stats.ingestion + ingestor_stats.ingestion,
+        querier_stats.storage + ingestor_stats.storage,
+    ))
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum PrismHomeError {
     #[error("Error: {0}")]
diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs
index b15e1907d..7f71dd6ab 100644
--- a/src/prism/logstream/mod.rs
+++ b/src/prism/logstream/mod.rs
@@ -60,14 +60,15 @@ pub struct PrismLogstreamInfo {
 pub async fn get_prism_logstream_info(
     stream_name: &str,
 ) -> Result<PrismLogstreamInfo, PrismLogstreamError> {
-    // get StreamInfo
-    let info = get_stream_info_helper(stream_name).await?;
-
-    // get stream schema
-    let schema = get_stream_schema_helper(stream_name).await?;
-
-    // get stream stats
-    let stats = get_stats(stream_name).await?;
+    let (info, schema, stats) = tokio::join!(
+        get_stream_info_helper(stream_name),
+        get_stream_schema_helper(stream_name),
+        get_stats(stream_name),
+    );
+
+    let info = info?;
+    let schema = schema?;
+    let stats = stats?;
 
     // get retention
     let retention = PARSEABLE
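
The core pattern in this patch: lookups that do not depend on each other are
started together with tokio::join! instead of being awaited one after another,
and the ? error propagation happens only after all of them have completed. A
minimal, self-contained sketch of that pattern follows; the fetch_info /
fetch_schema / fetch_stats helpers are hypothetical stand-ins for calls such
as get_stream_info_helper, get_stream_schema_helper, and get_stats, not the
actual Parseable API.

// Sketch: concurrent awaits with deferred error propagation.
async fn fetch_info() -> Result<String, String> {
    Ok("info".to_string())
}

async fn fetch_schema() -> Result<String, String> {
    Ok("schema".to_string())
}

async fn fetch_stats() -> Result<u64, String> {
    Ok(42)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // tokio::join! drives all three futures concurrently on the current
    // task and returns once every one of them has finished.
    let (info, schema, stats) = tokio::join!(fetch_info(), fetch_schema(), fetch_stats());

    // Propagate errors only after all futures have completed, as the patch
    // does in generate_home_response and get_prism_logstream_info.
    let (info, schema, stats) = (info?, schema?, stats?);
    println!("{info} {schema} {stats}");
    Ok(())
}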
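
For the fan-out cases (per-stream metadata, per-date stats) the patch instead
uses futures::future::join_all and tolerates individual failures rather than
aborting the whole home response. A small sketch of that policy, with a
hypothetical lookup helper standing in for get_stream_metadata:

use futures::future::join_all;

// Hypothetical per-stream lookup; fails for an empty stream name.
async fn lookup(stream: &str) -> Result<String, String> {
    if stream.is_empty() {
        return Err("empty stream name".to_string());
    }
    Ok(format!("metadata for {stream}"))
}

#[tokio::main]
async fn main() {
    let streams = ["app-logs", "", "otel-traces"];

    // Fan out one future per stream and await them all concurrently.
    let results = join_all(streams.into_iter().map(|s| lookup(s))).await;

    // Keep the successes and only log the failures -- the same
    // continue-on-error policy the patch applies per stream and per date.
    for result in results {
        match result {
            Ok(meta) => println!("ok: {meta}"),
            Err(e) => eprintln!("skipping stream: {e}"),
        }
    }
}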