diff --git a/src/banner.rs b/src/banner.rs index 97055587c..cbcf6cdb1 100644 --- a/src/banner.rs +++ b/src/banner.rs @@ -104,7 +104,7 @@ async fn storage_info(config: &Parseable) { Staging Path: \"{}\"", "Storage:".to_string().bold(), config.get_storage_mode_string(), - config.staging_dir().to_string_lossy(), + config.options.staging_dir().to_string_lossy(), ); if let Some(path) = &config.options.hot_tier_storage_path { diff --git a/src/cli.rs b/src/cli.rs index 46a79a36c..281f60f51 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -17,7 +17,7 @@ */ use clap::Parser; -use std::path::PathBuf; +use std::{env, fs, path::PathBuf}; use url::Url; @@ -385,4 +385,74 @@ impl Options { pub fn is_default_creds(&self) -> bool { self.username == DEFAULT_USERNAME && self.password == DEFAULT_PASSWORD } + + /// Path to staging directory, ensures that it exists or panics + pub fn staging_dir(&self) -> &PathBuf { + fs::create_dir_all(&self.local_staging_path) + .expect("Should be able to create dir if doesn't exist"); + + &self.local_staging_path + } + + /// TODO: refactor and document + pub fn get_url(&self) -> Url { + if self.ingestor_endpoint.is_empty() { + return format!( + "{}://{}", + self.get_scheme(), + self.address + ) + .parse::() // if the value was improperly set, this will panic before hand + .unwrap_or_else(|err| { + panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `:` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address) + }); + } + + let ingestor_endpoint = &self.ingestor_endpoint; + + if ingestor_endpoint.starts_with("http") { + panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `:` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). 
Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint); + } + + let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>(); + + if addr_from_env.len() != 2 { + panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / hostname>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint); + } + + let mut hostname = addr_from_env[0].to_string(); + let mut port = addr_from_env[1].to_string(); + + // if the env var value fits the pattern $VAR_NAME:$VAR_NAME + // fetch the value from the specified env vars + if hostname.starts_with('$') { + let var_hostname = hostname[1..].to_string(); + hostname = env::var(&var_hostname).unwrap_or_default(); + + if hostname.is_empty() { + panic!("The environement variable `{}` is not set, please set as `<ip address / hostname>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", var_hostname); + } + if hostname.starts_with("http") { + panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / hostname>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", hostname, var_hostname); + } else { + hostname = format!("{}://{}", self.get_scheme(), hostname); + } + } + + if port.starts_with('$') { + let var_port = port[1..].to_string(); + port = env::var(&var_port).unwrap_or_default(); + + if port.is_empty() { + panic!( + "Port is not set in the environement variable `{}`. 
Please refer to the documentation: https://logg.ing/env for more details.", + var_port + ); + } + } + + format!("{}://{}:{}", self.get_scheme(), hostname, port) + .parse::() + .expect("Valid URL") + } } diff --git a/src/handlers/http/about.rs b/src/handlers/http/about.rs index 44d39e6b3..675d0182e 100644 --- a/src/handlers/http/about.rs +++ b/src/handlers/http/about.rs @@ -66,7 +66,7 @@ pub async fn about() -> Json { let staging = if PARSEABLE.options.mode == Mode::Query { "".to_string() } else { - PARSEABLE.staging_dir().display().to_string() + PARSEABLE.options.staging_dir().display().to_string() }; let grpc_port = PARSEABLE.options.grpc_port; diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index f71739d8c..b9fb64edc 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -23,7 +23,6 @@ use crate::event::format::override_data_type; use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}; use crate::metadata::SchemaVersion; use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE}; -use crate::option::Mode; use crate::parseable::{StreamNotFound, PARSEABLE}; use crate::rbac::role::Action; use crate::rbac::Users; @@ -47,7 +46,8 @@ use tracing::warn; pub async fn delete(stream_name: Path) -> Result { let stream_name = stream_name.into_inner(); - if !PARSEABLE.streams.contains(&stream_name) { + // Error out if stream doesn't exist in memory, or in the case of query node, in storage as well + if PARSEABLE.check_or_load_stream(&stream_name).await { return Err(StreamNotFound(stream_name).into()); } @@ -120,15 +120,11 @@ pub async fn detect_schema(Json(json): Json) -> Result) -> Result { +pub async fn get_schema(stream_name: Path) -> Result { let stream_name = stream_name.into_inner(); // Ensure parseable is aware of stream in distributed mode - if PARSEABLE.options.mode == Mode::Query - && PARSEABLE - .create_stream_and_schema_from_storage(&stream_name) - 
.await? - { + if PARSEABLE.check_or_load_stream(&stream_name).await { return Err(StreamNotFound(stream_name.clone()).into()); } @@ -164,14 +160,8 @@ pub async fn get_retention(stream_name: Path) -> Result {} - Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()), - } + if PARSEABLE.check_or_load_stream(&stream_name).await { + return Err(StreamNotFound(stream_name.clone()).into()); } let retention = PARSEABLE @@ -183,28 +173,16 @@ pub async fn get_retention(stream_name: Path) -> Result, - Json(json): Json, + Json(retention): Json, ) -> Result { let stream_name = stream_name.into_inner(); // For query mode, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage - if PARSEABLE.options.mode == Mode::Query { - match PARSEABLE - .create_stream_and_schema_from_storage(&stream_name) - .await - { - Ok(true) => {} - Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()), - } + if PARSEABLE.check_or_load_stream(&stream_name).await { + return Err(StreamNotFound(stream_name).into()); } - let stream = PARSEABLE.get_stream(&stream_name)?; - - let retention: Retention = match serde_json::from_value(json) { - Ok(retention) => retention, - Err(err) => return Err(StreamError::InvalidRetentionConfig(err)), - }; PARSEABLE .storage @@ -212,7 +190,7 @@ pub async fn put_retention( .put_retention(&stream_name, &retention) .await?; - stream.set_retention(retention); + PARSEABLE.get_stream(&stream_name)?.set_retention(retention); Ok(( format!("set retention configuration for log stream {stream_name}"), @@ -250,21 +228,11 @@ pub async fn get_stats( ) -> Result { let stream_name = stream_name.into_inner(); - if !PARSEABLE.streams.contains(&stream_name) { - // For query mode, if the stream not found in memory map, - //check if it exists in the storage - //create stream and schema from storage - if PARSEABLE.options.mode == Mode::Query { - match PARSEABLE - 
.create_stream_and_schema_from_storage(&stream_name) - .await - { - Ok(true) => {} - Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()), - } - } else { - return Err(StreamNotFound(stream_name).into()); - } + // For query mode, if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage + if PARSEABLE.check_or_load_stream(&stream_name).await { + return Err(StreamNotFound(stream_name.clone()).into()); } let query_string = req.query_string(); @@ -356,19 +324,13 @@ pub async fn get_stats( pub async fn get_stream_info(stream_name: Path) -> Result { let stream_name = stream_name.into_inner(); - if !PARSEABLE.streams.contains(&stream_name) { - if PARSEABLE.options.mode == Mode::Query { - match PARSEABLE - .create_stream_and_schema_from_storage(&stream_name) - .await - { - Ok(true) => {} - Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()), - } - } else { - return Err(StreamNotFound(stream_name).into()); - } + // For query mode, if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage + if PARSEABLE.check_or_load_stream(&stream_name).await { + return Err(StreamNotFound(stream_name.clone()).into()); } + let storage = PARSEABLE.storage.get_object_store(); // if first_event_at is not found in memory map, check if it exists in the storage // if it exists in the storage, update the first_event_at in memory map @@ -417,14 +379,8 @@ pub async fn put_stream_hot_tier( // For query mode, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage - if PARSEABLE.options.mode == Mode::Query { - match PARSEABLE - .create_stream_and_schema_from_storage(&stream_name) - .await - { - Ok(true) => {} - Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()), - } + if PARSEABLE.check_or_load_stream(&stream_name).await { + return 
Err(StreamNotFound(stream_name).into()); } let stream = PARSEABLE.get_stream(&stream_name)?; @@ -467,21 +423,11 @@ pub async fn put_stream_hot_tier( pub async fn get_stream_hot_tier(stream_name: Path) -> Result { let stream_name = stream_name.into_inner(); - if !PARSEABLE.streams.contains(&stream_name) { - // For query mode, if the stream not found in memory map, - //check if it exists in the storage - //create stream and schema from storage - if PARSEABLE.options.mode == Mode::Query { - match PARSEABLE - .create_stream_and_schema_from_storage(&stream_name) - .await - { - Ok(true) => {} - Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()), - } - } else { - return Err(StreamNotFound(stream_name).into()); - } + // For query mode, if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage + if PARSEABLE.check_or_load_stream(&stream_name).await { + return Err(StreamNotFound(stream_name.clone()).into()); } let Some(hot_tier_manager) = HotTierManager::global() else { @@ -500,14 +446,8 @@ pub async fn delete_stream_hot_tier( // For query mode, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage - if PARSEABLE.options.mode == Mode::Query { - match PARSEABLE - .create_stream_and_schema_from_storage(&stream_name) - .await - { - Ok(true) => {} - Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()), - } + if PARSEABLE.check_or_load_stream(&stream_name).await { + return Err(StreamNotFound(stream_name).into()); } if PARSEABLE.get_stream(&stream_name)?.get_stream_type() == StreamType::Internal { diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 6b5711079..c0e7278b9 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -21,12 +21,12 @@ use std::{path::Path, sync::Arc}; use actix_web::{middleware::from_fn, web::ServiceConfig, App, HttpServer}; use 
actix_web_prometheus::PrometheusMetrics; use async_trait::async_trait; -use base64::Engine; +use base64::{prelude::BASE64_STANDARD, Engine}; use bytes::Bytes; use openid::Discovered; use relative_path::RelativePathBuf; use serde::{Deserialize, Serialize}; -use serde_json::Value; +use serde_json::{Map, Value}; use ssl_acceptor::get_ssl_acceptor; use tokio::sync::oneshot; use tracing::{error, info, warn}; @@ -35,8 +35,8 @@ use crate::{ cli::Options, oidc::Claims, parseable::PARSEABLE, - storage::PARSEABLE_ROOT_DIRECTORY, - utils::{get_ingestor_id, get_url}, + storage::{ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY}, + utils::get_ingestor_id, }; use super::{audit, cross_origin_config, health_check, API_BASE_PATH, API_VERSION}; @@ -213,85 +213,71 @@ impl IngestorMetadata { } /// Capture metadata information by either loading it from staging or starting fresh - pub fn load() -> Arc { + pub fn load(options: &Options, storage: &dyn ObjectStorageProvider) -> Arc { // all the files should be in the staging directory root - let entries = std::fs::read_dir(&PARSEABLE.options.local_staging_path) + let entries = options + .staging_dir() + .read_dir() .expect("Couldn't read from file"); - let url = get_url(); + let url = options.get_url(); let port = url.port().unwrap_or(80).to_string(); let url = url.to_string(); let Options { username, password, .. 
- } = PARSEABLE.options.as_ref(); - let staging_path = PARSEABLE.staging_dir(); - let flight_port = PARSEABLE.options.flight_port.to_string(); + } = options; + let staging_path = options.staging_dir(); + let flight_port = options.flight_port.to_string(); for entry in entries { // cause the staging directory will have only one file with ingestor in the name // so the JSON Parse should not error unless the file is corrupted let path = entry.expect("Should be a directory entry").path(); - let flag = path + if !path .file_name() - .unwrap_or_default() - .to_str() - .unwrap_or_default() - .contains("ingestor"); - - if flag { - // get the ingestor metadata from staging - let text = std::fs::read(path).expect("File should be present"); - let mut meta: Value = serde_json::from_slice(&text).expect("Valid JSON"); - - // migrate the staging meta - let obj = meta - .as_object_mut() - .expect("Could Not parse Ingestor Metadata Json"); - - if obj.get("flight_port").is_none() { - obj.insert( - "flight_port".to_owned(), - Value::String(PARSEABLE.options.flight_port.to_string()), - ); - } - - let mut meta: IngestorMetadata = - serde_json::from_value(meta).expect("Couldn't write to disk"); - - // compare url endpoint and port - if meta.domain_name != url { - info!( - "Domain Name was Updated. Old: {} New: {}", - meta.domain_name, url - ); - meta.domain_name = url; - } - - if meta.port != port { - info!("Port was Updated. Old: {} New: {}", meta.port, port); - meta.port = port; - } - - let token = - base64::prelude::BASE64_STANDARD.encode(format!("{}:{}", username, password)); - - let token = format!("Basic {}", token); - - if meta.token != token { - // TODO: Update the message to be more informative with username and password - info!( - "Credentials were Updated. 
Old: {} New: {}", - meta.token, token - ); - meta.token = token; - } - - meta.put_on_disk(staging_path) - .expect("Couldn't write to disk"); - return Arc::new(meta); + .and_then(|s| s.to_str()) + .is_some_and(|s| s.contains("ingestor")) + { + continue; + } + + // get the ingestor metadata from staging + let bytes = std::fs::read(path).expect("File should be present"); + let mut meta = + Self::from_bytes(&bytes, options.flight_port).expect("Extracted ingestor metadata"); + + // compare url endpoint and port, update + if meta.domain_name != url { + info!( + "Domain Name was Updated. Old: {} New: {}", + meta.domain_name, url + ); + meta.domain_name = url; } + + if meta.port != port { + info!("Port was Updated. Old: {} New: {}", meta.port, port); + meta.port = port; + } + + let token = format!( + "Basic {}", + BASE64_STANDARD.encode(format!("{username}:{password}")) + ); + if meta.token != token { + // TODO: Update the message to be more informative with username and password + warn!( + "Credentials were Updated. Tokens updated; Old: {} New: {}", + meta.token, token + ); + meta.token = token; + } + meta.put_on_disk(staging_path) + .expect("Couldn't write to disk"); + + return Arc::new(meta); } - let storage = PARSEABLE.storage.get_object_store(); + let storage = storage.get_object_store(); let meta = Self::new( port, url, @@ -319,6 +305,15 @@ impl IngestorMetadata { ]) } + /// Updates json with `flight_port` field if not already present + fn from_bytes(bytes: &[u8], flight_port: u16) -> anyhow::Result { + let mut json: Map = serde_json::from_slice(bytes)?; + json.entry("flight_port") + .or_insert_with(|| Value::String(flight_port.to_string())); + + Ok(serde_json::from_value(Value::Object(json))?) 
+ } + pub async fn migrate(&self) -> anyhow::Result> { let imp = self.file_path(); let bytes = match PARSEABLE.storage.get_object_store().get_object(&imp).await { @@ -327,22 +322,11 @@ impl IngestorMetadata { return Ok(None); } }; - let mut json = serde_json::from_slice::(&bytes)?; - let meta = json - .as_object_mut() - .ok_or_else(|| anyhow::anyhow!("Unable to parse Ingester Metadata"))?; - let fp = meta.get("flight_port"); - - if fp.is_none() { - meta.insert( - "flight_port".to_owned(), - Value::String(PARSEABLE.options.flight_port.to_string()), - ); - } - let bytes = Bytes::from(serde_json::to_vec(&json)?); - let resource: IngestorMetadata = serde_json::from_value(json)?; - resource.put_on_disk(PARSEABLE.staging_dir())?; + let resource = Self::from_bytes(&bytes, PARSEABLE.options.flight_port)?; + let bytes = Bytes::from(serde_json::to_vec(&resource)?); + + resource.put_on_disk(PARSEABLE.options.staging_dir())?; PARSEABLE .storage @@ -394,6 +378,20 @@ mod test { assert_eq!(rhs, lhs); } + #[rstest] + fn test_from_bytes_adds_flight_port() { + let json = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id"}"#; + let meta = IngestorMetadata::from_bytes(json, 8002).unwrap(); + assert_eq!(meta.flight_port, "8002"); + } + + #[rstest] + fn test_from_bytes_preserves_existing_flight_port() { + let json = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id","flight_port":"9000"}"#; + let meta = IngestorMetadata::from_bytes(json, 8002).unwrap(); + assert_eq!(meta.flight_port, "9000"); + } + #[rstest] fn test_serialize_resource() { let im = IngestorMetadata::new( diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 25dd11aa2..264f8b9e9 100644 --- a/src/handlers/http/modal/query_server.rs +++ 
b/src/handlers/http/modal/query_server.rs @@ -275,7 +275,7 @@ impl QueryServer { // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream web::resource("/schema").route( web::get() - .to(logstream::schema) + .to(logstream::get_schema) .authorize_for_stream(Action::GetSchema), ), ) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index aef35fcc7..27a4d30f4 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -349,7 +349,7 @@ impl Server { // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream web::resource("/schema").route( web::get() - .to(logstream::schema) + .to(logstream::get_schema) .authorize_for_stream(Action::GetSchema), ), ) diff --git a/src/metrics/prom_utils.rs b/src/metrics/prom_utils.rs index 1de4da87a..1a04c5ba5 100644 --- a/src/metrics/prom_utils.rs +++ b/src/metrics/prom_utils.rs @@ -19,7 +19,7 @@ use crate::handlers::http::base_path_without_preceding_slash; use crate::handlers::http::ingest::PostError; use crate::handlers::http::modal::IngestorMetadata; -use crate::utils::get_url; +use crate::parseable::PARSEABLE; use crate::HTTP_CLIENT; use actix_web::http::header; use chrono::NaiveDateTime; @@ -61,7 +61,7 @@ struct StorageMetrics { impl Default for Metrics { fn default() -> Self { - let url = get_url(); + let url = PARSEABLE.options.get_url(); let address = format!( "http://{}:{}", url.domain() diff --git a/src/migration/mod.rs b/src/migration/mod.rs index da8b795a1..5e82ec0d6 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -325,7 +325,8 @@ pub fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes { } pub fn get_staging_metadata(config: &Parseable) -> anyhow::Result> { - let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME).to_path(config.staging_dir()); + let path = + RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME).to_path(config.options.staging_dir()); let bytes = match std::fs::read(path) { Ok(bytes) => 
bytes, Err(err) => match err.kind() { @@ -351,7 +352,7 @@ pub fn put_staging_metadata( config: &Parseable, metadata: &serde_json::Value, ) -> anyhow::Result<()> { - let path = config.staging_dir().join(".parseable.json"); + let path = config.options.staging_dir().join(".parseable.json"); let mut file = OpenOptions::new() .create(true) .truncate(true) diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 04b754487..2fba1d5b5 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -69,7 +69,7 @@ pub const STREAM_EXISTS: &str = "Stream exists"; /// Shared state of the Parseable server. pub static PARSEABLE: Lazy = Lazy::new(|| match Cli::parse().storage { StorageOptions::Local(args) => { - if args.options.local_staging_path == args.storage.root { + if args.options.staging_dir() == &args.storage.root { clap::Error::raw( ErrorKind::ValueValidation, "Cannot use same path for storage and staging", @@ -129,7 +129,7 @@ impl Parseable { storage: Arc, ) -> Self { let ingestor_metadata = match &options.mode { - Mode::Ingest => Some(IngestorMetadata::load()), + Mode::Ingest => Some(IngestorMetadata::load(&options, storage.as_ref())), _ => None, }; Parseable { @@ -169,6 +169,16 @@ impl Parseable { ) } + /// Checks for the stream in memory, or loads it from storage when in distributed mode + pub async fn check_or_load_stream(&self, stream_name: &str) -> bool { + !self.streams.contains(stream_name) + && (self.options.mode != Mode::Query + || !self + .create_stream_and_schema_from_storage(stream_name) + .await + .unwrap_or_default()) + } + /// Writes all streams in staging onto disk, awaiting conversion into parquet. /// Deletes all in memory recordbatches, freeing up rows in mem-writer. 
pub fn flush_all_streams(&self) { @@ -217,10 +227,6 @@ impl Parseable { self.storage.clone() } - pub fn staging_dir(&self) -> &PathBuf { - &self.options.local_staging_path - } - pub fn hot_tier_dir(&self) -> &Option { &self.options.hot_tier_storage_path } diff --git a/src/stats.rs b/src/stats.rs index 0016dbdbe..14aee0d3f 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,5 +1,3 @@ -use tracing::warn; - /* * Parseable Server (C) 2022 - 2024 Parseable, Inc. * @@ -17,6 +15,14 @@ use tracing::warn; * along with this program. If not, see . * */ + +use std::sync::Arc; + +use prometheus::core::Collector; +use prometheus::proto::MetricFamily; +use prometheus::IntGaugeVec; +use tracing::warn; + use crate::metrics::{ DELETED_EVENTS_STORAGE_SIZE, EVENTS_DELETED, EVENTS_DELETED_SIZE, EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE, @@ -24,7 +30,6 @@ use crate::metrics::{ LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE, }; use crate::storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat}; -use std::sync::Arc; /// Helper struct type created by copying stats values from metadata #[derive(Debug, Default, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq)] @@ -176,13 +181,26 @@ pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Resu LIFETIME_EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?; LIFETIME_EVENTS_STORAGE_SIZE.remove_label_values(&storage_size_labels)?; - EVENTS_INGESTED_DATE.remove_label_values(&event_labels)?; - EVENTS_INGESTED_SIZE_DATE.remove_label_values(&event_labels)?; - EVENTS_STORAGE_SIZE_DATE.remove_label_values(&storage_size_labels)?; + delete_with_label_prefix(&EVENTS_INGESTED_DATE, &event_labels); + delete_with_label_prefix(&EVENTS_INGESTED_SIZE_DATE, &event_labels); + delete_with_label_prefix(&EVENTS_STORAGE_SIZE_DATE, &storage_size_labels); Ok(()) } +fn delete_with_label_prefix(metrics: &IntGaugeVec, prefix: &[&str]) { + let families: Vec = 
metrics.collect().into_iter().collect(); + for metric in families.iter().flat_map(|m| m.get_metric()) { + let label: Vec<&str> = metric.get_label().iter().map(|l| l.get_value()).collect(); + if !label.starts_with(prefix) { + continue; + } + if let Err(err) = metrics.remove_label_values(&label) { + warn!("Error = {err}"); + } + } +} + pub fn event_labels<'a>(stream_name: &'a str, format: &'static str) -> [&'a str; 2] { [stream_name, format] } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 59ab1c699..bc022ecd1 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -45,7 +45,7 @@ use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::R use once_cell::sync::OnceCell; use relative_path::RelativePath; use relative_path::RelativePathBuf; -use tracing::{error, info, warn}; +use tracing::{debug, error, warn}; use ulid::Ulid; use std::collections::{BTreeMap, HashSet}; @@ -257,12 +257,9 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { ) -> Result<(), ObjectStorageError> { let path = stream_json_path(stream_name); let stream_metadata = self.get_object(&path).await?; - let stats = - serde_json::to_value(retention).expect("rentention tasks are perfectly serializable"); - let mut stream_metadata: serde_json::Value = + let mut stream_metadata: ObjectStoreFormat = serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"); - - stream_metadata["retention"] = stats; + stream_metadata.retention = Some(retention.clone()); self.put_object(&path, to_bytes(&stream_metadata)).await } @@ -645,13 +642,13 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { } async fn upload_files_from_staging(&self) -> Result<(), ObjectStorageError> { - if !Path::new(&PARSEABLE.staging_dir()).exists() { + if !Path::new(&PARSEABLE.options.staging_dir()).exists() { return Ok(()); } // get all streams for stream_name in PARSEABLE.streams.list() { - info!("Starting object_store_sync for 
stream- {stream_name}"); + debug!("Starting object_store_sync for stream- {stream_name}"); let stream = PARSEABLE.get_or_create_stream(&stream_name); let custom_partition = stream.get_custom_partition(); diff --git a/src/storage/store_metadata.rs b/src/storage/store_metadata.rs index 6552333da..a10f51516 100644 --- a/src/storage/store_metadata.rs +++ b/src/storage/store_metadata.rs @@ -70,7 +70,7 @@ impl Default for StorageMetadata { Self { version: CURRENT_STORAGE_METADATA_VERSION.to_string(), mode: PARSEABLE.storage.name().to_owned(), - staging: PARSEABLE.staging_dir().to_path_buf(), + staging: PARSEABLE.options.staging_dir().to_path_buf(), storage: PARSEABLE.storage.get_endpoint(), deployment_id: uid::gen(), server_mode: PARSEABLE.options.mode, @@ -134,8 +134,8 @@ pub async fn resolve_parseable_metadata( if metadata.server_mode== Mode::All && PARSEABLE.options.mode == Mode::Ingest { Err("Starting Ingest Mode is not allowed, Since Query Server has not been started yet") } else { - create_dir_all(PARSEABLE.staging_dir())?; - metadata.staging = PARSEABLE.staging_dir().canonicalize()?; + create_dir_all(PARSEABLE.options.staging_dir())?; + metadata.staging = PARSEABLE.options.staging_dir().canonicalize()?; // this flag is set to true so that metadata is copied to staging overwrite_staging = true; // overwrite remote in all and query mode @@ -151,20 +151,20 @@ pub async fn resolve_parseable_metadata( Mode::Query => { overwrite_remote = true; metadata.server_mode = PARSEABLE.options.mode; - metadata.staging = PARSEABLE.staging_dir().to_path_buf(); + metadata.staging = PARSEABLE.options.staging_dir().to_path_buf(); }, Mode::Ingest => { // if ingest server is started fetch the metadata from remote // update the server mode for local metadata metadata.server_mode = PARSEABLE.options.mode; - metadata.staging = PARSEABLE.staging_dir().to_path_buf(); + metadata.staging = PARSEABLE.options.staging_dir().to_path_buf(); }, } Ok(metadata) } } EnvChange::CreateBoth => { - 
create_dir_all(PARSEABLE.staging_dir())?; + create_dir_all(PARSEABLE.options.staging_dir())?; let metadata = StorageMetadata::default(); // new metadata needs to be set // if mode is query or all then both staging and remote @@ -237,7 +237,8 @@ pub enum EnvChange { } pub fn get_staging_metadata() -> io::Result> { - let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME).to_path(PARSEABLE.staging_dir()); + let path = RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME) + .to_path(PARSEABLE.options.staging_dir()); let bytes = match fs::read(path) { Ok(bytes) => bytes, Err(err) => match err.kind() { @@ -259,8 +260,11 @@ pub async fn put_remote_metadata(metadata: &StorageMetadata) -> Result<(), Objec pub fn put_staging_metadata(meta: &StorageMetadata) -> io::Result<()> { let mut staging_metadata = meta.clone(); staging_metadata.server_mode = PARSEABLE.options.mode; - staging_metadata.staging = PARSEABLE.staging_dir().to_path_buf(); - let path = PARSEABLE.staging_dir().join(PARSEABLE_METADATA_FILE_NAME); + staging_metadata.staging = PARSEABLE.options.staging_dir().to_path_buf(); + let path = PARSEABLE + .options + .staging_dir() + .join(PARSEABLE_METADATA_FILE_NAME); let mut file = OpenOptions::new() .create(true) .truncate(true) diff --git a/src/utils/mod.rs b/src/utils/mod.rs index bf5b1258f..46335da6d 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -26,7 +26,6 @@ pub mod uid; pub mod update; use crate::handlers::http::rbac::RBACError; -use crate::parseable::PARSEABLE; use crate::rbac::role::{Action, Permission}; use crate::rbac::Users; use actix::extract_session_key_from_req; @@ -34,9 +33,7 @@ use actix_web::HttpRequest; use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc}; use regex::Regex; use sha2::{Digest, Sha256}; -use std::env; use tracing::debug; -use url::Url; /// Convert minutes to a slot range /// e.g. 
given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19" @@ -55,72 +52,6 @@ pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option { Some(format!("{block_start:02}-{block_end:02}")) } -pub fn get_url() -> Url { - if PARSEABLE.options.ingestor_endpoint.is_empty() { - return format!( - "{}://{}", - PARSEABLE.options.get_scheme(), - PARSEABLE.options.address - ) - .parse::() // if the value was improperly set, this will panic before hand - .unwrap_or_else(|err| { - panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `:` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", PARSEABLE.options.address) - }); - } - - let ingestor_endpoint = &PARSEABLE.options.ingestor_endpoint; - - if ingestor_endpoint.starts_with("http") { - panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `:` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint); - } - - let addr_from_env = ingestor_endpoint.split(':').collect::>(); - - if addr_from_env.len() != 2 { - panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `:` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint); - } - - let mut hostname = addr_from_env[0].to_string(); - let mut port = addr_from_env[1].to_string(); - - // if the env var value fits the pattern $VAR_NAME:$VAR_NAME - // fetch the value from the specified env vars - if hostname.starts_with('$') { - let var_hostname = hostname[1..].to_string(); - hostname = get_from_env(&var_hostname); - - if hostname.is_empty() { - panic!("The environement variable `{}` is not set, please set as without the scheme (e.g., 192.168.1.1 or example.com). 
Please refer to the documentation: https://logg.ing/env for more details.", var_hostname); - } - if hostname.starts_with("http") { - panic!("Invalid value `{}`, please set the environement variable `{}` to `` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", hostname, var_hostname); - } else { - hostname = format!("{}://{}", PARSEABLE.options.get_scheme(), hostname); - } - } - - if port.starts_with('$') { - let var_port = port[1..].to_string(); - port = get_from_env(&var_port); - - if port.is_empty() { - panic!( - "Port is not set in the environement variable `{}`. Please refer to the documentation: https://logg.ing/env for more details.", - var_port - ); - } - } - - format!("{}://{}:{}", PARSEABLE.options.get_scheme(), hostname, port) - .parse::() - .expect("Valid URL") -} - -/// util fuction to fetch value from an env var -fn get_from_env(var_to_fetch: &str) -> String { - env::var(var_to_fetch).unwrap_or_else(|_| "".to_string()) -} - pub fn get_ingestor_id() -> String { let now = Utc::now().to_rfc3339(); let id = get_hash(&now).to_string().split_at(15).0.to_string();