Skip to content

Commit 382f480

Browse files
add indexer to cluster info and metrics (#1281)
add indexer server info in cluster info api response add indexer server metrics in cluster metrics api response
1 parent fab4ee3 commit 382f480

File tree

4 files changed

+272
-115
lines changed

4 files changed

+272
-115
lines changed

Diff for: src/handlers/http/cluster/mod.rs

+212-96
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
pub mod utils;
2020

21+
use futures::{future, stream, StreamExt};
2122
use std::collections::HashSet;
2223
use std::time::Duration;
2324

@@ -51,7 +52,7 @@ use crate::HTTP_CLIENT;
5152
use super::base_path_without_preceding_slash;
5253
use super::ingest::PostError;
5354
use super::logstream::error::StreamError;
54-
use super::modal::{IndexerMetadata, IngestorMetadata};
55+
use super::modal::{IndexerMetadata, IngestorMetadata, Metadata};
5556
use super::rbac::RBACError;
5657
use super::role::RoleError;
5758

@@ -541,72 +542,119 @@ pub async fn send_retention_cleanup_request(
541542
}
542543

543544
pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
544-
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
545-
error!("Fatal: failed to get ingestor info: {:?}", err);
546-
StreamError::Anyhow(err)
547-
})?;
545+
// Get ingestor and indexer metadata concurrently
546+
let (ingestor_result, indexer_result) =
547+
future::join(get_ingestor_info(), get_indexer_info()).await;
548548

549-
let mut infos = vec![];
549+
// Handle ingestor metadata result
550+
let ingestor_metadata = ingestor_result
551+
.map_err(|err| {
552+
error!("Fatal: failed to get ingestor info: {:?}", err);
553+
PostError::Invalid(err)
554+
})
555+
.map_err(|err| StreamError::Anyhow(err.into()))?;
550556

551-
for ingestor in ingestor_infos {
552-
let uri = Url::parse(&format!(
553-
"{}{}/about",
554-
ingestor.domain_name,
555-
base_path_without_preceding_slash()
556-
))
557-
.expect("should always be a valid url");
557+
// Handle indexer metadata result
558+
let indexer_metadata = indexer_result
559+
.map_err(|err| {
560+
error!("Fatal: failed to get indexer info: {:?}", err);
561+
PostError::Invalid(err)
562+
})
563+
.map_err(|err| StreamError::Anyhow(err.into()))?;
564+
565+
// Fetch info for both node types concurrently
566+
let (ingestor_infos, indexer_infos) = future::join(
567+
fetch_nodes_info(ingestor_metadata),
568+
fetch_nodes_info(indexer_metadata),
569+
)
570+
.await;
571+
572+
// Combine results from both node types
573+
let mut infos = Vec::new();
574+
infos.extend(ingestor_infos?);
575+
infos.extend(indexer_infos?);
558576

559-
let resp = HTTP_CLIENT
560-
.get(uri)
561-
.header(header::AUTHORIZATION, ingestor.token.clone())
562-
.header(header::CONTENT_TYPE, "application/json")
563-
.send()
564-
.await;
577+
Ok(actix_web::HttpResponse::Ok().json(infos))
578+
}
565579

