diff --git a/src/correlation.rs b/src/correlation.rs index cf2c7bd82..7ffde6a8a 100644 --- a/src/correlation.rs +++ b/src/correlation.rs @@ -16,7 +16,7 @@ * */ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use actix_web::{http::header::ContentType, Error}; use chrono::Utc; @@ -24,13 +24,17 @@ use datafusion::error::DataFusionError; use http::StatusCode; use itertools::Itertools; use once_cell::sync::Lazy; +use relative_path::RelativePathBuf; use serde::{Deserialize, Serialize}; use serde_json::Error as SerdeError; use tokio::sync::RwLock; -use tracing::{error, trace, warn}; +use tracing::error; use crate::{ - handlers::http::rbac::RBACError, + handlers::http::{ + rbac::RBACError, + users::{CORRELATION_DIR, USERS_ROOT_DIR}, + }, option::CONFIG, query::QUERY_SESSION, rbac::{map::SessionKey, Users}, @@ -39,118 +43,184 @@ use crate::{ utils::{get_hash, user_auth_for_query}, }; -pub static CORRELATIONS: Lazy = Lazy::new(Correlation::default); +pub static CORRELATIONS: Lazy = Lazy::new(Correlations::default); -#[derive(Debug, Default)] -pub struct Correlation(RwLock>); +type CorrelationMap = HashMap; -impl Correlation { - //load correlations from storage +#[derive(Debug, Default, derive_more::Deref)] +pub struct Correlations(RwLock); + +impl Correlations { + // Load correlations from storage pub async fn load(&self) -> anyhow::Result<()> { let store = CONFIG.storage().get_object_store(); let all_correlations = store.get_all_correlations().await.unwrap_or_default(); - let correlations: Vec = all_correlations - .into_iter() - .flat_map(|(_, correlations_bytes)| correlations_bytes) - .filter_map(|correlation| { - serde_json::from_slice(&correlation) - .inspect_err(|e| { - error!("Unable to load correlation: {e}"); - }) - .ok() - }) - .collect(); + for correlations_bytes in all_correlations.values().flatten() { + let correlation = match serde_json::from_slice::(correlations_bytes) + { + Ok(c) => c, + Err(e) => { + error!("Unable to load correlation file : {e}"); + continue; + } + }; + + self.write() + .await + .insert(correlation.id.to_owned(), correlation); + } - let mut s = self.0.write().await; - s.extend(correlations); Ok(()) } - pub async fn list_correlations_for_user( + pub async fn list_correlations( &self, session_key: &SessionKey, - user_id: &str, ) -> Result, CorrelationError> { - let correlations = self.0.read().await.iter().cloned().collect_vec(); - let mut user_correlations = vec![]; let permissions = Users.get_permissions(session_key); - for c in correlations { - let tables = &c + for correlation in self.read().await.values() { + let tables = &correlation .table_configs .iter() .map(|t| t.table_name.clone()) .collect_vec(); - if user_auth_for_query(&permissions, tables).is_ok() && c.user_id == user_id { - user_correlations.push(c); + if user_auth_for_query(&permissions, tables).is_ok() { + user_correlations.push(correlation.clone()); } } + Ok(user_correlations) } pub async fn get_correlation( &self, correlation_id: &str, - user_id: &str, ) -> Result { - let read = self.0.read().await; - let correlation = read - .iter() - .find(|c| c.id == correlation_id && c.user_id == user_id) - .cloned(); - - correlation.ok_or_else(|| { - CorrelationError::AnyhowError(anyhow::Error::msg(format!( - "Unable to find correlation with ID- {correlation_id}" - ))) - }) + self.read() + .await + .get(correlation_id) + .cloned() + .ok_or_else(|| { + CorrelationError::AnyhowError(anyhow::Error::msg(format!( + "Unable to find correlation with ID- {correlation_id}" + ))) + }) } - pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> { - // save to memory - let mut s = self.0.write().await; - s.retain(|c| c.id != correlation.id); - s.push(correlation.clone()); - Ok(()) + /// Create correlation associated with the user + pub async fn create( + &self, + mut correlation: CorrelationConfig, + session_key: &SessionKey, + ) -> Result { + correlation.id = get_hash(Utc::now().timestamp_micros().to_string().as_str()); + correlation.validate(session_key).await?; + + // Update in storage + let correlation_bytes = serde_json::to_vec(&correlation)?.into(); + let path = correlation.path(); + CONFIG + .storage() + .get_object_store() + .put_object(&path, correlation_bytes) + .await?; + + // Update in memory + self.write().await.insert( + correlation.id.to_owned(), + correlation.clone(), + ); + + Ok(correlation) } - pub async fn delete(&self, correlation_id: &str) -> Result<(), CorrelationError> { - // now delete from memory - let read_access = self.0.read().await; + /// Update existing correlation for the user and with the same ID + pub async fn update( + &self, + mut updated_correlation: CorrelationConfig, + session_key: &SessionKey, + ) -> Result { + // validate whether user has access to this correlation object or not + let correlation = self.get_correlation(&updated_correlation.id).await?; + if correlation.user_id != updated_correlation.user_id { + return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!( + r#"User "{}" isn't authorized to update correlation with ID - {}"#, + updated_correlation.user_id, correlation.id + )))); + } - let index = read_access - .iter() - .enumerate() - .find(|(_, c)| c.id == correlation_id) - .to_owned(); - - if let Some((index, _)) = index { - // drop the read access in order to get exclusive write access - drop(read_access); - self.0.write().await.remove(index); - trace!("removed correlation from memory"); - } else { - warn!("Correlation ID- {correlation_id} not found in memory!"); + correlation.validate(session_key).await?; + updated_correlation.update(correlation); + + // Update in storage + let correlation_bytes = serde_json::to_vec(&updated_correlation)?.into(); + let path = updated_correlation.path(); + CONFIG + .storage() + .get_object_store() + .put_object(&path, correlation_bytes) + .await?; + + // Update in memory + self.write().await.insert( + updated_correlation.id.to_owned(), + updated_correlation.clone(), + ); + + Ok(updated_correlation) + } + + /// Delete correlation from memory and storage + pub async fn delete( + &self, + correlation_id: &str, + user_id: &str, + ) -> Result<(), CorrelationError> { + let correlation = CORRELATIONS.get_correlation(correlation_id).await?; + if correlation.user_id != user_id { + return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!( + r#"User "{user_id}" isn't authorized to delete correlation with ID - {correlation_id}"# + )))); } + + // Delete from memory + self.write().await.remove(&correlation.id); + + // Delete from storage + let path = correlation.path(); + CONFIG + .storage() + .get_object_store() + .delete_object(&path) + .await?; + Ok(()) } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum CorrelationVersion { + #[default] V1, } +type CorrelationId = String; +type UserId = String; + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct CorrelationConfig { + #[serde(default)] pub version: CorrelationVersion, pub title: String, - pub id: String, - pub user_id: String, + #[serde(default)] + pub id: CorrelationId, + #[serde(default)] + pub user_id: UserId, pub table_configs: Vec, pub join_config: JoinConfig, pub filter: Option, @@ -158,48 +228,23 @@ pub struct CorrelationConfig { pub end_time: Option, } -impl CorrelationConfig {} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct CorrelationRequest { - pub title: String, - pub table_configs: Vec, - pub join_config: JoinConfig, - pub filter: Option, - pub start_time: Option, - pub end_time: Option, -} - -impl From for CorrelationConfig { - fn from(val: CorrelationRequest) -> Self { - Self { - version: CorrelationVersion::V1, - title: val.title, - id: get_hash(Utc::now().timestamp_micros().to_string().as_str()), - user_id: String::default(), - table_configs: val.table_configs, - join_config: val.join_config, - filter: val.filter, - start_time: val.start_time, - end_time: val.end_time, - } +impl CorrelationConfig { + pub fn path(&self) -> RelativePathBuf { + RelativePathBuf::from_iter([ + USERS_ROOT_DIR, + &self.user_id, + CORRELATION_DIR, + &format!("{}.json", self.id), + ]) } -} -impl CorrelationRequest { - pub fn generate_correlation_config(self, id: String, user_id: String) -> CorrelationConfig { - CorrelationConfig { - version: CorrelationVersion::V1, - title: self.title, - id, - user_id, - table_configs: self.table_configs, - join_config: self.join_config, - filter: self.filter, - start_time: self.start_time, - end_time: self.end_time, - } + pub fn update(&mut self, update: Self) { + self.title = update.title; + self.table_configs = update.table_configs; + self.join_config = update.join_config; + self.filter = update.filter; + self.start_time = update.start_time; + self.end_time = update.end_time; } /// This function will validate the TableConfigs, JoinConfig, and user auth diff --git a/src/handlers/http/correlation.rs b/src/handlers/http/correlation.rs index 08a9b13d2..47423f574 100644 --- a/src/handlers/http/correlation.rs +++ b/src/handlers/http/correlation.rs @@ -16,49 +16,35 @@ * */ +use actix_web::web::{Json, Path}; use actix_web::{web, HttpRequest, HttpResponse, Responder}; use anyhow::Error; -use bytes::Bytes; use itertools::Itertools; use crate::rbac::Users; -use crate::storage::object_storage::correlation_path; +use crate::utils::actix::extract_session_key_from_req; use crate::utils::{get_hash, get_user_from_request, user_auth_for_query}; -use crate::{option::CONFIG, utils::actix::extract_session_key_from_req}; -use crate::correlation::{CorrelationConfig, CorrelationError, CorrelationRequest, CORRELATIONS}; +use crate::correlation::{CorrelationConfig, CorrelationError, CORRELATIONS}; pub async fn list(req: HttpRequest) -> Result { let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - let user_id = get_user_from_request(&req) - .map(|s| get_hash(&s.to_string())) - .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - - let correlations = CORRELATIONS - .list_correlations_for_user(&session_key, &user_id) - .await?; + let correlations = CORRELATIONS.list_correlations(&session_key).await?; Ok(web::Json(correlations)) } -pub async fn get(req: HttpRequest) -> Result { +pub async fn get( + req: HttpRequest, + correlation_id: Path, +) -> Result { + let correlation_id = correlation_id.into_inner(); let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - let user_id = get_user_from_request(&req) - .map(|s| get_hash(&s.to_string())) - .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - - let correlation_id = req - .match_info() - .get("correlation_id") - .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?; - - let correlation = CORRELATIONS - .get_correlation(correlation_id, &user_id) - .await?; + let correlation = CORRELATIONS.get_correlation(&correlation_id).await?; let permissions = Users.get_permissions(&session_key); @@ -73,113 +59,50 @@ pub async fn get(req: HttpRequest) -> Result { Ok(web::Json(correlation)) } -pub async fn post(req: HttpRequest, body: Bytes) -> Result { +pub async fn post( + req: HttpRequest, + Json(mut correlation): Json, +) -> Result { let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; let user_id = get_user_from_request(&req) .map(|s| get_hash(&s.to_string())) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; + correlation.user_id = user_id; - let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?; - - correlation_request.validate(&session_key).await?; - - let mut correlation: CorrelationConfig = correlation_request.into(); - correlation.user_id.clone_from(&user_id); - let correlation_id = &correlation.id; - let path = correlation_path(&user_id, &format!("{}.json", correlation_id)); - - let store = CONFIG.storage().get_object_store(); - let correlation_bytes = serde_json::to_vec(&correlation)?; - store - .put_object(&path, Bytes::from(correlation_bytes)) - .await?; - - // Save to memory - CORRELATIONS.update(&correlation).await?; + let correlation = CORRELATIONS.create(correlation, &session_key).await?; Ok(web::Json(correlation)) } -pub async fn modify(req: HttpRequest, body: Bytes) -> Result { - let session_key = extract_session_key_from_req(&req) - .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; - let user_id = get_user_from_request(&req) +pub async fn modify( + req: HttpRequest, + correlation_id: Path, + Json(mut correlation): Json, +) -> Result { + correlation.id = correlation_id.into_inner(); + correlation.user_id = get_user_from_request(&req) .map(|s| get_hash(&s.to_string())) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - let correlation_id = req - .match_info() - .get("correlation_id") - .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?; - - // validate whether user has access to this correlation object or not - let correlation = CORRELATIONS - .get_correlation(correlation_id, &user_id) - .await?; - let permissions = Users.get_permissions(&session_key); - let tables = &correlation - .table_configs - .iter() - .map(|t| t.table_name.clone()) - .collect_vec(); - - user_auth_for_query(&permissions, tables)?; - - let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?; - correlation_request.validate(&session_key).await?; - - let correlation = - correlation_request.generate_correlation_config(correlation_id.to_owned(), user_id.clone()); - - let correlation_id = &correlation.id; - let path = correlation_path(&user_id, &format!("{}.json", correlation_id)); - - let store = CONFIG.storage().get_object_store(); - let correlation_bytes = serde_json::to_vec(&correlation)?; - store - .put_object(&path, Bytes::from(correlation_bytes)) - .await?; + let session_key = extract_session_key_from_req(&req) + .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; - // Save to memory - CORRELATIONS.update(&correlation).await?; + let correlation = CORRELATIONS.update(correlation, &session_key).await?; Ok(web::Json(correlation)) } -pub async fn delete(req: HttpRequest) -> Result { - let session_key = extract_session_key_from_req(&req) - .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; +pub async fn delete( + req: HttpRequest, + correlation_id: Path, +) -> Result { + let correlation_id = correlation_id.into_inner(); let user_id = get_user_from_request(&req) .map(|s| get_hash(&s.to_string())) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - let correlation_id = req - .match_info() - .get("correlation_id") - .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?; - - let correlation = CORRELATIONS - .get_correlation(correlation_id, &user_id) - .await?; - - // validate user's query auth - let permissions = Users.get_permissions(&session_key); - let tables = &correlation - .table_configs - .iter() - .map(|t| t.table_name.clone()) - .collect_vec(); - - user_auth_for_query(&permissions, tables)?; - - let correlation_id = &correlation.id; - let path = correlation_path(&user_id, &format!("{}.json", correlation_id)); - - let store = CONFIG.storage().get_object_store(); - store.delete_object(&path).await?; + CORRELATIONS.delete(&correlation_id, &user_id).await?; - // Delete from memory - CORRELATIONS.delete(correlation_id).await?; Ok(HttpResponse::Ok().finish()) } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 50cef439c..8c24ad2f9 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -27,7 +27,7 @@ use super::{ use crate::event::format::LogSource; use crate::handlers::http::modal::ingest_server::INGESTOR_META; -use crate::handlers::http::users::{CORRELATION_DIR, DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; +use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; use crate::metadata::SchemaVersion; use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE}; use crate::option::Mode; @@ -806,15 +806,6 @@ pub fn filter_path(user_id: &str, stream_name: &str, filter_file_name: &str) -> ]) } -pub fn correlation_path(user_id: &str, correlation_file_name: &str) -> RelativePathBuf { - RelativePathBuf::from_iter([ - USERS_ROOT_DIR, - user_id, - CORRELATION_DIR, - correlation_file_name, - ]) -} - /// path will be ".parseable/.parsable.json" #[inline(always)] pub fn parseable_json_path() -> RelativePathBuf {