Skip to content
62 changes: 44 additions & 18 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,14 @@ pub struct Options {
)]
pub ingestor_endpoint: String,

#[arg(
long,
env = "P_INDEXER_ENDPOINT",
default_value = "",
help = "URL to connect to this specific indexer. Default is the address of the server"
)]
pub indexer_endpoint: String,

#[command(flatten)]
pub oidc: Option<OidcConfig>,

Expand Down Expand Up @@ -404,29 +412,47 @@ impl Options {
}

/// TODO: refactor and document
pub fn get_url(&self) -> Url {
if self.ingestor_endpoint.is_empty() {
return format!(
"{}://{}",
self.get_scheme(),
self.address
)
.parse::<Url>() // if the value was improperly set, this will panic before hand
.unwrap_or_else(|err| {
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
});
}

let ingestor_endpoint = &self.ingestor_endpoint;
pub fn get_url(&self, mode: Mode) -> Url {
let (endpoint, env_var) = match mode {
Mode::Ingest => {
if self.ingestor_endpoint.is_empty() {
return format!(
"{}://{}",
self.get_scheme(),
self.address
)
.parse::<Url>() // if the value was improperly set, this will panic before hand
.unwrap_or_else(|err| {
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
});
}
(&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT")
}
Mode::Index => {
if self.indexer_endpoint.is_empty() {
return format!(
"{}://{}",
self.get_scheme(),
self.address
)
.parse::<Url>() // if the value was improperly set, this will panic before hand
.unwrap_or_else(|err| {
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
});
}
(&self.indexer_endpoint, "P_INDEXER_ENDPOINT")
}
_ => panic!("Invalid mode"),
};

if ingestor_endpoint.starts_with("http") {
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
if endpoint.starts_with("http") {
panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
}

let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();
let addr_from_env = endpoint.split(':').collect::<Vec<&str>>();

if addr_from_env.len() != 2 {
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
}

let mut hostname = addr_from_env[0].to_string();
Expand Down
34 changes: 28 additions & 6 deletions src/enterprise/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashMap, path::PathBuf, sync::Arc};

use chrono::{TimeZone, Utc};
use datafusion::{common::Column, prelude::Expr};
use itertools::Itertools;
use relative_path::RelativePathBuf;
Expand Down Expand Up @@ -119,14 +120,35 @@ pub async fn fetch_parquet_file_paths(

selected_files
.into_iter()
.map(|file| {
.filter_map(|file| {
let date = file.file_path.split("/").collect_vec();

let date = date.as_slice()[1..4].iter().map(|s| s.to_string());

let date = RelativePathBuf::from_iter(date);

parquet_files.entry(date).or_default().push(file);
let year = &date[1][5..9];
let month = &date[1][10..12];
let day = &date[1][13..15];
let hour = &date[2][5..7];
let min = &date[3][7..9];
let file_date = Utc
.with_ymd_and_hms(
year.parse::<i32>().unwrap(),
month.parse::<u32>().unwrap(),
day.parse::<u32>().unwrap(),
hour.parse::<u32>().unwrap(),
min.parse::<u32>().unwrap(),
0,
)
.unwrap();

if file_date < time_range.start {
None
} else {
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());

let date = RelativePathBuf::from_iter(date);

parquet_files.entry(date).or_default().push(file);
Some("")
}
})
.for_each(|_| {});
Comment on lines +123 to 153
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Validate Path String Sub-slices

Extracting substrings like &date[1][5..9] risks panics if date[1] is shorter than expected. Consider verifying path segment lengths to guard against malformed or unexpected file paths.

- let year = &date[1][5..9];
+ if date[1].len() < 9 {
+     warn!("Unexpected file path format for: {:?}", date);
+     return None;
+ }
+ let year = &date[1][5..9];
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
.filter_map(|file| {
let date = file.file_path.split("/").collect_vec();
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
let date = RelativePathBuf::from_iter(date);
parquet_files.entry(date).or_default().push(file);
let year = &date[1][5..9];
let month = &date[1][10..12];
let day = &date[1][13..15];
let hour = &date[2][5..7];
let min = &date[3][7..9];
let file_date = Utc
.with_ymd_and_hms(
year.parse::<i32>().unwrap(),
month.parse::<u32>().unwrap(),
day.parse::<u32>().unwrap(),
hour.parse::<u32>().unwrap(),
min.parse::<u32>().unwrap(),
0,
)
.unwrap();
if file_date < time_range.start {
None
} else {
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
let date = RelativePathBuf::from_iter(date);
parquet_files.entry(date).or_default().push(file);
Some("")
}
})
.for_each(|_| {});
.filter_map(|file| {
let date = file.file_path.split("/").collect_vec();
if date[1].len() < 9 {
warn!("Unexpected file path format for: {:?}", date);
return None;
}
let year = &date[1][5..9];
let month = &date[1][10..12];
let day = &date[1][13..15];
let hour = &date[2][5..7];
let min = &date[3][7..9];
let file_date = Utc
.with_ymd_and_hms(
year.parse::<i32>().unwrap(),
month.parse::<u32>().unwrap(),
day.parse::<u32>().unwrap(),
hour.parse::<u32>().unwrap(),
min.parse::<u32>().unwrap(),
0,
)
.unwrap();
if file_date < time_range.start {
None
} else {
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
let date = RelativePathBuf::from_iter(date);
parquet_files.entry(date).or_default().push(file);
Some("")
}
})
.for_each(|_| {});


Expand Down
23 changes: 21 additions & 2 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ use crate::HTTP_CLIENT;
use super::base_path_without_preceding_slash;
use super::ingest::PostError;
use super::logstream::error::StreamError;
use super::modal::IngestorMetadata;
use super::modal::{IndexerMetadata, IngestorMetadata};
use super::rbac::RBACError;
use super::role::RoleError;

type IngestorMetadataArr = Vec<IngestorMetadata>;

type IndexerMetadataArr = Vec<IndexerMetadata>;

pub const INTERNAL_STREAM_NAME: &str = "pmeta";

const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);
Expand Down Expand Up @@ -616,7 +618,6 @@ pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
Ok(actix_web::HttpResponse::Ok().json(dresses))
}

// update the .query.json file and return the new ingestorMetadataArr
pub async fn get_ingestor_info() -> anyhow::Result<IngestorMetadataArr> {
let store = PARSEABLE.storage.get_object_store();

Expand All @@ -635,6 +636,24 @@ pub async fn get_ingestor_info() -> anyhow::Result<IngestorMetadataArr> {
Ok(arr)
}

/// Lists the metadata of every known indexer node.
///
/// Scans the parseable root directory in object storage for files whose
/// names start with `"indexer"` and deserializes each one into an
/// [`IndexerMetadata`].
///
/// # Errors
/// Propagates any [`ObjectStorage`] error from listing/fetching the files.
pub async fn get_indexer_info() -> anyhow::Result<IndexerMetadataArr> {
    let store = PARSEABLE.storage.get_object_store();

    let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
    let arr = store
        .get_objects(
            Some(&root_path),
            Box::new(|file_name| file_name.starts_with("indexer")),
        )
        .await?
        .iter()
        // Skip entries that fail to deserialize rather than substituting
        // `IndexerMetadata::default()`: a defaulted entry carries empty
        // connection info and would be treated as a live indexer downstream.
        .filter_map(|bytes| serde_json::from_slice::<IndexerMetadata>(bytes).ok())
        .collect_vec();

    Ok(arr)
}
Comment on lines +639 to +655
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve Error Handling in Metadata Deserialization

Using unwrap_or_default() can silently swallow parsing errors and may hinder debugging if the metadata is malformed. Prefer propagating errors or logging them for better visibility.

Consider changing to:

 .map(|x| {
-    serde_json::from_slice::<IndexerMetadata>(x).unwrap_or_default()
+    serde_json::from_slice::<IndexerMetadata>(x).unwrap_or_else(|e| {
+        error!("Failed to parse indexer metadata: {:?}", e);
+        IndexerMetadata::default()
+    })
 })

to detect and log failures, or fully propagate the error if data integrity is critical.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub async fn get_indexer_info() -> anyhow::Result<IndexerMetadataArr> {
let store = PARSEABLE.storage.get_object_store();
let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
let arr = store
.get_objects(
Some(&root_path),
Box::new(|file_name| file_name.starts_with("indexer")),
)
.await?
.iter()
// this unwrap will most definateley shoot me in the foot later
.map(|x| serde_json::from_slice::<IndexerMetadata>(x).unwrap_or_default())
.collect_vec();
Ok(arr)
}
pub async fn get_indexer_info() -> anyhow::Result<IndexerMetadataArr> {
let store = PARSEABLE.storage.get_object_store();
let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
let arr = store
.get_objects(
Some(&root_path),
Box::new(|file_name| file_name.starts_with("indexer")),
)
.await?
.iter()
// this unwrap will most definateley shoot me in the foot later
.map(|x| {
serde_json::from_slice::<IndexerMetadata>(x).unwrap_or_else(|e| {
error!("Failed to parse indexer metadata: {:?}", e);
IndexerMetadata::default()
})
})
.collect_vec();
Ok(arr)
}


pub async fn remove_ingestor(ingestor: Path<String>) -> Result<impl Responder, PostError> {
let domain_name = to_url_string(ingestor.into_inner());

Expand Down
20 changes: 5 additions & 15 deletions src/handlers/http/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,22 +359,12 @@ where
}

Mode::Index => {
let accessable_endpoints = ["create", "delete"];
let cond = path.split('/').any(|x| accessable_endpoints.contains(&x));
if !cond {
Box::pin(async {
Err(actix_web::error::ErrorUnauthorized(
"Only Index API can be accessed in Index Mode",
))
})
} else {
let fut = self.service.call(req);
let fut = self.service.call(req);

Box::pin(async move {
let res = fut.await?;
Ok(res)
})
}
Box::pin(async move {
let res = fut.await?;
Ok(res)
})
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use relative_path::RelativePathBuf;
use serde_json::Value;
use tokio::sync::oneshot;

use crate::option::Mode;
use crate::{
analytics,
handlers::{
Expand Down Expand Up @@ -108,7 +109,7 @@ impl ParseableServer for IngestServer {
tokio::spawn(airplane::server());

// write the ingestor metadata to storage
PARSEABLE.store_ingestor_metadata().await?;
PARSEABLE.store_metadata(Mode::Ingest).await?;

// Ingestors shouldn't have to deal with OpenId auth flow
let result = self.start(shutdown_rx, prometheus.clone(), None).await;
Expand Down Expand Up @@ -251,7 +252,7 @@ impl IngestServer {

// check for querier state. Is it there, or was it there in the past
// this should happen before the set the ingestor metadata
async fn check_querier_state() -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
pub async fn check_querier_state() -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
// how do we check for querier state?
// based on the work flow of the system, the querier will always need to start first
// i.e the querier will create the `.parseable.json` file
Expand Down
Loading
Loading