566-
let (reachable, staging_path, error, status) = if let Ok(resp) = resp {
567-
let status = Some(resp.status().to_string());
580+
/// Fetches info for a single node (ingestor or indexer)
581+
async fn fetch_node_info<T: Metadata>(node: &T) -> Result<utils::ClusterInfo, StreamError> {
582+
let uri = Url::parse(&format!(
583+
"{}{}/about",
584+
node.domain_name(),
585+
base_path_without_preceding_slash()
586+
))
587+
.expect("should always be a valid url");
568588

569-
let resp_data = resp.bytes().await.map_err(|err| {
570-
error!("Fatal: failed to parse ingestor info to bytes: {:?}", err);
571-
StreamError::Network(err)
572-
})?;
589+
let resp = HTTP_CLIENT
590+
.get(uri)
591+
.header(header::AUTHORIZATION, node.token().to_owned())
592+
.header(header::CONTENT_TYPE, "application/json")
593+
.send()
594+
.await;
573595

574-
let sp = serde_json::from_slice::<JsonValue>(&resp_data)
575-
.map_err(|err| {
576-
error!("Fatal: failed to parse ingestor info: {:?}", err);
577-
StreamError::SerdeError(err)
578-
})?
579-
.get("staging")
580-
.ok_or(StreamError::SerdeError(SerdeError::missing_field(
581-
"staging",
582-
)))?
583-
.as_str()
584-
.ok_or(StreamError::SerdeError(SerdeError::custom(
585-
"staging path not a string/ not provided",
586-
)))?
587-
.to_string();
588-
589-
(true, sp, None, status)
590-
} else {
591-
(
592-
false,
593-
"".to_owned(),
594-
resp.as_ref().err().map(|e| e.to_string()),
595-
resp.unwrap_err().status().map(|s| s.to_string()),
596-
)
597-
};
598-
599-
infos.push(utils::ClusterInfo::new(
600-
&ingestor.domain_name,
601-
reachable,
602-
staging_path,
603-
PARSEABLE.storage.get_endpoint(),
604-
error,
605-
status,
606-
));
596+
let (reachable, staging_path, error, status) = if let Ok(resp) = resp {
597+
let status = Some(resp.status().to_string());
598+
599+
let resp_data = resp.bytes().await.map_err(|err| {
600+
error!("Fatal: failed to parse node info to bytes: {:?}", err);
601+
StreamError::Network(err)
602+
})?;
603+
604+
let sp = serde_json::from_slice::<JsonValue>(&resp_data)
605+
.map_err(|err| {
606+
error!("Fatal: failed to parse node info: {:?}", err);
607+
StreamError::SerdeError(err)
608+
})?
609+
.get("staging")
610+
.ok_or(StreamError::SerdeError(SerdeError::missing_field(
611+
"staging",
612+
)))?
613+
.as_str()
614+
.ok_or(StreamError::SerdeError(SerdeError::custom(
615+
"staging path not a string/ not provided",
616+
)))?
617+
.to_string();
618+
619+
(true, sp, None, status)
620+
} else {
621+
(
622+
false,
623+
"".to_owned(),
624+
resp.as_ref().err().map(|e| e.to_string()),
625+
resp.unwrap_err().status().map(|s| s.to_string()),
626+
)
627+
};
628+
629+
Ok(utils::ClusterInfo::new(
630+
node.domain_name(),
631+
reachable,
632+
staging_path,
633+
PARSEABLE.storage.get_endpoint(),
634+
error,
635+
status,
636+
node.node_type(),
637+
))
638+
}
639+
640+
/// Fetches info for multiple nodes in parallel
641+
async fn fetch_nodes_info<T: Metadata>(
642+
nodes: Vec<T>,
643+
) -> Result<Vec<utils::ClusterInfo>, StreamError> {
644+
let nodes_len = nodes.len();
645+
let results = stream::iter(nodes)
646+
.map(|node| async move { fetch_node_info(&node).await })
647+
.buffer_unordered(nodes_len) // No concurrency limit
648+
.collect::<Vec<_>>()
649+
.await;
650+
651+
// Collect results, propagating any errors
652+
let mut infos = Vec::with_capacity(results.len());
653+
for result in results {
654+
infos.push(result?);
607655
}
608656

609-
Ok(actix_web::HttpResponse::Ok().json(infos))
657+
Ok(infos)
610658
}
611659

612660
pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
@@ -702,60 +750,128 @@ pub async fn remove_ingestor(ingestor: Path<String>) -> Result<impl Responder, P
702750
Ok((msg, StatusCode::OK))
703751
}
704752

