Skip to content

Feat: Prism APIs #1188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Mar 4, 2025
50 changes: 50 additions & 0 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use datafusion::common::tree_node::TreeNode;
use http::StatusCode;
use itertools::Itertools;
use once_cell::sync::Lazy;
use serde::Serialize;
use serde_json::Error as SerdeError;
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Display};
Expand Down Expand Up @@ -873,3 +874,52 @@ impl Alerts {
Ok(())
}
}

#[derive(Debug, Serialize)]
pub struct AlertsInfo {
total: u64,
silenced: u64,
resolved: u64,
triggered: u64,
low: u64,
medium: u64,
high: u64,
}

// TODO: add RBAC
pub async fn get_alerts_info() -> Result<AlertsInfo, AlertError> {
let alerts = ALERTS.alerts.read().await;
let mut total = 0;
let mut silenced = 0;
let mut resolved = 0;
let mut triggered = 0;
let mut low = 0;
let mut medium = 0;
let mut high = 0;

for (_, alert) in alerts.iter() {
total += 1;
match alert.state {
AlertState::Silenced => silenced += 1,
AlertState::Resolved => resolved += 1,
AlertState::Triggered => triggered += 1,
}

match alert.severity {
Severity::Low => low += 1,
Severity::Medium => medium += 1,
Severity::High => high += 1,
_ => {}
}
}

Ok(AlertsInfo {
total,
silenced,
resolved,
triggered,
low,
medium,
high,
})
}
Comment on lines +889 to +925
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

❓ Verification inconclusive

Add RBAC implementation as noted in TODO comment.

The function needs Role-Based Access Control implementation for security.

Let's verify the RBAC implementation in other similar functions:

Additionally:

  1. The function should handle the Critical severity case explicitly instead of silently ignoring it.
  2. Consider using a more efficient counting method.

Apply this diff to improve the implementation:

