From 6f237ce55757d13fab9d6d0bbe0edf05c5337553 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 17 Jan 2025 21:56:53 +0530 Subject: [PATCH 01/11] refactor: `Deref` --- src/correlation.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/correlation.rs b/src/correlation.rs index cf2c7bd82..706a6734b 100644 --- a/src/correlation.rs +++ b/src/correlation.rs @@ -41,11 +41,11 @@ use crate::{ pub static CORRELATIONS: Lazy = Lazy::new(Correlation::default); -#[derive(Debug, Default)] +#[derive(Debug, Default, derive_more::Deref)] pub struct Correlation(RwLock>); impl Correlation { - //load correlations from storage + // Load correlations from storage pub async fn load(&self) -> anyhow::Result<()> { let store = CONFIG.storage().get_object_store(); let all_correlations = store.get_all_correlations().await.unwrap_or_default(); @@ -62,8 +62,8 @@ impl Correlation { }) .collect(); - let mut s = self.0.write().await; - s.extend(correlations); + self.write().await.extend(correlations); + Ok(()) } @@ -72,7 +72,7 @@ impl Correlation { session_key: &SessionKey, user_id: &str, ) -> Result, CorrelationError> { - let correlations = self.0.read().await.iter().cloned().collect_vec(); + let correlations = self.read().await.iter().cloned().collect_vec(); let mut user_correlations = vec![]; let permissions = Users.get_permissions(session_key); @@ -95,7 +95,7 @@ impl Correlation { correlation_id: &str, user_id: &str, ) -> Result { - let read = self.0.read().await; + let read = self.read().await; let correlation = read .iter() .find(|c| c.id == correlation_id && c.user_id == user_id) @@ -110,7 +110,7 @@ impl Correlation { pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> { // save to memory - let mut s = self.0.write().await; + let mut s = self.write().await; s.retain(|c| c.id != correlation.id); s.push(correlation.clone()); Ok(()) @@ -118,7 +118,7 @@ impl Correlation { pub async fn delete(&self, correlation_id: &str) -> Result<(), CorrelationError> { // now delete from memory - let read_access = self.0.read().await; + let read_access = self.read().await; let index = read_access .iter() From 0c1b7bfa707ac3a6ce5448db402a85a43e8db591 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 17 Jan 2025 23:25:47 +0530 Subject: [PATCH 02/11] refactor: store correlations as a mapping --- src/correlation.rs | 151 +++++++++++++++++++------------ src/handlers/http/correlation.rs | 40 ++------ src/storage/object_storage.rs | 11 +-- 3 files changed, 105 insertions(+), 97 deletions(-) diff --git a/src/correlation.rs b/src/correlation.rs index 706a6734b..703abae42 100644 --- a/src/correlation.rs +++ b/src/correlation.rs @@ -16,7 +16,7 @@ * */ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use actix_web::{http::header::ContentType, Error}; use chrono::Utc; @@ -24,13 +24,17 @@ use datafusion::error::DataFusionError; use http::StatusCode; use itertools::Itertools; use once_cell::sync::Lazy; +use relative_path::RelativePathBuf; use serde::{Deserialize, Serialize}; use serde_json::Error as SerdeError; use tokio::sync::RwLock; -use tracing::{error, trace, warn}; +use tracing::error; use crate::{ - handlers::http::rbac::RBACError, + handlers::http::{ + rbac::RBACError, + users::{CORRELATION_DIR, USERS_ROOT_DIR}, + }, option::CONFIG, query::QUERY_SESSION, rbac::{map::SessionKey, Users}, @@ -41,8 +45,10 @@ use crate::{ pub static CORRELATIONS: Lazy = Lazy::new(Correlation::default); +type CorrelationMap = HashMap; + #[derive(Debug, Default, derive_more::Deref)] -pub struct Correlation(RwLock>); +pub struct Correlation(RwLock>); impl Correlation { // Load correlations from storage @@ -50,19 +56,21 @@ impl Correlation { let store = CONFIG.storage().get_object_store(); let all_correlations = store.get_all_correlations().await.unwrap_or_default(); - let correlations: Vec = all_correlations - .into_iter() - .flat_map(|(_, correlations_bytes)| correlations_bytes) - .filter_map(|correlation| { - serde_json::from_slice(&correlation) - .inspect_err(|e| { - error!("Unable to load correlation: {e}"); - }) - .ok() - }) - .collect(); - - self.write().await.extend(correlations); + for correlations_bytes in all_correlations.values().flatten() { + let Ok(correlation) = serde_json::from_slice::(correlations_bytes) + .inspect_err(|e| { + error!("Unable to load correlation file : {e}"); + }) + else { + continue; + }; + + self.write() + .await + .entry(correlation.user_id.to_owned()) + .or_insert_with(HashMap::new) + .insert(correlation.id.to_owned(), correlation); + } Ok(()) } @@ -72,21 +80,26 @@ impl Correlation { session_key: &SessionKey, user_id: &str, ) -> Result, CorrelationError> { - let correlations = self.read().await.iter().cloned().collect_vec(); + let Some(correlations) = self.read().await.get(user_id).cloned() else { + return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!( + "Unable to find correlations for user - {user_id}" + )))); + }; let mut user_correlations = vec![]; let permissions = Users.get_permissions(session_key); - for c in correlations { - let tables = &c + for correlation in correlations.values() { + let tables = &correlation .table_configs .iter() .map(|t| t.table_name.clone()) .collect_vec(); - if user_auth_for_query(&permissions, tables).is_ok() && c.user_id == user_id { - user_correlations.push(c); + if user_auth_for_query(&permissions, tables).is_ok() && correlation.user_id == user_id { + user_correlations.push(correlation.clone()); } } + Ok(user_correlations) } @@ -95,45 +108,57 @@ impl Correlation { correlation_id: &str, user_id: &str, ) -> Result { - let read = self.read().await; - let correlation = read - .iter() - .find(|c| c.id == correlation_id && c.user_id == user_id) - .cloned(); - - correlation.ok_or_else(|| { - CorrelationError::AnyhowError(anyhow::Error::msg(format!( - "Unable to find correlation with ID- {correlation_id}" - ))) - }) + self.read() + .await + .get(user_id) + .and_then(|correlations| correlations.get(correlation_id)) + .cloned() + .ok_or_else(|| { + CorrelationError::AnyhowError(anyhow::Error::msg(format!( + "Unable to find correlation with ID- {correlation_id}" + ))) + }) } + /// Insert new or replace existing correlation for the user and with the same ID pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> { - // save to memory - let mut s = self.write().await; - s.retain(|c| c.id != correlation.id); - s.push(correlation.clone()); + // Update in storage + let correlation_bytes = serde_json::to_vec(&correlation)?.into(); + let path = correlation.path(); + CONFIG + .storage() + .get_object_store() + .put_object(&path, correlation_bytes) + .await?; + + // Update in memory + self.write() + .await + .entry(correlation.user_id.to_owned()) + .or_insert_with(HashMap::new) + .insert(correlation.id.to_owned(), correlation.clone()); + Ok(()) } - pub async fn delete(&self, correlation_id: &str) -> Result<(), CorrelationError> { - // now delete from memory - let read_access = self.read().await; + /// Delete correlation from memory and storage + pub async fn delete(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> { + // Delete from memory + self.write() + .await + .entry(correlation.user_id.to_owned()) + .and_modify(|correlations| { + correlations.remove(&correlation.id); + }); + + // Delete from storage + let path = correlation.path(); + CONFIG + .storage() + .get_object_store() + .delete_object(&path) + .await?; - let index = read_access - .iter() - .enumerate() - .find(|(_, c)| c.id == correlation_id) - .to_owned(); - - if let Some((index, _)) = index { - // drop the read access in order to get exclusive write access - drop(read_access); - self.0.write().await.remove(index); - trace!("removed correlation from memory"); - } else { - warn!("Correlation ID- {correlation_id} not found in memory!"); - } Ok(()) } } @@ -144,13 +169,16 @@ pub enum CorrelationVersion { V1, } +type CorrelationId = String; +type UserId = String; + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct CorrelationConfig { pub version: CorrelationVersion, pub title: String, - pub id: String, - pub user_id: String, + pub id: CorrelationId, + pub user_id: UserId, pub table_configs: Vec, pub join_config: JoinConfig, pub filter: Option, @@ -158,7 +186,16 @@ pub struct CorrelationConfig { pub end_time: Option, } -impl CorrelationConfig {} +impl CorrelationConfig { + pub fn path(&self) -> RelativePathBuf { + RelativePathBuf::from_iter([ + USERS_ROOT_DIR, + &self.user_id, + CORRELATION_DIR, + &format!("{}.json", self.id), + ]) + } +} #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] diff --git a/src/handlers/http/correlation.rs b/src/handlers/http/correlation.rs index 08a9b13d2..531890e01 100644 --- a/src/handlers/http/correlation.rs +++ b/src/handlers/http/correlation.rs @@ -16,13 +16,13 @@ * */ +use actix_web::web::Path; use actix_web::{web, HttpRequest, HttpResponse, Responder}; use anyhow::Error; use bytes::Bytes; use itertools::Itertools; use crate::rbac::Users; -use crate::storage::object_storage::correlation_path; use crate::utils::{get_hash, get_user_from_request, user_auth_for_query}; use crate::{option::CONFIG, utils::actix::extract_session_key_from_req}; @@ -76,24 +76,12 @@ pub async fn get(req: HttpRequest) -> Result { pub async fn post(req: HttpRequest, body: Bytes) -> Result { let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; - let user_id = get_user_from_request(&req) - .map(|s| get_hash(&s.to_string())) - .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?; correlation_request.validate(&session_key).await?; - let mut correlation: CorrelationConfig = correlation_request.into(); - correlation.user_id.clone_from(&user_id); - let correlation_id = &correlation.id; - let path = correlation_path(&user_id, &format!("{}.json", correlation_id)); - - let store = CONFIG.storage().get_object_store(); - let correlation_bytes = serde_json::to_vec(&correlation)?; - store - .put_object(&path, Bytes::from(correlation_bytes)) - .await?; + let correlation: CorrelationConfig = correlation_request.into(); // Save to memory CORRELATIONS.update(&correlation).await?; @@ -132,8 +120,7 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result Result Result { +pub async fn delete( + req: HttpRequest, + correlation_id: Path, +) -> Result { + let correlation_id = correlation_id.into_inner(); let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; let user_id = get_user_from_request(&req) .map(|s| get_hash(&s.to_string())) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - let correlation_id = req - .match_info() - .get("correlation_id") - .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?; - let correlation = CORRELATIONS - .get_correlation(correlation_id, &user_id) + .get_correlation(&correlation_id, &user_id) .await?; // validate user's query auth @@ -173,13 +159,7 @@ pub async fn delete(req: HttpRequest) -> Result ]) } -pub fn correlation_path(user_id: &str, correlation_file_name: &str) -> RelativePathBuf { - RelativePathBuf::from_iter([ - USERS_ROOT_DIR, - user_id, - CORRELATION_DIR, - correlation_file_name, - ]) -} - /// path will be ".parseable/.parsable.json" #[inline(always)] pub fn parseable_json_path() -> RelativePathBuf { From a385670ffabfd6c6084060ce12bef37f6912267f Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 17 Jan 2025 23:29:59 +0530 Subject: [PATCH 03/11] naming --- src/correlation.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/correlation.rs b/src/correlation.rs index 703abae42..e896cb115 100644 --- a/src/correlation.rs +++ b/src/correlation.rs @@ -43,14 +43,14 @@ use crate::{ utils::{get_hash, user_auth_for_query}, }; -pub static CORRELATIONS: Lazy = Lazy::new(Correlation::default); +pub static CORRELATIONS: Lazy = Lazy::new(Correlations::default); type CorrelationMap = HashMap; #[derive(Debug, Default, derive_more::Deref)] -pub struct Correlation(RwLock>); +pub struct Correlations(RwLock>); -impl Correlation { +impl Correlations { // Load correlations from storage pub async fn load(&self) -> anyhow::Result<()> { let store = CONFIG.storage().get_object_store(); From 1934ae4801ab0beb4197d4acc7ee57e76ff36093 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 17 Jan 2025 23:47:14 +0530 Subject: [PATCH 04/11] fix: double write on update --- src/handlers/http/correlation.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/handlers/http/correlation.rs b/src/handlers/http/correlation.rs index 531890e01..2abdb0dd9 100644 --- a/src/handlers/http/correlation.rs +++ b/src/handlers/http/correlation.rs @@ -120,15 +120,6 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result Date: Fri, 17 Jan 2025 23:48:34 +0530 Subject: [PATCH 05/11] refactor: http + some more --- src/correlation.rs | 23 ++++++-------- src/handlers/http/correlation.rs | 53 +++++++++++++------------------- 2 files changed, 30 insertions(+), 46 deletions(-) diff --git a/src/correlation.rs b/src/correlation.rs index e896cb115..13f5792ba 100644 --- a/src/correlation.rs +++ b/src/correlation.rs @@ -195,6 +195,15 @@ impl CorrelationConfig { &format!("{}.json", self.id), ]) } + + pub fn update(&mut self, correlation_request: CorrelationRequest) { + self.title = correlation_request.title; + self.table_configs = correlation_request.table_configs; + self.join_config = correlation_request.join_config; + self.filter = correlation_request.filter; + self.start_time = correlation_request.start_time; + self.end_time = correlation_request.end_time; + } } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -225,20 +234,6 @@ impl From for CorrelationConfig { } impl CorrelationRequest { - pub fn generate_correlation_config(self, id: String, user_id: String) -> CorrelationConfig { - CorrelationConfig { - version: CorrelationVersion::V1, - title: self.title, - id, - user_id, - table_configs: self.table_configs, - join_config: self.join_config, - filter: self.filter, - start_time: self.start_time, - end_time: self.end_time, - } - } - /// This function will validate the TableConfigs, JoinConfig, and user auth pub async fn validate(&self, session_key: &SessionKey) -> Result<(), CorrelationError> { let ctx = &QUERY_SESSION; diff --git a/src/handlers/http/correlation.rs b/src/handlers/http/correlation.rs index 2abdb0dd9..721e88e71 100644 --- a/src/handlers/http/correlation.rs +++ b/src/handlers/http/correlation.rs @@ -16,22 +16,20 @@ * */ -use actix_web::web::Path; +use actix_web::web::{Json, Path}; use actix_web::{web, HttpRequest, HttpResponse, Responder}; use anyhow::Error; -use bytes::Bytes; use itertools::Itertools; use crate::rbac::Users; +use crate::utils::actix::extract_session_key_from_req; use crate::utils::{get_hash, get_user_from_request, user_auth_for_query}; -use crate::{option::CONFIG, utils::actix::extract_session_key_from_req}; use crate::correlation::{CorrelationConfig, CorrelationError, CorrelationRequest, CORRELATIONS}; pub async fn list(req: HttpRequest) -> Result { let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - let user_id = get_user_from_request(&req) .map(|s| get_hash(&s.to_string())) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; @@ -43,21 +41,19 @@ pub async fn list(req: HttpRequest) -> Result Ok(web::Json(correlations)) } -pub async fn get(req: HttpRequest) -> Result { +pub async fn get( + req: HttpRequest, + correlation_id: Path, +) -> Result { + let correlation_id = correlation_id.into_inner(); let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - let user_id = get_user_from_request(&req) .map(|s| get_hash(&s.to_string())) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - let correlation_id = req - .match_info() - .get("correlation_id") - .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?; - let correlation = CORRELATIONS - .get_correlation(correlation_id, &user_id) + .get_correlation(&correlation_id, &user_id) .await?; let permissions = Users.get_permissions(&session_key); @@ -73,37 +69,35 @@ pub async fn get(req: HttpRequest) -> Result { Ok(web::Json(correlation)) } -pub async fn post(req: HttpRequest, body: Bytes) -> Result { +pub async fn post( + req: HttpRequest, + Json(correlation_request): Json, +) -> Result { let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; - - let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?; - correlation_request.validate(&session_key).await?; let correlation: CorrelationConfig = correlation_request.into(); - - // Save to memory CORRELATIONS.update(&correlation).await?; Ok(web::Json(correlation)) } -pub async fn modify(req: HttpRequest, body: Bytes) -> Result { +pub async fn modify( + req: HttpRequest, + correlation_id: Path, + Json(correlation_request): Json, +) -> Result { + let correlation_id = correlation_id.into_inner(); let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; let user_id = get_user_from_request(&req) .map(|s| get_hash(&s.to_string())) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - let correlation_id = req - .match_info() - .get("correlation_id") - .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?; - // validate whether user has access to this correlation object or not - let correlation = CORRELATIONS - .get_correlation(correlation_id, &user_id) + let mut correlation = CORRELATIONS + .get_correlation(&correlation_id, &user_id) .await?; let permissions = Users.get_permissions(&session_key); let tables = &correlation @@ -111,15 +105,10 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result Date: Sat, 18 Jan 2025 03:39:00 +0530 Subject: [PATCH 06/11] revert for readability --- src/correlation.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/correlation.rs b/src/correlation.rs index 13f5792ba..fafa6a424 100644 --- a/src/correlation.rs +++ b/src/correlation.rs @@ -57,12 +57,13 @@ impl Correlations { let all_correlations = store.get_all_correlations().await.unwrap_or_default(); for correlations_bytes in all_correlations.values().flatten() { - let Ok(correlation) = serde_json::from_slice::(correlations_bytes) - .inspect_err(|e| { + let correlation = match serde_json::from_slice::(correlations_bytes) + { + Ok(c) => c, + Err(e) => { error!("Unable to load correlation file : {e}"); - }) - else { - continue; + continue; + } }; self.write() From 73fbf53d57eb37e9038a03796fdd71999d2aa32f Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 20 Jan 2025 16:40:54 +0530 Subject: [PATCH 07/11] refactor: merge `CorrelationConfig` and `CorrelationRequest` --- src/correlation.rs | 53 ++++++++++---------------------- src/handlers/http/correlation.rs | 6 ++-- 2 files changed, 19 insertions(+), 40 deletions(-) diff --git a/src/correlation.rs b/src/correlation.rs index fafa6a424..5c5b3ae04 100644 --- a/src/correlation.rs +++ b/src/correlation.rs @@ -164,21 +164,29 @@ impl Correlations { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum CorrelationVersion { + #[default] V1, } type CorrelationId = String; type UserId = String; +fn generate_correlation_id() -> CorrelationId { + get_hash(Utc::now().timestamp_micros().to_string().as_str()) +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct CorrelationConfig { + #[serde(skip_deserializing)] pub version: CorrelationVersion, pub title: String, + #[serde(skip_deserializing, default = "generate_correlation_id")] pub id: CorrelationId, + #[serde(skip_deserializing)] pub user_id: UserId, pub table_configs: Vec, pub join_config: JoinConfig, @@ -197,44 +205,15 @@ impl CorrelationConfig { ]) } - pub fn update(&mut self, correlation_request: CorrelationRequest) { - self.title = correlation_request.title; - self.table_configs = correlation_request.table_configs; - self.join_config = correlation_request.join_config; - self.filter = correlation_request.filter; - self.start_time = correlation_request.start_time; - self.end_time = correlation_request.end_time; - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct CorrelationRequest { - pub title: String, - pub table_configs: Vec, - pub join_config: JoinConfig, - pub filter: Option, - pub start_time: Option, - pub end_time: Option, -} - -impl From for CorrelationConfig { - fn from(val: CorrelationRequest) -> Self { - Self { - version: CorrelationVersion::V1, - title: val.title, - id: get_hash(Utc::now().timestamp_micros().to_string().as_str()), - user_id: String::default(), - table_configs: val.table_configs, - join_config: val.join_config, - filter: val.filter, - start_time: val.start_time, - end_time: val.end_time, - } + pub fn update(&mut self, update: Self) { + self.title = update.title; + self.table_configs = update.table_configs; + self.join_config = update.join_config; + self.filter = update.filter; + self.start_time = update.start_time; + self.end_time = update.end_time; } -} -impl CorrelationRequest { /// This function will validate the TableConfigs, JoinConfig, and user auth pub async fn validate(&self, session_key: &SessionKey) -> Result<(), CorrelationError> { let ctx = &QUERY_SESSION; diff --git a/src/handlers/http/correlation.rs b/src/handlers/http/correlation.rs index 721e88e71..3cee738ae 100644 --- a/src/handlers/http/correlation.rs +++ b/src/handlers/http/correlation.rs @@ -25,7 +25,7 @@ use crate::rbac::Users; use crate::utils::actix::extract_session_key_from_req; use crate::utils::{get_hash, get_user_from_request, user_auth_for_query}; -use crate::correlation::{CorrelationConfig, CorrelationError, CorrelationRequest, CORRELATIONS}; +use crate::correlation::{CorrelationConfig, CorrelationError, CORRELATIONS}; pub async fn list(req: HttpRequest) -> Result { let session_key = extract_session_key_from_req(&req) @@ -71,7 +71,7 @@ pub async fn get( pub async fn post( req: HttpRequest, - Json(correlation_request): Json, + Json(correlation_request): Json, ) -> Result { let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; @@ -86,7 +86,7 @@ pub async fn post( pub async fn modify( req: HttpRequest, correlation_id: Path, - Json(correlation_request): Json, + Json(correlation_request): Json, ) -> Result { let correlation_id = correlation_id.into_inner(); let session_key = extract_session_key_from_req(&req) From 49f1b9719291e987dd88a58169b443fcd02d74e6 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 20 Jan 2025 16:41:44 +0530 Subject: [PATCH 08/11] fix: insert `user_id` --- src/handlers/http/correlation.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/handlers/http/correlation.rs b/src/handlers/http/correlation.rs index 3cee738ae..3546fda7f 100644 --- a/src/handlers/http/correlation.rs +++ b/src/handlers/http/correlation.rs @@ -71,13 +71,16 @@ pub async fn get( pub async fn post( req: HttpRequest, - Json(correlation_request): Json, + Json(mut correlation): Json, ) -> Result { let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; - correlation_request.validate(&session_key).await?; + let user_id = get_user_from_request(&req) + .map(|s| get_hash(&s.to_string())) + .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; + correlation.user_id = user_id; - let correlation: CorrelationConfig = correlation_request.into(); + correlation.validate(&session_key).await?; CORRELATIONS.update(&correlation).await?; Ok(web::Json(correlation)) From dd38e2f3397b296dcc94c2e8c77e5eb6ac0151ec Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 20 Jan 2025 17:28:19 +0530 Subject: [PATCH 09/11] refactor: separate out creation and updation code path --- src/correlation.rs | 105 ++++++++++++++++++++----------- src/handlers/http/correlation.rs | 60 +++--------------- 2 files changed, 79 insertions(+), 86 deletions(-) diff --git a/src/correlation.rs b/src/correlation.rs index 5c5b3ae04..16a251a99 100644 --- a/src/correlation.rs +++ b/src/correlation.rs @@ -48,7 +48,7 @@ pub static CORRELATIONS: Lazy = Lazy::new(Correlations::default); type CorrelationMap = HashMap; #[derive(Debug, Default, derive_more::Deref)] -pub struct Correlations(RwLock>); +pub struct Correlations(RwLock); impl Correlations { // Load correlations from storage @@ -68,35 +68,26 @@ impl Correlations { self.write() .await - .entry(correlation.user_id.to_owned()) - .or_insert_with(HashMap::new) .insert(correlation.id.to_owned(), correlation); } Ok(()) } - pub async fn list_correlations_for_user( + pub async fn list_correlations( &self, session_key: &SessionKey, - user_id: &str, ) -> Result, CorrelationError> { - let Some(correlations) = self.read().await.get(user_id).cloned() else { - return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!( - "Unable to find correlations for user - {user_id}" - )))); - }; - let mut user_correlations = vec![]; let permissions = Users.get_permissions(session_key); - for correlation in correlations.values() { + for correlation in self.read().await.values() { let tables = &correlation .table_configs .iter() .map(|t| t.table_name.clone()) .collect_vec(); - if user_auth_for_query(&permissions, tables).is_ok() && correlation.user_id == user_id { + if user_auth_for_query(&permissions, tables).is_ok() { user_correlations.push(correlation.clone()); } } @@ -107,12 +98,10 @@ impl Correlations { pub async fn get_correlation( &self, correlation_id: &str, - user_id: &str, ) -> Result { self.read() .await - .get(user_id) - .and_then(|correlations| correlations.get(correlation_id)) + .get(correlation_id) .cloned() .ok_or_else(|| { CorrelationError::AnyhowError(anyhow::Error::msg(format!( @@ -121,8 +110,15 @@ impl Correlations { }) } - /// Insert new or replace existing correlation for the user and with the same ID - pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> { + /// Create correlation associated with the user + pub async fn create( + &self, + mut correlation: CorrelationConfig, + session_key: &SessionKey, + ) -> Result { + correlation.id = get_hash(Utc::now().timestamp_micros().to_string().as_str()); + correlation.validate(&session_key).await?; + // Update in storage let correlation_bytes = serde_json::to_vec(&correlation)?.into(); let path = correlation.path(); @@ -133,24 +129,65 @@ impl Correlations { .await?; // Update in memory - self.write() - .await - .entry(correlation.user_id.to_owned()) - .or_insert_with(HashMap::new) - .insert(correlation.id.to_owned(), correlation.clone()); + self.write().await.insert( + correlation.id.to_owned(), + correlation.clone(), + ); - Ok(()) + Ok(correlation) + } + + /// Update existing correlation for the user and with the same ID + pub async fn update( + &self, + mut updated_correlation: CorrelationConfig, + session_key: &SessionKey, + ) -> Result { + // validate whether user has access to this correlation object or not + let correlation = self.get_correlation(&updated_correlation.id).await?; + if correlation.user_id != updated_correlation.user_id { + return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!( + r#"User "{}" isn't authorized to update correlation with ID - {}"#, + updated_correlation.user_id, correlation.id + )))); + } + + correlation.validate(&session_key).await?; + updated_correlation.update(correlation); + + // Update in storage + let correlation_bytes = serde_json::to_vec(&updated_correlation)?.into(); + let path = updated_correlation.path(); + CONFIG + .storage() + .get_object_store() + .put_object(&path, correlation_bytes) + .await?; + + // Update in memory + self.write().await.insert( + updated_correlation.id.to_owned(), + updated_correlation.clone(), + ); + + Ok(updated_correlation) } /// Delete correlation from memory and storage - pub async fn delete(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> { + pub async fn delete( + &self, + correlation_id: &str, + user_id: &str, + ) -> Result<(), CorrelationError> { + let correlation = CORRELATIONS.get_correlation(&correlation_id).await?; + if correlation.user_id != user_id { + return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!( + r#"User "{user_id}" isn't authorized to delete correlation with ID - {correlation_id}"# + )))); + } + // Delete from memory - self.write() - .await - .entry(correlation.user_id.to_owned()) - .and_modify(|correlations| { - correlations.remove(&correlation.id); - }); + self.write().await.remove(&correlation.id); // Delete from storage let path = correlation.path(); @@ -174,17 +211,13 @@ pub enum CorrelationVersion { type CorrelationId = String; type UserId = String; -fn generate_correlation_id() -> CorrelationId { - get_hash(Utc::now().timestamp_micros().to_string().as_str()) -} - #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct CorrelationConfig { #[serde(skip_deserializing)] pub version: CorrelationVersion, pub title: String, - #[serde(skip_deserializing, default = "generate_correlation_id")] + #[serde(skip_deserializing)] pub id: CorrelationId, #[serde(skip_deserializing)] pub user_id: UserId, diff --git a/src/handlers/http/correlation.rs b/src/handlers/http/correlation.rs index 3546fda7f..47423f574 100644 --- a/src/handlers/http/correlation.rs +++ b/src/handlers/http/correlation.rs @@ -30,13 +30,8 @@ use crate::correlation::{CorrelationConfig, CorrelationError, CORRELATIONS}; pub async fn list(req: HttpRequest) -> Result { let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - let user_id = get_user_from_request(&req) - .map(|s| get_hash(&s.to_string())) - .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - let correlations = CORRELATIONS - .list_correlations_for_user(&session_key, &user_id) - .await?; + let correlations = CORRELATIONS.list_correlations(&session_key).await?; Ok(web::Json(correlations)) } @@ -48,13 +43,8 @@ pub async fn get( let correlation_id = correlation_id.into_inner(); let session_key = extract_session_key_from_req(&req) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - let user_id = get_user_from_request(&req) - .map(|s| get_hash(&s.to_string())) - .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - let correlation = CORRELATIONS - .get_correlation(&correlation_id, &user_id) - .await?; + let correlation = CORRELATIONS.get_correlation(&correlation_id).await?; let permissions = Users.get_permissions(&session_key); @@ -80,8 +70,7 @@ pub async fn post( .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; correlation.user_id = user_id; - correlation.validate(&session_key).await?; - CORRELATIONS.update(&correlation).await?; + let correlation = CORRELATIONS.create(correlation, &session_key).await?; Ok(web::Json(correlation)) } @@ -89,30 +78,17 @@ pub async fn post( pub async fn modify( req: HttpRequest, correlation_id: Path, - Json(correlation_request): Json, + Json(mut correlation): Json, ) -> Result { - let correlation_id = correlation_id.into_inner(); - let session_key = extract_session_key_from_req(&req) - .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; - let user_id = get_user_from_request(&req) + correlation.id = correlation_id.into_inner(); + correlation.user_id = get_user_from_request(&req) .map(|s| get_hash(&s.to_string())) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - // validate whether user has access to this correlation object or not - let mut correlation = CORRELATIONS - .get_correlation(&correlation_id, &user_id) - .await?; - let permissions = Users.get_permissions(&session_key); - let tables = &correlation - .table_configs - .iter() - .map(|t| t.table_name.clone()) - .collect_vec(); - user_auth_for_query(&permissions, tables)?; - correlation_request.validate(&session_key).await?; + let session_key = extract_session_key_from_req(&req) + .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; - correlation.update(correlation_request); - CORRELATIONS.update(&correlation).await?; + let correlation = CORRELATIONS.update(correlation, &session_key).await?; Ok(web::Json(correlation)) } @@ -122,27 +98,11 @@ pub async fn delete( correlation_id: Path, ) -> Result { let correlation_id = correlation_id.into_inner(); - let session_key = extract_session_key_from_req(&req) - .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; let user_id = get_user_from_request(&req) .map(|s| get_hash(&s.to_string())) .map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?; - let correlation = CORRELATIONS - .get_correlation(&correlation_id, &user_id) - .await?; - - // validate user's query auth - let permissions = Users.get_permissions(&session_key); - let tables = &correlation - .table_configs - .iter() - .map(|t| t.table_name.clone()) - .collect_vec(); - - user_auth_for_query(&permissions, tables)?; - - CORRELATIONS.delete(&correlation).await?; + CORRELATIONS.delete(&correlation_id, &user_id).await?; Ok(HttpResponse::Ok().finish()) } From 33af7f3e0c23a68c1f9a9641832ccf1c2f0540e5 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 23 Jan 2025 20:24:34 +0530 Subject: [PATCH 10/11] ci: clippy suggestions --- src/correlation.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/correlation.rs b/src/correlation.rs index 16a251a99..ce394441c 100644 --- a/src/correlation.rs +++ b/src/correlation.rs @@ -117,7 +117,7 @@ impl Correlations { session_key: &SessionKey, ) -> Result { correlation.id = get_hash(Utc::now().timestamp_micros().to_string().as_str()); - correlation.validate(&session_key).await?; + correlation.validate(session_key).await?; // Update in storage let correlation_bytes = serde_json::to_vec(&correlation)?.into(); @@ -152,7 +152,7 @@ impl Correlations { )))); } - correlation.validate(&session_key).await?; + correlation.validate(session_key).await?; updated_correlation.update(correlation); // Update in storage @@ -179,7 +179,7 @@ impl Correlations { correlation_id: &str, user_id: &str, ) -> Result<(), CorrelationError> { - let correlation = CORRELATIONS.get_correlation(&correlation_id).await?; + let correlation = CORRELATIONS.get_correlation(correlation_id).await?; if correlation.user_id != user_id { return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!( r#"User "{user_id}" isn't authorized to delete correlation with ID - {correlation_id}"# From 580fd0741fef1d25f58f2ce86cff72562f7cfdb3 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 Jan 2025 12:15:03 +0530 Subject: [PATCH 11/11] fix: default, don't break previous expectation in files --- src/correlation.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/correlation.rs b/src/correlation.rs index ce394441c..7ffde6a8a 100644 --- a/src/correlation.rs +++ b/src/correlation.rs @@ -214,12 +214,12 @@ type UserId = String; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct CorrelationConfig { - #[serde(skip_deserializing)] + #[serde(default)] pub version: CorrelationVersion, pub title: String, - #[serde(skip_deserializing)] + #[serde(default)] pub id: CorrelationId, - #[serde(skip_deserializing)] + #[serde(default)] pub user_id: UserId, pub table_configs: Vec, pub join_config: JoinConfig,