705-
async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
706-
let ingestor_metadata = get_ingestor_info().await.map_err(|err| {
707-
error!("Fatal: failed to get ingestor info: {:?}", err);
708-
PostError::Invalid(err)
709-
})?;
710-
711-
let mut dresses = vec![];
712-
713-
for ingestor in ingestor_metadata {
714-
let uri = Url::parse(&format!(
715-
"{}{}/metrics",
716-
&ingestor.domain_name,
717-
base_path_without_preceding_slash()
718-
))
719-
.map_err(|err| {
720-
PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
721-
})?;
722-
723-
// add a check to see if the ingestor is live
724-
if !check_liveness(&ingestor.domain_name).await {
725-
warn!("Ingestor {} is not live", ingestor.domain_name);
726-
continue;
727-
}
753+
/// Fetches metrics from a node (ingestor or indexer)
754+
async fn fetch_node_metrics<T>(node: &T) -> Result<Option<Metrics>, PostError>
755+
where
756+
T: Metadata + Send + Sync + 'static,
757+
{
758+
// Format the metrics URL
759+
let uri = Url::parse(&format!(
760+
"{}{}/metrics",
761+
node.domain_name(),
762+
base_path_without_preceding_slash()
763+
))
764+
.map_err(|err| PostError::Invalid(anyhow::anyhow!("Invalid URL in node metadata: {}", err)))?;
765+
766+
// Check if the node is live
767+
if !check_liveness(node.domain_name()).await {
768+
warn!("node {} is not live", node.domain_name());
769+
return Ok(None);
770+
}
728771

729-
let res = HTTP_CLIENT
730-
.get(uri)
731-
.header(header::AUTHORIZATION, &ingestor.token)
732-
.header(header::CONTENT_TYPE, "application/json")
733-
.send()
734-
.await;
772+
// Fetch metrics
773+
let res = HTTP_CLIENT
774+
.get(uri)
775+
.header(header::AUTHORIZATION, node.token())
776+
.header(header::CONTENT_TYPE, "application/json")
777+
.send()
778+
.await;
735779

736-
if let Ok(res) = res {
780+
match res {
781+
Ok(res) => {
737782
let text = res.text().await.map_err(PostError::NetworkError)?;
738783
let lines: Vec<Result<String, std::io::Error>> =
739784
text.lines().map(|line| Ok(line.to_owned())).collect_vec();
740785

741786
let sample = prometheus_parse::Scrape::parse(lines.into_iter())
742787
.map_err(|err| PostError::CustomError(err.to_string()))?
743788
.samples;
744-
let ingestor_metrics = Metrics::from_prometheus_samples(sample, &ingestor)
789+
790+
let metrics = Metrics::from_prometheus_samples(sample, node)
745791
.await
746792
.map_err(|err| {
747-
error!("Fatal: failed to get ingestor metrics: {:?}", err);
793+
error!("Fatal: failed to get node metrics: {:?}", err);
748794
PostError::Invalid(err.into())
749795
})?;
750-
dresses.push(ingestor_metrics);
751-
} else {
796+
797+
Ok(Some(metrics))
798+
}
799+
Err(_) => {
752800
warn!(
753-
"Failed to fetch metrics from ingestor: {}\n",
754-
&ingestor.domain_name,
801+
"Failed to fetch metrics from node: {}\n",
802+
node.domain_name()
755803
);
804+
Ok(None)
805+
}
806+
}
807+
}
808+
809+
/// Fetches metrics from multiple nodes in parallel
810+
async fn fetch_nodes_metrics<T>(nodes: Vec<T>) -> Result<Vec<Metrics>, PostError>
811+
where
812+
T: Metadata + Send + Sync + 'static,
813+
{
814+
let nodes_len = nodes.len();
815+
let results = stream::iter(nodes)
816+
.map(|node| async move { fetch_node_metrics(&node).await })
817+
.buffer_unordered(nodes_len) // No concurrency limit
818+
.collect::<Vec<_>>()
819+
.await;
820+
821+
// Process results
822+
let mut metrics = Vec::new();
823+
for result in results {
824+
match result {
825+
Ok(Some(node_metrics)) => metrics.push(node_metrics),
826+
Ok(None) => {} // node was not live or metrics couldn't be fetched
827+
Err(err) => return Err(err),
756828
}
757829
}
758-
Ok(dresses)
830+
831+
Ok(metrics)
832+
}
833+
834+
/// Main function to fetch all cluster metrics, parallelized and refactored
835+
async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
836+
// Get ingestor and indexer metadata concurrently
837+
let (ingestor_result, indexer_result) =
838+
future::join(get_ingestor_info(), get_indexer_info()).await;
839+
840+
// Handle ingestor metadata result
841+
let ingestor_metadata = ingestor_result.map_err(|err| {
842+
error!("Fatal: failed to get ingestor info: {:?}", err);
843+
PostError::Invalid(err)
844+
})?;
845+
846+
// Handle indexer metadata result
847+
let indexer_metadata = indexer_result.map_err(|err| {
848+
error!("Fatal: failed to get indexer info: {:?}", err);
849+
PostError::Invalid(err)
850+
})?;
851+
852+
// Fetch metrics from ingestors and indexers concurrently
853+
let (ingestor_metrics, indexer_metrics) = future::join(
854+
fetch_nodes_metrics(ingestor_metadata),
855+
fetch_nodes_metrics(indexer_metadata),
856+
)
857+
.await;
858+
859+
// Combine all metrics
860+
let mut all_metrics = Vec::new();
861+
862+
// Add ingestor metrics
863+
match ingestor_metrics {
864+
Ok(metrics) => all_metrics.extend(metrics),
865+
Err(err) => return Err(err),
866+
}
867+
868+
// Add indexer metrics
869+
match indexer_metrics {
870+
Ok(metrics) => all_metrics.extend(metrics),
871+
Err(err) => return Err(err),
872+
}
873+
874+
Ok(all_metrics)
759875
}
760876

761877
pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {

Diff for: src/handlers/http/cluster/utils.rs

+3
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ pub struct ClusterInfo {
5555
storage_path: String,
5656
error: Option<String>, // error message if the ingestor is not reachable
5757
status: Option<String>, // status message if the ingestor is reachable
58+
node_type: String,
5859
}
5960

6061
impl ClusterInfo {
@@ -65,6 +66,7 @@ impl ClusterInfo {
6566
storage_path: String,
6667
error: Option<String>,
6768
status: Option<String>,
69+
node_type: &str,
6870
) -> Self {
6971
Self {
7072
domain_name: domain_name.to_string(),
@@ -73,6 +75,7 @@ impl ClusterInfo {
7375
storage_path,
7476
error,
7577
status,
78+
node_type: node_type.to_string(),
7679
}
7780
}
7881
}

0 commit comments

Comments
 (0)