From bb2b08d596114229d07e76986934231642bca7f6 Mon Sep 17 00:00:00 2001 From: anant Date: Thu, 13 Feb 2025 16:45:18 +0530 Subject: [PATCH 01/11] init commit for Prism GET /home --- src/alerts/mod.rs | 50 +++++++ src/handlers/http/home.rs | 38 +++++ src/handlers/http/mod.rs | 1 + src/handlers/http/modal/query_server.rs | 3 +- src/handlers/http/modal/server.rs | 7 +- src/home/mod.rs | 189 ++++++++++++++++++++++++ src/lib.rs | 1 + src/users/dashboards.rs | 2 +- 8 files changed, 288 insertions(+), 3 deletions(-) create mode 100644 src/handlers/http/home.rs create mode 100644 src/home/mod.rs diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index eaa58d3e3..664cc75b6 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -24,6 +24,7 @@ use datafusion::common::tree_node::TreeNode; use http::StatusCode; use itertools::Itertools; use once_cell::sync::Lazy; +use serde::Serialize; use serde_json::Error as SerdeError; use std::collections::{HashMap, HashSet}; use std::fmt::{self, Display}; @@ -873,3 +874,52 @@ impl Alerts { Ok(()) } } + +#[derive(Debug, Serialize)] +pub struct AlertsInfo { + total: u64, + silenced: u64, + resolved: u64, + triggered: u64, + low: u64, + medium: u64, + high: u64 +} + +// TODO: add RBAC +pub async fn get_alerts_info() -> Result { + let alerts = ALERTS.alerts.read().await; + let mut total = 0; + let mut silenced = 0; + let mut resolved = 0; + let mut triggered = 0; + let mut low = 0; + let mut medium = 0; + let mut high = 0; + + for (_, alert) in alerts.iter() { + total += 1; + match alert.state { + AlertState::Silenced => silenced += 1, + AlertState::Resolved => resolved += 1, + AlertState::Triggered => triggered += 1, + } + + match alert.severity { + Severity::Low => low += 1, + Severity::Medium => medium += 1, + Severity::High => high += 1, + _ => {} + } + } + + Ok(AlertsInfo { + total, + silenced, + resolved, + triggered, + low, + medium, + high + }) +} \ No newline at end of file diff --git a/src/handlers/http/home.rs b/src/handlers/http/home.rs new file mode 100644 index 000000000..d61e885f9 --- /dev/null +++ b/src/handlers/http/home.rs @@ -0,0 +1,38 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use actix_web::{web, HttpRequest, Responder}; + +use crate::{home::{generate_home_response, HomeError}, utils::actix::extract_session_key_from_req}; + + +/// Fetches the data to populate Prism's home +/// +/// +/// # Returns +/// +/// A JSONified version of the `HomeResponse` struct. +pub async fn home_api(req: HttpRequest) -> Result { + let key = extract_session_key_from_req(&req) + .map_err(|err| HomeError::Anyhow(anyhow::Error::msg(err.to_string())))?; + + let res = generate_home_response(&key) + .await?; + + Ok(web::Json(res)) +} \ No newline at end of file diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index f1f702d4b..48befd442 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -33,6 +33,7 @@ mod audit; pub mod cluster; pub mod correlation; pub mod health_check; +pub mod home; pub mod ingest; mod kinesis; pub mod llm; diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 264f8b9e9..372c18155 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -69,7 +69,8 @@ impl ParseableServer for QueryServer { .service(Server::get_counts_webscope()) .service(Server::get_metrics_webscope()) .service(Server::get_alerts_webscope()) - .service(Self::get_cluster_web_scope()), + .service(Self::get_cluster_web_scope()) + .service(Server::get_prism_home()), ) .service(Server::get_generated()); } diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 27a4d30f4..6fa84ec30 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -87,7 +87,8 @@ impl ParseableServer for Server { .service(Self::get_user_role_webscope()) .service(Self::get_counts_webscope()) .service(Self::get_alerts_webscope()) - .service(Self::get_metrics_webscope()), + .service(Self::get_metrics_webscope()) + .service(Self::get_prism_home()), ) .service(Self::get_ingest_otel_factory()) .service(Self::get_generated()); @@ -154,6 +155,10 @@ impl ParseableServer for Server { } impl Server { + pub fn get_prism_home() -> Resource { + web::resource("/home").route(web::get().to(http::home::home_api)) + } + pub fn get_metrics_webscope() -> Scope { web::scope("/metrics").service( web::resource("").route(web::get().to(metrics::get).authorize(Action::Metrics)), diff --git a/src/home/mod.rs b/src/home/mod.rs new file mode 100644 index 000000000..890a7181e --- /dev/null +++ b/src/home/mod.rs @@ -0,0 +1,189 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + + +use actix_web::http::header::ContentType; +use chrono::Local; +use http::StatusCode; +use itertools::Itertools; +use serde::Serialize; + +use crate::{alerts::{get_alerts_info, AlertsInfo, ALERTS}, correlation::CORRELATIONS, handlers::http::logstream::get_stats_date, parseable::PARSEABLE, rbac::{map::SessionKey, role::Action, Users}, stats::Stats, users::{dashboards::DASHBOARDS, filters::FILTERS}}; + +#[derive(Debug, Serialize, Default)] +struct StreamInfo { + // stream_count: u32, + // log_source_count: u32, + stats_summary: Stats, +} + +#[derive(Debug, Serialize, Default)] +struct DatedStats { + date: String, + events: u64, + ingestion_size: u64, + storage_size: u64, +} + +#[derive(Debug, Serialize)] +struct TitleAndId { + title: String, + id: String +} + +#[derive(Debug, Serialize)] +pub struct HomeResponse { + alert_titles: Vec, + alerts_info: AlertsInfo, + correlation_titles: Vec, + stream_info: StreamInfo, + stats_details: Vec, + stream_titles: Vec, + + + dashboard_titles: Vec, + filter_titles: Vec, + +} + +pub async fn generate_home_response(key: &SessionKey) -> Result { + + let user_id = if let Some(user_id) = Users.get_username_from_session(key) { + user_id + } else { + return Err(HomeError::Anyhow(anyhow::Error::msg("User does not exist"))); + }; + + // get all stream titles + let stream_titles = PARSEABLE.streams + .list() + .iter() + .filter(|logstream| { + Users.authorize(key.clone(), Action::ListStream, Some(&logstream), None) == crate::rbac::Response::Authorized + }) + .map(|logstream| logstream.clone()) + .collect_vec(); + + // get all alert titles (TODO: RBAC) + // do we need to move alerts into the PARSEABLE struct? + let alert_titles = ALERTS + .list_alerts_for_user(key.clone()) + .await + .map_err(|err| HomeError::Anyhow(anyhow::Error::msg(err.to_string())))? + .iter() + .map(|alert| TitleAndId { + title: alert.title.clone(), + id: alert.id.to_string() + }) + .collect_vec(); + + let correlation_titles = CORRELATIONS + .list_correlations(key) + .await + .map_err(|err| HomeError::Anyhow(anyhow::Error::msg(err.to_string())))? + .iter() + .map(|corr| TitleAndId { + title: corr.title.clone(), + id: corr.id.clone() + }) + .collect_vec(); + + let dashboard_titles = DASHBOARDS + .list_dashboards_by_user(&user_id) + .iter() + .map(|dashboard| TitleAndId { + title: dashboard.name.clone(), + id: dashboard.dashboard_id.as_ref().unwrap().clone() + }) + .collect_vec(); + + let filter_titles = FILTERS + .list_filters_by_user(&user_id) + .iter() + .map(|filter| { + TitleAndId { + title: filter.filter_name.clone(), + id: filter.filter_id.as_ref().unwrap().clone() + } + }) + .collect_vec(); + + let alerts_info = get_alerts_info().await + .map_err(|err| HomeError::Anyhow(anyhow::Error::msg(err.to_string())))?; + + let dates = (0..7) + .map(|i| Local::now().checked_sub_signed(chrono::Duration::days(i)).unwrap()) + .map(|date| date.format("%Y-%m-%d").to_string()) + .collect_vec(); + + let mut stream_details = Vec::new(); + + let mut summary = StreamInfo::default(); + + for date in dates.iter() { + let mut details = DatedStats::default(); + details.date = date.clone(); + + for stream in stream_titles.iter() { + let stats = get_stats_date(stream, &date) + .await + .map_err(|err| HomeError::Anyhow(anyhow::Error::msg(err.to_string())))?; + + details.events += stats.events; + details.ingestion_size += stats.ingestion; + details.storage_size += stats.storage; + + summary.stats_summary.events += stats.events; + summary.stats_summary.ingestion += stats.ingestion; + summary.stats_summary.storage += stats.storage; + } + + stream_details.push(details); + } + + Ok(HomeResponse { + stream_info: summary, + stats_details: stream_details, + stream_titles, + alert_titles, + correlation_titles, + dashboard_titles, + filter_titles, + alerts_info + }) +} + +#[derive(Debug, thiserror::Error)] +pub enum HomeError { + #[error("Error: {0}")] + Anyhow(#[from] anyhow::Error), +} + +impl actix_web::ResponseError for HomeError { + fn status_code(&self) -> http::StatusCode { + match self { + HomeError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + } + + fn error_response(&self) -> actix_web::HttpResponse { + actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .body(self.to_string()) + } +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 91fa0a405..25103588c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,6 +28,7 @@ pub mod connectors; pub mod correlation; mod event; pub mod handlers; +pub mod home; pub mod hottier; mod livetail; mod metadata; diff --git a/src/users/dashboards.rs b/src/users/dashboards.rs index 797314129..39dd785d0 100644 --- a/src/users/dashboards.rs +++ b/src/users/dashboards.rs @@ -98,7 +98,7 @@ pub struct TickConfig { #[derive(Debug, Serialize, Deserialize, Default, Clone)] pub struct Dashboard { pub version: Option, - name: String, + pub name: String, description: String, pub dashboard_id: Option, pub user_id: Option, From 814986233d9873126f8e267d15611baeaa65c269 Mon Sep 17 00:00:00 2001 From: anant Date: Wed, 19 Feb 2025 11:42:00 +0530 Subject: [PATCH 02/11] handled different error types --- src/home/mod.rs | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/home/mod.rs b/src/home/mod.rs index 890a7181e..ba624cb7f 100644 --- a/src/home/mod.rs +++ b/src/home/mod.rs @@ -23,7 +23,7 @@ use http::StatusCode; use itertools::Itertools; use serde::Serialize; -use crate::{alerts::{get_alerts_info, AlertsInfo, ALERTS}, correlation::CORRELATIONS, handlers::http::logstream::get_stats_date, parseable::PARSEABLE, rbac::{map::SessionKey, role::Action, Users}, stats::Stats, users::{dashboards::DASHBOARDS, filters::FILTERS}}; +use crate::{alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS}, correlation::{CorrelationError, CORRELATIONS}, handlers::http::logstream::{error::StreamError, get_stats_date}, parseable::PARSEABLE, rbac::{map::SessionKey, role::Action, Users}, stats::Stats, users::{dashboards::DASHBOARDS, filters::FILTERS}}; #[derive(Debug, Serialize, Default)] struct StreamInfo { @@ -83,8 +83,7 @@ pub async fn generate_home_response(key: &SessionKey) -> Result Result Result Result Result http::StatusCode { match self { HomeError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, + HomeError::AlertError(e) => e.status_code(), + HomeError::CorrelationError(e) => e.status_code(), + HomeError::StreamError(e) => e.status_code() } } From 89b5a6a239a8228b013085226e03c1d8f36ca469 Mon Sep 17 00:00:00 2001 From: anant Date: Wed, 19 Feb 2025 12:29:01 +0530 Subject: [PATCH 03/11] refactor + changes - refactored code for clippy - modified the base path for prism --- src/alerts/mod.rs | 6 +- src/handlers/http/home.rs | 11 ++-- src/handlers/http/mod.rs | 5 ++ src/handlers/http/modal/query_server.rs | 6 +- src/handlers/http/modal/server.rs | 5 +- src/home/mod.rs | 79 +++++++++++++++---------- src/metadata.rs | 4 +- src/parseable/mod.rs | 4 +- src/storage/mod.rs | 7 ++- 9 files changed, 77 insertions(+), 50 deletions(-) diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 664cc75b6..66857d686 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -883,7 +883,7 @@ pub struct AlertsInfo { triggered: u64, low: u64, medium: u64, - high: u64 + high: u64, } // TODO: add RBAC @@ -920,6 +920,6 @@ pub async fn get_alerts_info() -> Result { triggered, low, medium, - high + high, }) -} \ No newline at end of file +} diff --git a/src/handlers/http/home.rs b/src/handlers/http/home.rs index d61e885f9..4ce343da5 100644 --- a/src/handlers/http/home.rs +++ b/src/handlers/http/home.rs @@ -18,8 +18,10 @@ use actix_web::{web, HttpRequest, Responder}; -use crate::{home::{generate_home_response, HomeError}, utils::actix::extract_session_key_from_req}; - +use crate::{ + home::{generate_home_response, HomeError}, + utils::actix::extract_session_key_from_req, +}; /// Fetches the data to populate Prism's home /// @@ -31,8 +33,7 @@ pub async fn home_api(req: HttpRequest) -> Result { let key = extract_session_key_from_req(&req) .map_err(|err| HomeError::Anyhow(anyhow::Error::msg(err.to_string())))?; - let res = generate_home_response(&key) - .await?; + let res = generate_home_response(&key).await?; Ok(web::Json(res)) -} \ No newline at end of file +} diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 48befd442..cf7d76fe9 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -48,11 +48,16 @@ pub mod users; pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; pub const API_BASE_PATH: &str = "api"; pub const API_VERSION: &str = "v1"; +pub const PRISM_BASE_PATH: &str = "prism"; pub fn base_path() -> String { format!("/{API_BASE_PATH}/{API_VERSION}") } +pub fn prism_base_path() -> String { + format!("/{API_BASE_PATH}/{PRISM_BASE_PATH}/{API_VERSION}") +} + pub fn metrics_path() -> String { format!("{}/metrics", base_path()) } diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 372c18155..4ac0648a7 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -21,9 +21,9 @@ use std::thread; use crate::alerts::ALERTS; use crate::correlation::CORRELATIONS; use crate::handlers::airplane; -use crate::handlers::http::base_path; use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt}; +use crate::handlers::http::{base_path, prism_base_path}; use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE}; use crate::handlers::http::{rbac, role}; use crate::hottier::HotTierManager; @@ -69,9 +69,9 @@ impl ParseableServer for QueryServer { .service(Server::get_counts_webscope()) .service(Server::get_metrics_webscope()) .service(Server::get_alerts_webscope()) - .service(Self::get_cluster_web_scope()) - .service(Server::get_prism_home()), + .service(Self::get_cluster_web_scope()), ) + .service(web::scope(&prism_base_path()).service(Server::get_prism_home())) .service(Server::get_generated()); } diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 6fa84ec30..ee282177a 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -26,6 +26,7 @@ use crate::handlers::http::about; use crate::handlers::http::alerts; use crate::handlers::http::base_path; use crate::handlers::http::health_check; +use crate::handlers::http::prism_base_path; use crate::handlers::http::query; use crate::handlers::http::users::dashboards; use crate::handlers::http::users::filters; @@ -87,9 +88,9 @@ impl ParseableServer for Server { .service(Self::get_user_role_webscope()) .service(Self::get_counts_webscope()) .service(Self::get_alerts_webscope()) - .service(Self::get_metrics_webscope()) - .service(Self::get_prism_home()), + .service(Self::get_metrics_webscope()), ) + .service(web::scope(&prism_base_path()).service(Self::get_prism_home())) .service(Self::get_ingest_otel_factory()) .service(Self::get_generated()); } diff --git a/src/home/mod.rs b/src/home/mod.rs index ba624cb7f..5767ab93b 100644 --- a/src/home/mod.rs +++ b/src/home/mod.rs @@ -16,14 +16,21 @@ * */ - use actix_web::http::header::ContentType; -use chrono::Local; +use chrono::Utc; use http::StatusCode; use itertools::Itertools; use serde::Serialize; -use crate::{alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS}, correlation::{CorrelationError, CORRELATIONS}, handlers::http::logstream::{error::StreamError, get_stats_date}, parseable::PARSEABLE, rbac::{map::SessionKey, role::Action, Users}, stats::Stats, users::{dashboards::DASHBOARDS, filters::FILTERS}}; +use crate::{ + alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS}, + correlation::{CorrelationError, CORRELATIONS}, + handlers::http::logstream::{error::StreamError, get_stats_date}, + parseable::PARSEABLE, + rbac::{map::SessionKey, role::Action, Users}, + stats::Stats, + users::{dashboards::DASHBOARDS, filters::FILTERS}, +}; #[derive(Debug, Serialize, Default)] struct StreamInfo { @@ -43,7 +50,7 @@ struct DatedStats { #[derive(Debug, Serialize)] struct TitleAndId { title: String, - id: String + id: String, } #[derive(Debug, Serialize)] @@ -54,15 +61,12 @@ pub struct HomeResponse { stream_info: StreamInfo, stats_details: Vec, stream_titles: Vec, - - + dashboard_titles: Vec, filter_titles: Vec, - } pub async fn generate_home_response(key: &SessionKey) -> Result { - let user_id = if let Some(user_id) = Users.get_username_from_session(key) { user_id } else { @@ -70,16 +74,17 @@ pub async fn generate_home_response(key: &SessionKey) -> Result = PARSEABLE + .streams .list() - .iter() + .into_iter() .filter(|logstream| { - Users.authorize(key.clone(), Action::ListStream, Some(&logstream), None) == crate::rbac::Response::Authorized + Users.authorize(key.clone(), Action::ListStream, Some(logstream), None) + == crate::rbac::Response::Authorized }) - .map(|logstream| logstream.clone()) - .collect_vec(); + .collect(); - // get all alert titles (TODO: RBAC) + // get all alert IDs (TODO: RBAC) // do we need to move alerts into the PARSEABLE struct? let alert_titles = ALERTS .list_alerts_for_user(key.clone()) @@ -87,44 +92,51 @@ pub async fn generate_home_response(key: &SessionKey) -> Result Result Result StatusCode::INTERNAL_SERVER_ERROR, HomeError::AlertError(e) => e.status_code(), HomeError::CorrelationError(e) => e.status_code(), - HomeError::StreamError(e) => e.status_code() + HomeError::StreamError(e) => e.status_code(), } } @@ -191,4 +206,4 @@ impl actix_web::ResponseError for HomeError { .insert_header(ContentType::plaintext()) .body(self.to_string()) } -} \ No newline at end of file +} diff --git a/src/metadata.rs b/src/metadata.rs index a29fdfee2..e712fe960 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -17,7 +17,7 @@ */ use arrow_schema::{DataType, Field, Schema, TimeUnit}; -use chrono::{Local, NaiveDateTime}; +use chrono::{NaiveDateTime, Utc}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::num::NonZeroU32; @@ -105,7 +105,7 @@ impl LogStreamMetadata { ) -> Self { LogStreamMetadata { created_at: if created_at.is_empty() { - Local::now().to_rfc3339() + Utc::now().to_rfc3339() } else { created_at }, diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 10da8c1b0..4841a88fa 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -22,7 +22,7 @@ use std::{collections::HashMap, num::NonZeroU32, path::PathBuf, str::FromStr, sy use actix_web::http::header::HeaderMap; use arrow_schema::{Field, Schema}; use bytes::Bytes; -use chrono::Local; +use chrono::Utc; use clap::{error::ErrorKind, Parser}; use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, StatusCode}; use once_cell::sync::Lazy; @@ -567,7 +567,7 @@ impl Parseable { let storage = self.storage.get_object_store(); let meta = ObjectStoreFormat { - created_at: Local::now().to_rfc3339(), + created_at: Utc::now().to_rfc3339(), permissions: vec![Permisssion::new(PARSEABLE.options.username.clone())], stream_type, time_partition: (!time_partition.is_empty()).then(|| time_partition.to_string()), diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 3be5bfc37..460b7cfdc 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -31,6 +31,11 @@ use crate::{ utils::json::{deserialize_string_as_true, serialize_bool_as_true}, }; +use chrono::Utc; +use serde::{Deserialize, Serialize}; + +use std::fmt::Debug; + mod azure_blob; mod localfs; mod metrics_layer; @@ -211,7 +216,7 @@ impl Default for ObjectStoreFormat { schema_version: SchemaVersion::V1, // Newly created streams should be v1 objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(), stream_type: StreamType::UserDefined, - created_at: Local::now().to_rfc3339(), + created_at: Utc::now().to_rfc3339(), first_event_at: None, owner: Owner::new("".to_string(), "".to_string()), permissions: vec![Permisssion::new("parseable".to_string())], From 9d4f25fa76cc0ac0eb5bef43867d544080b01a25 Mon Sep 17 00:00:00 2001 From: anant Date: Wed, 19 Feb 2025 15:12:50 +0530 Subject: [PATCH 04/11] added logstream/info API --- src/handlers/http/mod.rs | 3 +- src/handlers/http/modal/query_server.rs | 6 +- src/handlers/http/modal/server.rs | 10 +- src/handlers/http/{home.rs => prism_home.rs} | 6 +- src/handlers/http/prism_logstream.rs | 31 +++++ src/lib.rs | 2 +- src/logstream/mod.rs | 93 +++++++++++++ src/{ => prism}/home/mod.rs | 18 +-- src/prism/logstream/mod.rs | 138 +++++++++++++++++++ src/prism/mod.rs | 20 +++ src/storage/mod.rs | 2 - 11 files changed, 312 insertions(+), 17 deletions(-) rename src/handlers/http/{home.rs => prism_home.rs} (87%) create mode 100644 src/handlers/http/prism_logstream.rs create mode 100644 src/logstream/mod.rs rename src/{ => prism}/home/mod.rs (92%) create mode 100644 src/prism/logstream/mod.rs create mode 100644 src/prism/mod.rs diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index cf7d76fe9..aa2e0a02d 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -33,7 +33,6 @@ mod audit; pub mod cluster; pub mod correlation; pub mod health_check; -pub mod home; pub mod ingest; mod kinesis; pub mod llm; @@ -41,6 +40,8 @@ pub mod logstream; pub mod middleware; pub mod modal; pub mod oidc; +pub mod prism_home; +pub mod prism_logstream; pub mod query; pub mod rbac; pub mod role; diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 4ac0648a7..c130f89c1 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -71,7 +71,11 @@ impl ParseableServer for QueryServer { .service(Server::get_alerts_webscope()) .service(Self::get_cluster_web_scope()), ) - .service(web::scope(&prism_base_path()).service(Server::get_prism_home())) + .service( + web::scope(&prism_base_path()) + .service(Server::get_prism_home()) + .service(Server::get_prism_logstream()), + ) .service(Server::get_generated()); } diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index ee282177a..98c092eca 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -157,7 +157,15 @@ impl ParseableServer for Server { impl Server { pub fn get_prism_home() -> Resource { - web::resource("/home").route(web::get().to(http::home::home_api)) + web::resource("/home").route(web::get().to(http::prism_home::home_api)) + } + + pub fn get_prism_logstream() -> Scope { + web::scope("/logstream").service( + web::scope("/{logstream}").service( + web::resource("/info").route(web::get().to(http::prism_logstream::get_info)), + ), + ) } pub fn get_metrics_webscope() -> Scope { diff --git a/src/handlers/http/home.rs b/src/handlers/http/prism_home.rs similarity index 87% rename from src/handlers/http/home.rs rename to src/handlers/http/prism_home.rs index 4ce343da5..a8a0328a6 100644 --- a/src/handlers/http/home.rs +++ b/src/handlers/http/prism_home.rs @@ -19,7 +19,7 @@ use actix_web::{web, HttpRequest, Responder}; use crate::{ - home::{generate_home_response, HomeError}, + prism::home::{generate_home_response, PrismHomeError}, utils::actix::extract_session_key_from_req, }; @@ -29,9 +29,9 @@ use crate::{ /// # Returns /// /// A JSONified version of the `HomeResponse` struct. -pub async fn home_api(req: HttpRequest) -> Result { +pub async fn home_api(req: HttpRequest) -> Result { let key = extract_session_key_from_req(&req) - .map_err(|err| HomeError::Anyhow(anyhow::Error::msg(err.to_string())))?; + .map_err(|err| PrismHomeError::Anyhow(anyhow::Error::msg(err.to_string())))?; let res = generate_home_response(&key).await?; diff --git a/src/handlers/http/prism_logstream.rs b/src/handlers/http/prism_logstream.rs new file mode 100644 index 000000000..2be76a094 --- /dev/null +++ b/src/handlers/http/prism_logstream.rs @@ -0,0 +1,31 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use actix_web::{ + web::{self, Path}, + Responder, +}; + +use crate::prism::logstream::{get_prism_logstream_info, PrismLogstreamError}; + +/// This API is essentially just combining the responses of /info and /schema together +pub async fn get_info(stream_name: Path) -> Result { + let prism_logstream_info = get_prism_logstream_info(&stream_name).await?; + + Ok(web::Json(prism_logstream_info)) +} diff --git a/src/lib.rs b/src/lib.rs index 25103588c..da7c6ad12 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,7 +28,6 @@ pub mod connectors; pub mod correlation; mod event; pub mod handlers; -pub mod home; pub mod hottier; mod livetail; mod metadata; @@ -38,6 +37,7 @@ mod oidc; pub mod option; pub mod otel; pub mod parseable; +pub mod prism; mod query; pub mod rbac; mod response; diff --git a/src/logstream/mod.rs b/src/logstream/mod.rs new file mode 100644 index 000000000..284804c35 --- /dev/null +++ b/src/logstream/mod.rs @@ -0,0 +1,93 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::sync::Arc; + +use arrow_schema::Schema; +use http::StatusCode; + +use crate::{handlers::http::{logstream::error::StreamError, query::update_schema_when_distributed}, parseable::{StreamNotFound, PARSEABLE}, storage::StreamInfo, LOCK_EXPECT}; + + + +pub async fn get_stream_schema_helper(stream_name: &str) -> Result, StreamError> { + // Ensure parseable is aware of stream in distributed mode + if PARSEABLE.check_or_load_stream(&stream_name).await { + return Err(StreamNotFound(stream_name.to_owned()).into()); + } + + let stream = PARSEABLE.get_stream(&stream_name)?; + match update_schema_when_distributed(&vec![stream_name.to_owned()]).await { + Ok(_) => { + let schema = stream.get_schema(); + Ok(schema) + } + Err(err) => Err(StreamError::Custom { + msg: err.to_string(), + status: StatusCode::EXPECTATION_FAILED, + }), + } +} + +pub async fn get_stream_info_helper(stream_name: &str) -> Result { + // For query mode, if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage + if PARSEABLE.check_or_load_stream(&stream_name).await { + return Err(StreamNotFound(stream_name.to_owned()).into()); + } + + let storage = PARSEABLE.storage.get_object_store(); + // if first_event_at is not found in memory map, check if it exists in the storage + // if it exists in the storage, update the first_event_at in memory map + let stream_first_event_at = + if let Some(first_event_at) = PARSEABLE.get_stream(&stream_name)?.get_first_event() { + Some(first_event_at) + } else if let Ok(Some(first_event_at)) = + storage.get_first_event_from_storage(&stream_name).await + { + PARSEABLE + .update_first_event_at(&stream_name, &first_event_at) + .await + } else { + None + }; + + let hash_map = PARSEABLE.streams.read().unwrap(); + let stream_meta = hash_map + .get(stream_name) + .ok_or_else(|| StreamNotFound(stream_name.to_owned()))? + .metadata + .read() + .expect(LOCK_EXPECT); + + let stream_info = StreamInfo { + stream_type: stream_meta.stream_type, + created_at: stream_meta.created_at.clone(), + first_event_at: stream_first_event_at, + time_partition: stream_meta.time_partition.clone(), + time_partition_limit: stream_meta + .time_partition_limit + .map(|limit| limit.to_string()), + custom_partition: stream_meta.custom_partition.clone(), + static_schema_flag: stream_meta.static_schema_flag, + log_source: stream_meta.log_source.clone(), + }; + + Ok(stream_info) +} \ No newline at end of file diff --git a/src/home/mod.rs b/src/prism/home/mod.rs similarity index 92% rename from src/home/mod.rs rename to src/prism/home/mod.rs index 5767ab93b..35632d732 100644 --- a/src/home/mod.rs +++ b/src/prism/home/mod.rs @@ -66,11 +66,13 @@ pub struct HomeResponse { filter_titles: Vec, } -pub async fn generate_home_response(key: &SessionKey) -> Result { +pub async fn generate_home_response(key: &SessionKey) -> Result { let user_id = if let Some(user_id) = Users.get_username_from_session(key) { user_id } else { - return Err(HomeError::Anyhow(anyhow::Error::msg("User does not exist"))); + return Err(PrismHomeError::Anyhow(anyhow::Error::msg( + "User does not exist", + ))); }; // get all stream titles @@ -180,7 +182,7 @@ pub async fn generate_home_response(key: &SessionKey) -> Result http::StatusCode { match self { - HomeError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, - HomeError::AlertError(e) => e.status_code(), - HomeError::CorrelationError(e) => e.status_code(), - HomeError::StreamError(e) => e.status_code(), + PrismHomeError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, + PrismHomeError::AlertError(e) => e.status_code(), + PrismHomeError::CorrelationError(e) => e.status_code(), + PrismHomeError::StreamError(e) => e.status_code(), } } diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs new file mode 100644 index 000000000..0373338c1 --- /dev/null +++ b/src/prism/logstream/mod.rs @@ -0,0 +1,138 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::sync::Arc; + +use actix_web::http::header::ContentType; +use arrow_schema::Schema; +use http::StatusCode; +use serde::Serialize; + +use crate::{ + handlers::http::{logstream::error::StreamError, query::update_schema_when_distributed}, + parseable::{StreamNotFound, PARSEABLE}, + storage::StreamInfo, + LOCK_EXPECT, +}; + +#[derive(Serialize)] +pub struct PrismLogstreamInfo { + info: StreamInfo, + schema: Arc, +} + +pub async fn get_prism_logstream_info( + stream_name: &str, +) -> Result { + // get StreamInfo + let info = get_stream_info_helper(stream_name).await?; + // get stream schema + let schema = get_stream_schema_helper(stream_name).await?; + + Ok(PrismLogstreamInfo { info, schema }) +} + +async fn get_stream_schema_helper(stream_name: &str) -> Result, StreamError> { + // Ensure parseable is aware of stream in distributed mode + if PARSEABLE.check_or_load_stream(stream_name).await { + return Err(StreamNotFound(stream_name.to_owned()).into()); + } + + let stream = PARSEABLE.get_stream(stream_name)?; + match update_schema_when_distributed(&vec![stream_name.to_owned()]).await { + Ok(_) => { + let schema = stream.get_schema(); + Ok(schema) + } + Err(err) => Err(StreamError::Custom { + msg: err.to_string(), + status: StatusCode::EXPECTATION_FAILED, + }), + } +} + +async fn get_stream_info_helper(stream_name: &str) -> Result { + // For query mode, if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage + if PARSEABLE.check_or_load_stream(stream_name).await { + return Err(StreamNotFound(stream_name.to_owned()).into()); + } + + let storage = PARSEABLE.storage.get_object_store(); + // if first_event_at is not found in memory map, check if it exists in the storage + // if it exists in the storage, update the first_event_at in memory map + let stream_first_event_at = if let Some(first_event_at) = + PARSEABLE.get_stream(stream_name)?.get_first_event() + { + Some(first_event_at) + } else if let Ok(Some(first_event_at)) = storage.get_first_event_from_storage(stream_name).await + { + PARSEABLE + .update_first_event_at(stream_name, &first_event_at) + .await + } else { + None + }; + + let hash_map = PARSEABLE.streams.read().unwrap(); + let stream_meta = hash_map + .get(stream_name) + .ok_or_else(|| StreamNotFound(stream_name.to_owned()))? + .metadata + .read() + .expect(LOCK_EXPECT); + + let stream_info = StreamInfo { + stream_type: stream_meta.stream_type, + created_at: stream_meta.created_at.clone(), + first_event_at: stream_first_event_at, + time_partition: stream_meta.time_partition.clone(), + time_partition_limit: stream_meta + .time_partition_limit + .map(|limit| limit.to_string()), + custom_partition: stream_meta.custom_partition.clone(), + static_schema_flag: stream_meta.static_schema_flag, + log_source: stream_meta.log_source.clone(), + }; + + Ok(stream_info) +} + +#[derive(Debug, thiserror::Error)] +pub enum PrismLogstreamError { + #[error("Error: {0}")] + Anyhow(#[from] anyhow::Error), + #[error("StreamError: {0}")] + StreamError(#[from] StreamError), +} + +impl actix_web::ResponseError for PrismLogstreamError { + fn status_code(&self) -> http::StatusCode { + match self { + PrismLogstreamError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, + PrismLogstreamError::StreamError(e) => e.status_code(), + } + } + + fn error_response(&self) -> actix_web::HttpResponse { + actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .body(self.to_string()) + } +} diff --git a/src/prism/mod.rs b/src/prism/mod.rs new file mode 100644 index 000000000..d661315b1 --- /dev/null +++ b/src/prism/mod.rs @@ -0,0 +1,20 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +pub mod home; +pub mod logstream; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 460b7cfdc..c97797d39 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -16,7 +16,6 @@ * */ -use chrono::Local; use object_store::path::Path; use relative_path::RelativePath; use serde::{Deserialize, Serialize}; @@ -32,7 +31,6 @@ use crate::{ }; use chrono::Utc; -use serde::{Deserialize, Serialize}; use std::fmt::Debug; From 25b18daa619f72cc799abc5acf0cd1e1eadbe72d Mon Sep 17 00:00:00 2001 From: anant Date: Tue, 25 Feb 2025 15:47:59 +0530 Subject: [PATCH 05/11] bugfix: correct API path --- src/handlers/http/modal/server.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 98c092eca..d753294dd 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -90,7 +90,11 @@ impl ParseableServer for Server { .service(Self::get_alerts_webscope()) .service(Self::get_metrics_webscope()), ) - .service(web::scope(&prism_base_path()).service(Self::get_prism_home())) + .service( + web::scope(&prism_base_path()) + .service(Server::get_prism_home()) + .service(Server::get_prism_logstream()), + ) .service(Self::get_ingest_otel_factory()) .service(Self::get_generated()); } From 8db824bbb8c0db0def5b41aa2965cf2481dd2d5b Mon Sep 17 00:00:00 2001 From: anant Date: Tue, 25 Feb 2025 16:20:18 +0530 Subject: [PATCH 06/11] added rbac, modified listing of streams --- src/handlers/http/modal/server.rs | 6 +++++- src/prism/home/mod.rs | 18 ++++++++++++------ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index d753294dd..c381abae6 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -167,7 +167,11 @@ impl Server { pub fn get_prism_logstream() -> Scope { web::scope("/logstream").service( web::scope("/{logstream}").service( - web::resource("/info").route(web::get().to(http::prism_logstream::get_info)), + web::resource("/info").route( + web::get() + .to(http::prism_logstream::get_info) + .authorize_for_stream(Action::GetStreamInfo), + ), ), ) } diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs index 35632d732..73106c6ea 100644 --- a/src/prism/home/mod.rs +++ b/src/prism/home/mod.rs @@ -77,14 +77,18 @@ pub async fn generate_home_response(key: &SessionKey) -> Result = PARSEABLE - .streams - .list() + .storage + .get_object_store() + .list_streams() + .await + .map_err(|e| PrismHomeError::Anyhow(anyhow::Error::new(e)))? .into_iter() .filter(|logstream| { Users.authorize(key.clone(), Action::ListStream, Some(logstream), None) == crate::rbac::Response::Authorized }) - .collect(); + .sorted() + .collect_vec(); // get all alert IDs (TODO: RBAC) // do we need to move alerts into the PARSEABLE struct? @@ -115,7 +119,9 @@ pub async fn generate_home_response(key: &SessionKey) -> Result Result Result Date: Thu, 27 Feb 2025 17:29:39 +0530 Subject: [PATCH 07/11] updates to prism endpoints /logstream/{logstream}/info - added stats and retention to the response body GET /users GET /users/{username} --- src/handlers/http/modal/query_server.rs | 1 + src/handlers/http/modal/server.rs | 26 ++++++- src/handlers/http/rbac.rs | 91 ++++++++++++++++++++++- src/prism/logstream/mod.rs | 99 ++++++++++++++++++++++++- src/rbac/mod.rs | 22 +++++- 5 files changed, 232 insertions(+), 7 deletions(-) diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index c130f89c1..72758efa0 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -61,6 +61,7 @@ impl ParseableServer for QueryServer { .service(Server::get_about_factory()) .service(Self::get_logstream_webscope()) .service(Self::get_user_webscope()) + .service(Server::get_users_webscope()) .service(Server::get_dashboards_webscope()) .service(Server::get_filters_webscope()) .service(Server::get_llm_webscope()) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index c381abae6..a8a6b5cd2 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -81,6 +81,7 @@ impl ParseableServer for Server { .service(Self::get_about_factory()) .service(Self::get_logstream_webscope()) .service(Self::get_user_webscope()) + .service(Self::get_users_webscope()) .service(Self::get_dashboards_webscope()) .service(Self::get_filters_webscope()) .service(Self::get_llm_webscope()) @@ -170,7 +171,9 @@ impl Server { web::resource("/info").route( web::get() .to(http::prism_logstream::get_info) - .authorize_for_stream(Action::GetStreamInfo), + .authorize_for_stream(Action::GetStreamInfo) + .authorize_for_stream(Action::GetStats) + .authorize_for_stream(Action::GetRetention), ), ), ) @@ -497,6 +500,27 @@ impl Server { ) } + // get the users webscope (for Prism only) + pub fn get_users_webscope() -> Scope { + web::scope("/users") + .service( + web::resource("") + // GET /users => List all users + .route( + web::get() + .to(http::rbac::list_users_prism) + .authorize(Action::ListUser), + ), + ) + .service( + web::resource("/{username}").route( + web::get() + .to(http::rbac::get_prism_user) + .authorize_for_user(Action::GetUserRoles), + ), + ) + } + // get the user webscope pub fn get_user_webscope() -> Scope { web::scope("/user") diff --git a/src/handlers/http/rbac.rs b/src/handlers/http/rbac.rs index 38fc8f84e..b4b4d9d82 100644 --- a/src/handlers/http/rbac.rs +++ b/src/handlers/http/rbac.rs @@ -19,12 +19,17 @@ use std::collections::{HashMap, HashSet}; use crate::{ - rbac::{map::roles, role::model::DefaultPrivilege, user, Users}, + rbac::{self, map::roles, role::model::DefaultPrivilege, user, Users, UsersPrism}, storage::ObjectStorageError, validator::{self, error::UsernameValidationError}, }; -use actix_web::{http::header::ContentType, web, Responder}; +use actix_web::{ + http::header::ContentType, + web::{self, Path}, + Responder, +}; use http::StatusCode; +use itertools::Itertools; use tokio::sync::Mutex; use super::modal::utils::rbac_utils::{get_metadata, put_metadata}; @@ -58,6 +63,88 @@ pub async fn list_users() -> impl Responder { web::Json(Users.collect_user::()) } +/// Handler for GET /api/v1/users +/// returns list of all registerd users along with their roles and other info +pub async fn list_users_prism() -> impl Responder { + // get all users + let prism_users = rbac::map::users() + .values() + .map(|u| { + let (id, method, email, picture) = match &u.ty { + user::UserType::Native(_) => (u.username(), "native", None, None), + user::UserType::OAuth(oauth) => ( + u.username(), + "oauth", + oauth.user_info.email.clone(), + oauth.user_info.picture.clone(), + ), + }; + let roles: HashMap> = Users + .get_role(id) + .iter() + .filter_map(|role_name| { + roles() + .get(role_name) + .map(|role| (role_name.to_owned(), role.clone())) + }) + .collect(); + + UsersPrism { + id: id.into(), + method: method.into(), + email, + picture, + roles, + } + }) + .collect_vec(); + + web::Json(prism_users) +} + +/// Function for GET /users/{username} +pub async fn get_prism_user(username: Path) -> Result { + let username = username.into_inner(); + let prism_user = rbac::map::users() + .values() + .map(|u| { + let (id, method, email, picture) = match &u.ty { + user::UserType::Native(_) => (u.username(), "native", None, None), + user::UserType::OAuth(oauth) => ( + u.username(), + "oauth", + oauth.user_info.email.clone(), + oauth.user_info.picture.clone(), + ), + }; + let roles: HashMap> = Users + .get_role(id) + .iter() + .filter_map(|role_name| { + roles() + .get(role_name) + .map(|role| (role_name.to_owned(), role.clone())) + }) + .collect(); + + UsersPrism { + id: id.into(), + method: method.into(), + email, + picture, + roles, + } + }) + .filter(|u| u.id.eq(&username)) + .collect_vec(); + + if prism_user.is_empty() { + Err(RBACError::UserDoesNotExist) + } else { + Ok(web::Json(prism_user[0].clone())) + } +} + // Handler for POST /api/v1/user/{username} // Creates a new user by username if it does not exists pub async fn post_user( diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index 0373338c1..d9039f55f 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -20,13 +20,19 @@ use std::sync::Arc; use actix_web::http::header::ContentType; use arrow_schema::Schema; +use chrono::Utc; use http::StatusCode; use serde::Serialize; use crate::{ - handlers::http::{logstream::error::StreamError, query::update_schema_when_distributed}, + handlers::http::{ + cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}, + logstream::error::StreamError, + query::update_schema_when_distributed, + }, parseable::{StreamNotFound, PARSEABLE}, - storage::StreamInfo, + stats, + storage::{retention::Retention, StreamInfo}, LOCK_EXPECT, }; @@ -34,6 +40,8 @@ use crate::{ pub struct PrismLogstreamInfo { info: StreamInfo, schema: Arc, + stats: QueriedStats, + retention: Retention, } pub async fn get_prism_logstream_info( @@ -41,10 +49,25 @@ pub async fn get_prism_logstream_info( ) -> Result { // get StreamInfo let info = get_stream_info_helper(stream_name).await?; + // get stream schema let schema = get_stream_schema_helper(stream_name).await?; - Ok(PrismLogstreamInfo { info, schema }) + // get stream stats + let stats = get_stats(stream_name).await?; + + // get retention + let retention = PARSEABLE + .get_stream(stream_name)? + .get_retention() + .unwrap_or_default(); + + Ok(PrismLogstreamInfo { + info, + schema, + stats, + retention, + }) } async fn get_stream_schema_helper(stream_name: &str) -> Result, StreamError> { @@ -66,6 +89,73 @@ async fn get_stream_schema_helper(stream_name: &str) -> Result, Stre } } +async fn get_stats(stream_name: &str) -> Result { + let stats = stats::get_current_stats(stream_name, "json") + .ok_or_else(|| StreamNotFound(stream_name.to_owned()))?; + + let ingestor_stats: Option> = None; + + let hash_map = PARSEABLE.streams.read().expect("Readable"); + let stream_meta = &hash_map + .get(stream_name) + .ok_or_else(|| StreamNotFound(stream_name.to_owned()))? + .metadata + .read() + .expect(LOCK_EXPECT); + + let time = Utc::now(); + + let stats = match &stream_meta.first_event_at { + Some(_) => { + let ingestion_stats = IngestionStats::new( + stats.current_stats.events, + format!("{} {}", stats.current_stats.ingestion, "Bytes"), + stats.lifetime_stats.events, + format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), + stats.deleted_stats.events, + format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), + "json", + ); + let storage_stats = StorageStats::new( + format!("{} {}", stats.current_stats.storage, "Bytes"), + format!("{} {}", stats.lifetime_stats.storage, "Bytes"), + format!("{} {}", stats.deleted_stats.storage, "Bytes"), + "parquet", + ); + + QueriedStats::new(stream_name, time, ingestion_stats, storage_stats) + } + + None => { + let ingestion_stats = IngestionStats::new( + stats.current_stats.events, + format!("{} {}", stats.current_stats.ingestion, "Bytes"), + stats.lifetime_stats.events, + format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), + stats.deleted_stats.events, + format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), + "json", + ); + let storage_stats = StorageStats::new( + format!("{} {}", stats.current_stats.storage, "Bytes"), + format!("{} {}", stats.lifetime_stats.storage, "Bytes"), + format!("{} {}", stats.deleted_stats.storage, "Bytes"), + "parquet", + ); + + QueriedStats::new(stream_name, time, ingestion_stats, storage_stats) + } + }; + let stats = if let Some(mut ingestor_stats) = ingestor_stats { + ingestor_stats.push(stats); + merge_quried_stats(ingestor_stats) + } else { + stats + }; + + Ok(stats) +} + async fn get_stream_info_helper(stream_name: &str) -> Result { // For query mode, if the stream not found in memory map, //check if it exists in the storage @@ -120,6 +210,8 @@ pub enum PrismLogstreamError { Anyhow(#[from] anyhow::Error), #[error("StreamError: {0}")] StreamError(#[from] StreamError), + #[error("StreamNotFound: {0}")] + StreamNotFound(#[from] StreamNotFound), } impl actix_web::ResponseError for PrismLogstreamError { @@ -127,6 +219,7 @@ impl actix_web::ResponseError for PrismLogstreamError { match self { PrismLogstreamError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, PrismLogstreamError::StreamError(e) => e.status_code(), + PrismLogstreamError::StreamNotFound(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/src/rbac/mod.rs b/src/rbac/mod.rs index 463c6cfe0..dc3473290 100644 --- a/src/rbac/mod.rs +++ b/src/rbac/mod.rs @@ -20,10 +20,13 @@ pub mod map; pub mod role; pub mod user; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use chrono::{DateTime, Days, Utc}; use itertools::Itertools; +use role::model::DefaultPrivilege; +use serde::Serialize; +use url::Url; use crate::rbac::map::{mut_sessions, mut_users, sessions, users}; use crate::rbac::role::Action; @@ -166,6 +169,23 @@ impl Users { } } +/// This struct represents a user along with their roles, email, etc +/// +/// TODO: rename this after deprecating the older struct +#[derive(Debug, Serialize, Clone)] +pub struct UsersPrism { + // username + pub id: String, + // oaith or native + pub method: String, + // email only if method is oauth + pub email: Option, + // picture only if oauth + pub picture: Option, + // roles for the user + pub roles: HashMap>, +} + fn roles_to_permission(roles: Vec) -> Vec { let mut perms = HashSet::new(); for role in &roles { From 4cb007344b244f73d14d546fdb4f761562ce724e Mon Sep 17 00:00:00 2001 From: anant Date: Tue, 4 Mar 2025 11:59:10 +0530 Subject: [PATCH 08/11] refactor: handled suggestions --- src/handlers/http/logstream.rs | 67 +++++----------- .../http/modal/query/querier_logstream.rs | 66 +++++----------- src/handlers/http/rbac.rs | 77 +++---------------- src/prism/logstream/mod.rs | 69 +++++------------ src/rbac/mod.rs | 1 + src/rbac/utils.rs | 54 +++++++++++++ 6 files changed, 121 insertions(+), 213 deletions(-) create mode 100644 src/rbac/utils.rs diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index b9fb64edc..82aaad0d3 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -259,57 +259,28 @@ pub async fn get_stats( let ingestor_stats: Option> = None; - let hash_map = PARSEABLE.streams.read().expect("Readable"); - let stream_meta = &hash_map - .get(&stream_name) - .ok_or_else(|| StreamNotFound(stream_name.clone()))? - .metadata - .read() - .expect(LOCK_EXPECT); - let time = Utc::now(); - let stats = match &stream_meta.first_event_at { - Some(_) => { - let ingestion_stats = IngestionStats::new( - stats.current_stats.events, - format!("{} {}", stats.current_stats.ingestion, "Bytes"), - stats.lifetime_stats.events, - format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), - stats.deleted_stats.events, - format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), - "json", - ); - let storage_stats = StorageStats::new( - format!("{} {}", stats.current_stats.storage, "Bytes"), - format!("{} {}", stats.lifetime_stats.storage, "Bytes"), - format!("{} {}", stats.deleted_stats.storage, "Bytes"), - "parquet", - ); - - QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) - } - - None => { - let ingestion_stats = IngestionStats::new( - stats.current_stats.events, - format!("{} {}", stats.current_stats.ingestion, "Bytes"), - stats.lifetime_stats.events, - format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), - stats.deleted_stats.events, - format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), - "json", - ); - let storage_stats = StorageStats::new( - format!("{} {}", stats.current_stats.storage, "Bytes"), - format!("{} {}", stats.lifetime_stats.storage, "Bytes"), - format!("{} {}", stats.deleted_stats.storage, "Bytes"), - "parquet", - ); - - QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) - } + let stats = { + let ingestion_stats = IngestionStats::new( + stats.current_stats.events, + format!("{} {}", stats.current_stats.ingestion, "Bytes"), + stats.lifetime_stats.events, + format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), + stats.deleted_stats.events, + format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), + "json", + ); + let storage_stats = StorageStats::new( + format!("{} {}", stats.current_stats.storage, "Bytes"), + format!("{} {}", stats.lifetime_stats.storage, "Bytes"), + format!("{} {}", stats.deleted_stats.storage, "Bytes"), + "parquet", + ); + + QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) }; + let stats = if let Some(mut ingestor_stats) = ingestor_stats { ingestor_stats.push(stats); merge_quried_stats(ingestor_stats) diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index 4c796d0a1..9ffeee3a7 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -45,7 +45,6 @@ use crate::{ parseable::{StreamNotFound, PARSEABLE}, stats::{self, Stats}, storage::StreamType, - LOCK_EXPECT, }; pub async fn delete(stream_name: Path) -> Result { @@ -176,57 +175,28 @@ pub async fn get_stats( None }; - let hash_map = PARSEABLE.streams.read().expect(LOCK_EXPECT); - let stream_meta = hash_map - .get(&stream_name) - .ok_or_else(|| StreamNotFound(stream_name.clone()))? - .metadata - .read() - .expect(LOCK_EXPECT); - let time = Utc::now(); - let stats = match &stream_meta.first_event_at { - Some(_) => { - let ingestion_stats = IngestionStats::new( - stats.current_stats.events, - format!("{} {}", stats.current_stats.ingestion, "Bytes"), - stats.lifetime_stats.events, - format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), - stats.deleted_stats.events, - format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), - "json", - ); - let storage_stats = StorageStats::new( - format!("{} {}", stats.current_stats.storage, "Bytes"), - format!("{} {}", stats.lifetime_stats.storage, "Bytes"), - format!("{} {}", stats.deleted_stats.storage, "Bytes"), - "parquet", - ); - - QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) - } + let stats = { + let ingestion_stats = IngestionStats::new( + stats.current_stats.events, + format!("{} {}", stats.current_stats.ingestion, "Bytes"), + stats.lifetime_stats.events, + format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), + stats.deleted_stats.events, + format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), + "json", + ); + let storage_stats = StorageStats::new( + format!("{} {}", stats.current_stats.storage, "Bytes"), + format!("{} {}", stats.lifetime_stats.storage, "Bytes"), + format!("{} {}", stats.deleted_stats.storage, "Bytes"), + "parquet", + ); - None => { - let ingestion_stats = IngestionStats::new( - stats.current_stats.events, - format!("{} {}", stats.current_stats.ingestion, "Bytes"), - stats.lifetime_stats.events, - format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), - stats.deleted_stats.events, - format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), - "json", - ); - let storage_stats = StorageStats::new( - format!("{} {}", stats.current_stats.storage, "Bytes"), - format!("{} {}", stats.lifetime_stats.storage, "Bytes"), - format!("{} {}", stats.deleted_stats.storage, "Bytes"), - "parquet", - ); - - QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) - } + QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) }; + let stats = if let Some(mut ingestor_stats) = ingestor_stats { ingestor_stats.push(stats); merge_quried_stats(ingestor_stats) diff --git a/src/handlers/http/rbac.rs b/src/handlers/http/rbac.rs index b4b4d9d82..3870c88a9 100644 --- a/src/handlers/http/rbac.rs +++ b/src/handlers/http/rbac.rs @@ -19,7 +19,7 @@ use std::collections::{HashMap, HashSet}; use crate::{ - rbac::{self, map::roles, role::model::DefaultPrivilege, user, Users, UsersPrism}, + rbac::{self, map::roles, role::model::DefaultPrivilege, user, utils::to_prism_user, Users}, storage::ObjectStorageError, validator::{self, error::UsernameValidationError}, }; @@ -67,37 +67,7 @@ pub async fn list_users() -> impl Responder { /// returns list of all registerd users along with their roles and other info pub async fn list_users_prism() -> impl Responder { // get all users - let prism_users = rbac::map::users() - .values() - .map(|u| { - let (id, method, email, picture) = match &u.ty { - user::UserType::Native(_) => (u.username(), "native", None, None), - user::UserType::OAuth(oauth) => ( - u.username(), - "oauth", - oauth.user_info.email.clone(), - oauth.user_info.picture.clone(), - ), - }; - let roles: HashMap> = Users - .get_role(id) - .iter() - .filter_map(|role_name| { - roles() - .get(role_name) - .map(|role| (role_name.to_owned(), role.clone())) - }) - .collect(); - - UsersPrism { - id: id.into(), - method: method.into(), - email, - picture, - roles, - } - }) - .collect_vec(); + let prism_users = rbac::map::users().values().map(to_prism_user).collect_vec(); web::Json(prism_users) } @@ -105,43 +75,14 @@ pub async fn list_users_prism() -> impl Responder { /// Function for GET /users/{username} pub async fn get_prism_user(username: Path) -> Result { let username = username.into_inner(); - let prism_user = rbac::map::users() - .values() - .map(|u| { - let (id, method, email, picture) = match &u.ty { - user::UserType::Native(_) => (u.username(), "native", None, None), - user::UserType::OAuth(oauth) => ( - u.username(), - "oauth", - oauth.user_info.email.clone(), - oauth.user_info.picture.clone(), - ), - }; - let roles: HashMap> = Users - .get_role(id) - .iter() - .filter_map(|role_name| { - roles() - .get(role_name) - .map(|role| (role_name.to_owned(), role.clone())) - }) - .collect(); - - UsersPrism { - id: id.into(), - method: method.into(), - email, - picture, - roles, - } - }) - .filter(|u| u.id.eq(&username)) - .collect_vec(); - - if prism_user.is_empty() { - Err(RBACError::UserDoesNotExist) + // First check if the user exists + let users = rbac::map::users(); + if let Some(user) = users.get(&username) { + // Create UsersPrism for the found user only + let prism_user = to_prism_user(user); + Ok(web::Json(prism_user)) } else { - Ok(web::Json(prism_user[0].clone())) + Err(RBACError::UserDoesNotExist) } } diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index d9039f55f..ebdd463ec 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -95,57 +95,28 @@ async fn get_stats(stream_name: &str) -> Result> = None; - let hash_map = PARSEABLE.streams.read().expect("Readable"); - let stream_meta = &hash_map - .get(stream_name) - .ok_or_else(|| StreamNotFound(stream_name.to_owned()))? - .metadata - .read() - .expect(LOCK_EXPECT); - let time = Utc::now(); - let stats = match &stream_meta.first_event_at { - Some(_) => { - let ingestion_stats = IngestionStats::new( - stats.current_stats.events, - format!("{} {}", stats.current_stats.ingestion, "Bytes"), - stats.lifetime_stats.events, - format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), - stats.deleted_stats.events, - format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), - "json", - ); - let storage_stats = StorageStats::new( - format!("{} {}", stats.current_stats.storage, "Bytes"), - format!("{} {}", stats.lifetime_stats.storage, "Bytes"), - format!("{} {}", stats.deleted_stats.storage, "Bytes"), - "parquet", - ); - - QueriedStats::new(stream_name, time, ingestion_stats, storage_stats) - } - - None => { - let ingestion_stats = IngestionStats::new( - stats.current_stats.events, - format!("{} {}", stats.current_stats.ingestion, "Bytes"), - stats.lifetime_stats.events, - format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), - stats.deleted_stats.events, - format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), - "json", - ); - let storage_stats = StorageStats::new( - format!("{} {}", stats.current_stats.storage, "Bytes"), - format!("{} {}", stats.lifetime_stats.storage, "Bytes"), - format!("{} {}", stats.deleted_stats.storage, "Bytes"), - "parquet", - ); - - QueriedStats::new(stream_name, time, ingestion_stats, storage_stats) - } + let stats = { + let ingestion_stats = IngestionStats::new( + stats.current_stats.events, + format!("{} {}", stats.current_stats.ingestion, "Bytes"), + stats.lifetime_stats.events, + format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), + stats.deleted_stats.events, + format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), + "json", + ); + let storage_stats = StorageStats::new( + format!("{} {}", stats.current_stats.storage, "Bytes"), + format!("{} {}", stats.lifetime_stats.storage, "Bytes"), + format!("{} {}", stats.deleted_stats.storage, "Bytes"), + "parquet", + ); + + QueriedStats::new(stream_name, time, ingestion_stats, storage_stats) }; + let stats = if let Some(mut ingestor_stats) = ingestor_stats { ingestor_stats.push(stats); merge_quried_stats(ingestor_stats) @@ -219,7 +190,7 @@ impl actix_web::ResponseError for PrismLogstreamError { match self { PrismLogstreamError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, PrismLogstreamError::StreamError(e) => e.status_code(), - PrismLogstreamError::StreamNotFound(_) => StatusCode::INTERNAL_SERVER_ERROR, + PrismLogstreamError::StreamNotFound(_) => StatusCode::NOT_FOUND, } } diff --git a/src/rbac/mod.rs b/src/rbac/mod.rs index dc3473290..28ead768b 100644 --- a/src/rbac/mod.rs +++ b/src/rbac/mod.rs @@ -19,6 +19,7 @@ pub mod map; pub mod role; pub mod user; +pub mod utils; use std::collections::{HashMap, HashSet}; diff --git a/src/rbac/utils.rs b/src/rbac/utils.rs new file mode 100644 index 000000000..6c04fa002 --- /dev/null +++ b/src/rbac/utils.rs @@ -0,0 +1,54 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ +use std::collections::HashMap; + +use super::{ + map::roles, + role::model::DefaultPrivilege, + user::{User, UserType}, + Users, UsersPrism, +}; + +pub fn to_prism_user(user: &User) -> UsersPrism { + let (id, method, email, picture) = match &user.ty { + UserType::Native(_) => (user.username(), "native", None, None), + UserType::OAuth(oauth) => ( + user.username(), + "oauth", + oauth.user_info.email.clone(), + oauth.user_info.picture.clone(), + ), + }; + let roles: HashMap> = Users + .get_role(id) + .iter() + .filter_map(|role_name| { + roles() + .get(role_name) + .map(|role| (role_name.to_owned(), role.clone())) + }) + .collect(); + + UsersPrism { + id: id.into(), + method: method.into(), + email, + picture, + roles, + } +} From c683b969143e4260c07933a5400bfc14446a24bb Mon Sep 17 00:00:00 2001 From: anant Date: Tue, 4 Mar 2025 14:26:00 +0530 Subject: [PATCH 09/11] added GET /roles endpoint --- src/handlers/http/modal/query_server.rs | 1 + src/handlers/http/modal/server.rs | 8 ++++++++ src/handlers/http/role.rs | 8 ++++++++ 3 files changed, 17 insertions(+) diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 72758efa0..c51d98bd4 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -67,6 +67,7 @@ impl ParseableServer for QueryServer { .service(Server::get_llm_webscope()) .service(Server::get_oauth_webscope(oidc_client)) .service(Self::get_user_role_webscope()) + .service(Server::get_roles_webscope()) .service(Server::get_counts_webscope()) .service(Server::get_metrics_webscope()) .service(Server::get_alerts_webscope()) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index a8a6b5cd2..38d0e0239 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -87,6 +87,7 @@ impl ParseableServer for Server { .service(Self::get_llm_webscope()) .service(Self::get_oauth_webscope(oidc_client)) .service(Self::get_user_role_webscope()) + .service(Self::get_roles_webscope()) .service(Self::get_counts_webscope()) .service(Self::get_alerts_webscope()) .service(Self::get_metrics_webscope()), @@ -480,6 +481,13 @@ impl Server { } } + // get list of roles + pub fn get_roles_webscope() -> Scope { + web::scope("/roles").service( + web::resource("").route(web::get().to(role::list_roles).authorize(Action::ListRole)), + ) + } + // get the role webscope pub fn get_user_role_webscope() -> Scope { web::scope("/role") diff --git a/src/handlers/http/role.rs b/src/handlers/http/role.rs index f8f2c1c4f..c649ac248 100644 --- a/src/handlers/http/role.rs +++ b/src/handlers/http/role.rs @@ -65,6 +65,14 @@ pub async fn list() -> Result { Ok(web::Json(roles)) } +// Handler for GET /api/v1/roles +// Fetch all roles in the system +pub async fn list_roles() -> Result { + let metadata = get_metadata().await?; + let roles = metadata.roles.clone(); + Ok(web::Json(roles)) +} + // Handler for DELETE /api/v1/role/{username} // Delete existing role pub async fn delete(name: web::Path) -> Result { From 3cf619793142a87a5d032912693300f9454c6912 Mon Sep 17 00:00:00 2001 From: parmesant Date: Tue, 4 Mar 2025 16:30:49 +0530 Subject: [PATCH 10/11] Update src/logstream/mod.rs Co-authored-by: Devdutt Shenoi Signed-off-by: parmesant --- src/logstream/mod.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/logstream/mod.rs b/src/logstream/mod.rs index 284804c35..9dc29e06b 100644 --- a/src/logstream/mod.rs +++ b/src/logstream/mod.rs @@ -68,10 +68,7 @@ pub async fn get_stream_info_helper(stream_name: &str) -> Result Date: Tue, 4 Mar 2025 16:32:16 +0530 Subject: [PATCH 11/11] refactor Co-authored-by: Devdutt Shenoi Signed-off-by: parmesant --- src/handlers/http/logstream.rs | 12 ++++++------ src/handlers/http/modal/query/querier_logstream.rs | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 90c49c893..107c706ed 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -262,17 +262,17 @@ pub async fn get_stats( let stats = { let ingestion_stats = IngestionStats::new( stats.current_stats.events, - format!("{} {}", stats.current_stats.ingestion, "Bytes"), + format!("{} Bytes", stats.current_stats.ingestion), stats.lifetime_stats.events, - format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), + format!("{} Bytes", stats.lifetime_stats.ingestion), stats.deleted_stats.events, - format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), + format!("{} Bytes", stats.deleted_stats.ingestion), "json", ); let storage_stats = StorageStats::new( - format!("{} {}", stats.current_stats.storage, "Bytes"), - format!("{} {}", stats.lifetime_stats.storage, "Bytes"), - format!("{} {}", stats.deleted_stats.storage, "Bytes"), + format!("{} Bytes", stats.current_stats.storage), + format!("{} Bytes", stats.lifetime_stats.storage), + format!("{} Bytes", stats.deleted_stats.storage), "parquet", ); diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index 9ffeee3a7..740855bb7 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -180,17 +180,17 @@ pub async fn get_stats( let stats = { let ingestion_stats = IngestionStats::new( stats.current_stats.events, - format!("{} {}", stats.current_stats.ingestion, "Bytes"), + format!("{} Bytes", stats.current_stats.ingestion), stats.lifetime_stats.events, - format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), + format!("{} Bytes", stats.lifetime_stats.ingestion), stats.deleted_stats.events, - format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), + format!("{} Bytes", stats.deleted_stats.ingestion), "json", ); let storage_stats = StorageStats::new( - format!("{} {}", stats.current_stats.storage, "Bytes"), - format!("{} {}", stats.lifetime_stats.storage, "Bytes"), - format!("{} {}", stats.deleted_stats.storage, "Bytes"), + format!("{} Bytes", stats.current_stats.storage), + format!("{} Bytes", stats.lifetime_stats.storage), + format!("{} Bytes", stats.deleted_stats.storage), "parquet", );