-// TODO: add RBAC
-pub async fn get_alerts_info() -> Result<AlertsInfo, AlertError> {
+pub async fn get_alerts_info(session: SessionKey) -> Result<AlertsInfo, AlertError> {
     let alerts = ALERTS.alerts.read().await;
-    let mut total = 0;
-    let mut silenced = 0;
-    let mut resolved = 0;
-    let mut triggered = 0;
-    let mut low = 0;
-    let mut medium = 0;
-    let mut high = 0;
+    let mut info = AlertsInfo {
+        total: 0,
+        silenced: 0,
+        resolved: 0,
+        triggered: 0,
+        low: 0,
+        medium: 0,
+        high: 0,
+        critical: 0,
+    };

     for (_, alert) in alerts.iter() {
-        total += 1;
+        // Check if user has permission to view this alert
+        if user_auth_for_query(&session, &alert.query).await.is_err() {
+            continue;
+        }
+
+        info.total += 1;
         match alert.state {
-            AlertState::Silenced => silenced += 1,
-            AlertState::Resolved => resolved += 1,
-            AlertState::Triggered => triggered += 1,
+            AlertState::Silenced => info.silenced += 1,
+            AlertState::Resolved => info.resolved += 1,
+            AlertState::Triggered => info.triggered += 1,
         }

         match alert.severity {
-            Severity::Low => low += 1,
-            Severity::Medium => medium += 1,
-            Severity::High => high += 1,
-            _ => {}
+            Severity::Low => info.low += 1,
+            Severity::Medium => info.medium += 1,
+            Severity::High => info.high += 1,
+            Severity::Critical => info.critical += 1,
         }
     }

-    Ok(AlertsInfo {
-        total,
-        silenced,
-        resolved,
-        triggered,
-        low,
-        medium,
-        high,
-    })
+    Ok(info)
 }

🏁 Script executed:

#!/bin/bash
# Search for RBAC patterns in alert-related functions
rg -A 5 "pub async fn.*alert.*session.*SessionKey" src/

Length of output: 57


Action Required: Verify RBAC Integration and Critical Severity Handling

The suggested changes introduce an RBAC check via a new SessionKey parameter and update the alerts counting logic to account for a new critical field in the AlertsInfo structure. However, the initial search for RBAC patterns in similar functions did not produce any output. Please manually verify that the RBAC approach (i.e. calling user_auth_for_query to filter alerts based on permissions) is consistent with the rest of the codebase and that adding a critical counter aligns with the existing definition of AlertsInfo.

  • Confirm that the new parameter session: SessionKey is appropriate for accessing and validating alerts.
  • Verify that the function user_auth_for_query exists and is used elsewhere for RBAC enforcement.
  • Check that the AlertsInfo struct definition is updated to include a critical field.
  • Ensure that the refactoring to use a single mutable structure for accumulating counts does not affect performance or introduce unwanted side effects.

Please ensure these modifications are in line with your security model and that the alerts API is thoroughly tested before merging.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// TODO: add RBAC
pub async fn get_alerts_info() -> Result<AlertsInfo, AlertError> {
let alerts = ALERTS.alerts.read().await;
let mut total = 0;
let mut silenced = 0;
let mut resolved = 0;
let mut triggered = 0;
let mut low = 0;
let mut medium = 0;
let mut high = 0;
for (_, alert) in alerts.iter() {
total += 1;
match alert.state {
AlertState::Silenced => silenced += 1,
AlertState::Resolved => resolved += 1,
AlertState::Triggered => triggered += 1,
}
match alert.severity {
Severity::Low => low += 1,
Severity::Medium => medium += 1,
Severity::High => high += 1,
_ => {}
}
}
Ok(AlertsInfo {
total,
silenced,
resolved,
triggered,
low,
medium,
high,
})
}
pub async fn get_alerts_info(session: SessionKey) -> Result<AlertsInfo, AlertError> {
let alerts = ALERTS.alerts.read().await;
let mut info = AlertsInfo {
total: 0,
silenced: 0,
resolved: 0,
triggered: 0,
low: 0,
medium: 0,
high: 0,
critical: 0,
};
for (_, alert) in alerts.iter() {
// Check if user has permission to view this alert
if user_auth_for_query(&session, &alert.query).await.is_err() {
continue;
}
info.total += 1;
match alert.state {
AlertState::Silenced => info.silenced += 1,
AlertState::Resolved => info.resolved += 1,
AlertState::Triggered => info.triggered += 1,
}
match alert.severity {
Severity::Low => info.low += 1,
Severity::Medium => info.medium += 1,
Severity::High => info.high += 1,
Severity::Critical => info.critical += 1,
}
}
Ok(info)
}

76 changes: 19 additions & 57 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

use self::error::StreamError;
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
use super::query::update_schema_when_distributed;
use crate::event::format::override_data_type;
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
Expand Down Expand Up @@ -257,64 +257,26 @@ pub async fn get_stats(
let stats = stats::get_current_stats(&stream_name, "json")
.ok_or_else(|| StreamNotFound(stream_name.clone()))?;

let ingestor_stats: Option<Vec<QueriedStats>> = None;

let hash_map = PARSEABLE.streams.read().expect("Readable");
let stream_meta = &hash_map
.get(&stream_name)
.ok_or_else(|| StreamNotFound(stream_name.clone()))?
.metadata
.read()
.expect(LOCK_EXPECT);

let time = Utc::now();

let stats = match &stream_meta.first_event_at {
Some(_) => {
let ingestion_stats = IngestionStats::new(
stats.current_stats.events,
format!("{} {}", stats.current_stats.ingestion, "Bytes"),
stats.lifetime_stats.events,
format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"),
stats.deleted_stats.events,
format!("{} {}", stats.deleted_stats.ingestion, "Bytes"),
"json",
);
let storage_stats = StorageStats::new(
format!("{} {}", stats.current_stats.storage, "Bytes"),
format!("{} {}", stats.lifetime_stats.storage, "Bytes"),
format!("{} {}", stats.deleted_stats.storage, "Bytes"),
"parquet",
);

QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
}

None => {
let ingestion_stats = IngestionStats::new(
stats.current_stats.events,
format!("{} {}", stats.current_stats.ingestion, "Bytes"),
stats.lifetime_stats.events,
format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"),
stats.deleted_stats.events,
format!("{} {}", stats.deleted_stats.ingestion, "Bytes"),
"json",
);
let storage_stats = StorageStats::new(
format!("{} {}", stats.current_stats.storage, "Bytes"),
format!("{} {}", stats.lifetime_stats.storage, "Bytes"),
format!("{} {}", stats.deleted_stats.storage, "Bytes"),
"parquet",
);

QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
}
};
let stats = if let Some(mut ingestor_stats) = ingestor_stats {
ingestor_stats.push(stats);
merge_quried_stats(ingestor_stats)
} else {
stats
let stats = {
let ingestion_stats = IngestionStats::new(
stats.current_stats.events,
format!("{} Bytes", stats.current_stats.ingestion),
stats.lifetime_stats.events,
format!("{} Bytes", stats.lifetime_stats.ingestion),
stats.deleted_stats.events,
format!("{} Bytes", stats.deleted_stats.ingestion),
"json",
);
let storage_stats = StorageStats::new(
format!("{} Bytes", stats.current_stats.storage),
format!("{} Bytes", stats.lifetime_stats.storage),
format!("{} Bytes", stats.deleted_stats.storage),
"parquet",
);

QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
};

let stats = serde_json::to_value(stats)?;
Expand Down
7 changes: 7 additions & 0 deletions src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,25 @@ pub mod logstream;
pub mod middleware;
pub mod modal;
pub mod oidc;
pub mod prism_home;
pub mod prism_logstream;
pub mod query;
pub mod rbac;
pub mod role;
pub mod users;
pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
pub const API_BASE_PATH: &str = "api";
pub const API_VERSION: &str = "v1";
pub const PRISM_BASE_PATH: &str = "prism";

pub fn base_path() -> String {
format!("/{API_BASE_PATH}/{API_VERSION}")
}

pub fn prism_base_path() -> String {
format!("/{API_BASE_PATH}/{PRISM_BASE_PATH}/{API_VERSION}")
}

pub fn metrics_path() -> String {
format!("{}/metrics", base_path())
}
Expand Down
66 changes: 18 additions & 48 deletions src/handlers/http/modal/query/querier_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use crate::{
parseable::{StreamNotFound, PARSEABLE},
stats::{self, Stats},
storage::StreamType,
LOCK_EXPECT,
};

pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
Expand Down Expand Up @@ -176,57 +175,28 @@ pub async fn get_stats(
None
};

let hash_map = PARSEABLE.streams.read().expect(LOCK_EXPECT);
let stream_meta = hash_map
.get(&stream_name)
.ok_or_else(|| StreamNotFound(stream_name.clone()))?
.metadata
.read()
.expect(LOCK_EXPECT);

let time = Utc::now();

let stats = match &stream_meta.first_event_at {
Some(_) => {
let ingestion_stats = IngestionStats::new(
stats.current_stats.events,
format!("{} {}", stats.current_stats.ingestion, "Bytes"),
stats.lifetime_stats.events,
format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"),
stats.deleted_stats.events,
format!("{} {}", stats.deleted_stats.ingestion, "Bytes"),
"json",
);
let storage_stats = StorageStats::new(
format!("{} {}", stats.current_stats.storage, "Bytes"),
format!("{} {}", stats.lifetime_stats.storage, "Bytes"),
format!("{} {}", stats.deleted_stats.storage, "Bytes"),
"parquet",
);

QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
}
let stats = {
let ingestion_stats = IngestionStats::new(
stats.current_stats.events,
format!("{} Bytes", stats.current_stats.ingestion),
stats.lifetime_stats.events,
format!("{} Bytes", stats.lifetime_stats.ingestion),
stats.deleted_stats.events,
format!("{} Bytes", stats.deleted_stats.ingestion),
"json",
);
let storage_stats = StorageStats::new(
format!("{} Bytes", stats.current_stats.storage),
format!("{} Bytes", stats.lifetime_stats.storage),
format!("{} Bytes", stats.deleted_stats.storage),
"parquet",
);

None => {
let ingestion_stats = IngestionStats::new(
stats.current_stats.events,
format!("{} {}", stats.current_stats.ingestion, "Bytes"),
stats.lifetime_stats.events,
format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"),
stats.deleted_stats.events,
format!("{} {}", stats.deleted_stats.ingestion, "Bytes"),
"json",
);
let storage_stats = StorageStats::new(
format!("{} {}", stats.current_stats.storage, "Bytes"),
format!("{} {}", stats.lifetime_stats.storage, "Bytes"),
format!("{} {}", stats.deleted_stats.storage, "Bytes"),
"parquet",
);

QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
}
QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
};

let stats = if let Some(mut ingestor_stats) = ingestor_stats {
ingestor_stats.push(stats);
merge_quried_stats(ingestor_stats)
Expand Down
9 changes: 8 additions & 1 deletion src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use std::thread;
use crate::alerts::ALERTS;
use crate::correlation::CORRELATIONS;
use crate::handlers::airplane;
use crate::handlers::http::base_path;
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
use crate::handlers::http::{base_path, prism_base_path};
use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
use crate::handlers::http::{rbac, role};
use crate::hottier::HotTierManager;
Expand Down Expand Up @@ -61,16 +61,23 @@ impl ParseableServer for QueryServer {
.service(Server::get_about_factory())
.service(Self::get_logstream_webscope())
.service(Self::get_user_webscope())
.service(Server::get_users_webscope())
.service(Server::get_dashboards_webscope())
.service(Server::get_filters_webscope())
.service(Server::get_llm_webscope())
.service(Server::get_oauth_webscope(oidc_client))
.service(Self::get_user_role_webscope())
.service(Server::get_roles_webscope())
.service(Server::get_counts_webscope())
.service(Server::get_metrics_webscope())
.service(Server::get_alerts_webscope())
.service(Self::get_cluster_web_scope()),
)
.service(
web::scope(&prism_base_path())
.service(Server::get_prism_home())
.service(Server::get_prism_logstream()),
)
Comment on lines +76 to +80
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add authorization middleware to Prism endpoints.

The new Prism endpoints are missing authorization middleware, which could lead to unauthorized access to sensitive data. Consider adding appropriate authorization checks using the .authorize() middleware, similar to how it's used in other endpoints.

Apply this diff to add authorization:

 .service(
     web::scope(&prism_base_path())
-        .service(Server::get_prism_home())
-        .service(Server::get_prism_logstream()),
+        .service(Server::get_prism_home().authorize(Action::ListStream))
+        .service(Server::get_prism_logstream().authorize_for_stream(Action::GetStreamInfo)),
 )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
.service(
web::scope(&prism_base_path())
.service(Server::get_prism_home())
.service(Server::get_prism_logstream()),
)
.service(
web::scope(&prism_base_path())
.service(Server::get_prism_home().authorize(Action::ListStream))
.service(Server::get_prism_logstream().authorize_for_stream(Action::GetStreamInfo)),
)

.service(Server::get_generated());
}

Expand Down
54 changes: 54 additions & 0 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::handlers::http::about;
use crate::handlers::http::alerts;
use crate::handlers::http::base_path;
use crate::handlers::http::health_check;
use crate::handlers::http::prism_base_path;
use crate::handlers::http::query;
use crate::handlers::http::users::dashboards;
use crate::handlers::http::users::filters;
Expand Down Expand Up @@ -80,15 +81,22 @@ impl ParseableServer for Server {
.service(Self::get_about_factory())
.service(Self::get_logstream_webscope())
.service(Self::get_user_webscope())
.service(Self::get_users_webscope())
.service(Self::get_dashboards_webscope())
.service(Self::get_filters_webscope())
.service(Self::get_llm_webscope())
.service(Self::get_oauth_webscope(oidc_client))
.service(Self::get_user_role_webscope())
.service(Self::get_roles_webscope())
.service(Self::get_counts_webscope())
.service(Self::get_alerts_webscope())
.service(Self::get_metrics_webscope()),
)
.service(
web::scope(&prism_base_path())
.service(Server::get_prism_home())
.service(Server::get_prism_logstream()),
)
.service(Self::get_ingest_otel_factory())
.service(Self::get_generated());
}
Expand Down Expand Up @@ -154,6 +162,24 @@ impl ParseableServer for Server {
}

impl Server {
pub fn get_prism_home() -> Resource {
web::resource("/home").route(web::get().to(http::prism_home::home_api))
}
Comment on lines +165 to +167
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add authorization middleware to the Prism home endpoint.

The implementation follows the routing pattern, but lacks authorization middleware that is present in other similar endpoints (like alerts, metrics, etc.). This is needed to maintain consistent security across all API endpoints.

Apply this diff to add authorization:

pub fn get_prism_home() -> Resource {
-    web::resource("/home").route(web::get().to(http::prism_home::home_api))
+    web::resource("/home").route(web::get().to(http::prism_home::home_api).authorize(Action::GetStreamInfo))
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn get_prism_home() -> Resource {
web::resource("/home").route(web::get().to(http::prism_home::home_api))
}
pub fn get_prism_home() -> Resource {
web::resource("/home").route(web::get().to(http::prism_home::home_api).authorize(Action::GetStreamInfo))
}


pub fn get_prism_logstream() -> Scope {
web::scope("/logstream").service(
web::scope("/{logstream}").service(
web::resource("/info").route(
web::get()
.to(http::prism_logstream::get_info)
.authorize_for_stream(Action::GetStreamInfo)
.authorize_for_stream(Action::GetStats)
.authorize_for_stream(Action::GetRetention),
),
),
)
}

pub fn get_metrics_webscope() -> Scope {
web::scope("/metrics").service(
web::resource("").route(web::get().to(metrics::get).authorize(Action::Metrics)),
Expand Down Expand Up @@ -455,6 +481,13 @@ impl Server {
}
}

// get list of roles
pub fn get_roles_webscope() -> Scope {
web::scope("/roles").service(
web::resource("").route(web::get().to(role::list_roles).authorize(Action::ListRole)),
)
}

// get the role webscope
pub fn get_user_role_webscope() -> Scope {
web::scope("/role")
Expand All @@ -475,6 +508,27 @@ impl Server {
)
}

// get the users webscope (for Prism only)
pub fn get_users_webscope() -> Scope {
web::scope("/users")
.service(
web::resource("")
// GET /users => List all users
.route(
web::get()
.to(http::rbac::list_users_prism)
.authorize(Action::ListUser),
),
)
.service(
web::resource("/{username}").route(
web::get()
.to(http::rbac::get_prism_user)
.authorize_for_user(Action::GetUserRoles),
),
)
}

// get the user webscope
pub fn get_user_webscope() -> Scope {
web::scope("/user")
Expand Down
Loading
Loading