@@ -72,6 +72,19 @@ impl CockroachAdminClient {
7272 . with_context ( || "Failed to parse Prometheus metrics" )
7373 }
7474
75+ /// Fetch the local node ID from the cockroach-admin service
76+ ///
77+ /// This API is (and must remain) cancel-safe
78+ async fn fetch_local_node_id ( & self ) -> Result < String > {
79+ let response = self
80+ . client
81+ . local_node_id ( )
82+ . await
83+ . with_context ( || "Failed to get local node ID" ) ?;
84+
85+ Ok ( response. into_inner ( ) . node_id )
86+ }
87+
7588 /// Fetch node status information for all nodes
7689 ///
7790 /// Note that although we're asking a single node for this information, the
@@ -109,9 +122,10 @@ impl CockroachAdminClient {
109122/// ```rust,no_run
110123/// # use omicron_cockroach_metrics::CockroachClusterAdminClient;
111124/// # use std::net::SocketAddr;
125+ /// # use std::time::Duration;
112126/// # use slog::Logger;
113127/// # async fn example(log: Logger) -> anyhow::Result<()> {
114- /// let cluster = CockroachClusterAdminClient::new(log);
128+ /// let cluster = CockroachClusterAdminClient::new(log, Duration::from_secs(15) );
115129///
116130/// // Update backends when addresses change (e.g., from DNS resolution)
117131/// let backends: Vec<SocketAddr> = vec![
@@ -292,27 +306,53 @@ impl CockroachClusterAdminClient {
292306 /// Fetch Prometheus metrics from all backends, returning all successful results
293307 pub async fn fetch_prometheus_metrics_from_all_nodes (
294308 & self ,
295- ) -> Vec < ( SocketAddr , PrometheusMetrics ) > {
309+ ) -> Vec < ( NodeId , PrometheusMetrics ) > {
296310 let clients = self . clients . read ( ) . await ;
297311
298312 if clients. is_empty ( ) {
299313 return Vec :: new ( ) ;
300314 }
301315
302316 // Collect tasks from all nodes in parallel
303- let mut results = Vec :: new ( ) ;
317+ let mut results: Vec <
318+ Result < ( NodeId , PrometheusMetrics ) , anyhow:: Error > ,
319+ > = Vec :: new ( ) ;
304320 let mut tasks = ParallelTaskSet :: new ( ) ;
305321 for ( addr, client) in clients. iter ( ) {
306322 let addr = * addr;
307323 let client = client. clone ( ) ;
308- if let Some ( result) =
309- tasks
310- . spawn ( {
311- async move {
312- ( addr, client. fetch_prometheus_metrics ( ) . await )
313- }
314- } )
315- . await
324+ if let Some ( result) = tasks
325+ . spawn ( {
326+ async move {
327+ // First get the local node ID - early exit if this fails
328+ let node_id_string = client
329+ . fetch_local_node_id ( )
330+ . await
331+ . map_err ( |e| {
332+ anyhow:: anyhow!(
333+ "Failed to get node ID from {}: {}" ,
334+ addr,
335+ e
336+ )
337+ } ) ?;
338+ let node_id = NodeId :: new ( node_id_string) ;
339+
340+ // Then fetch the metrics
341+ let metrics = client
342+ . fetch_prometheus_metrics ( )
343+ . await
344+ . map_err ( |e| {
345+ anyhow:: anyhow!(
346+ "Failed to get metrics from {}: {}" ,
347+ addr,
348+ e
349+ )
350+ } ) ?;
351+
352+ Ok ( ( node_id, metrics) )
353+ }
354+ } )
355+ . await
316356 {
317357 results. push ( result) ;
318358 }
@@ -322,22 +362,21 @@ impl CockroachClusterAdminClient {
322362 // Collect all successful results
323363 let mut successful_results = Vec :: new ( ) ;
324364 let mut results_iter = results. into_iter ( ) ;
325- while let Some ( ( addr , result) ) = results_iter. next ( ) {
365+ while let Some ( result) = results_iter. next ( ) {
326366 match result {
327- Ok ( metrics) => {
367+ Ok ( ( node_id , metrics) ) => {
328368 debug ! (
329369 self . log,
330370 "Successfully fetched metrics from CockroachDB node" ;
331- "address " => %addr
371+ "node_id " => %node_id
332372 ) ;
333- successful_results. push ( ( addr , metrics) ) ;
373+ successful_results. push ( ( node_id , metrics) ) ;
334374 }
335375 Err ( e) => {
336376 // Log the error but continue trying other backends
337377 warn ! (
338378 self . log,
339379 "Failed to fetch metrics from CockroachDB node" ;
340- "address" => %addr,
341380 "error" => %e
342381 ) ;
343382 }
@@ -350,22 +389,48 @@ impl CockroachClusterAdminClient {
350389 /// Fetch node status from all backends, returning all successful results
351390 pub async fn fetch_node_status_from_all_nodes (
352391 & self ,
353- ) -> Vec < ( SocketAddr , NodesResponse ) > {
392+ ) -> Vec < ( NodeId , NodesResponse ) > {
354393 let clients = self . clients . read ( ) . await ;
355394
356395 if clients. is_empty ( ) {
357396 return Vec :: new ( ) ;
358397 }
359398
360399 // Create futures for all requests
361- let mut results = Vec :: new ( ) ;
400+ let mut results: Vec < Result < ( NodeId , NodesResponse ) , anyhow:: Error > > =
401+ Vec :: new ( ) ;
362402 let mut tasks = ParallelTaskSet :: new ( ) ;
363403 for ( addr, client) in clients. iter ( ) {
364404 let addr = * addr;
365405 let client = client. clone ( ) ;
366406 if let Some ( result) = tasks
367407 . spawn ( {
368- async move { ( addr, client. fetch_node_status ( ) . await ) }
408+ async move {
409+ // First get the local node ID - early exit if this fails
410+ let node_id_string = client
411+ . fetch_local_node_id ( )
412+ . await
413+ . map_err ( |e| {
414+ anyhow:: anyhow!(
415+ "Failed to get node ID from {}: {}" ,
416+ addr,
417+ e
418+ )
419+ } ) ?;
420+ let node_id = NodeId :: new ( node_id_string) ;
421+
422+ // Then fetch the node status
423+ let status =
424+ client. fetch_node_status ( ) . await . map_err ( |e| {
425+ anyhow:: anyhow!(
426+ "Failed to get node status from {}: {}" ,
427+ addr,
428+ e
429+ )
430+ } ) ?;
431+
432+ Ok ( ( node_id, status) )
433+ }
369434 } )
370435 . await
371436 {
@@ -377,22 +442,21 @@ impl CockroachClusterAdminClient {
377442 // Collect all successful results
378443 let mut successful_results = Vec :: new ( ) ;
379444 let mut results_iter = results. into_iter ( ) ;
380- while let Some ( ( addr , result) ) = results_iter. next ( ) {
445+ while let Some ( result) = results_iter. next ( ) {
381446 match result {
382- Ok ( status) => {
447+ Ok ( ( node_id , status) ) => {
383448 debug ! (
384449 self . log,
385450 "Successfully fetched node status from CockroachDB node" ;
386- "address " => %addr
451+ "node_id " => %node_id
387452 ) ;
388- successful_results. push ( ( addr , status) ) ;
453+ successful_results. push ( ( node_id , status) ) ;
389454 }
390455 Err ( e) => {
391456 // Log the error but continue trying other backends
392457 warn ! (
393458 self . log,
394459 "Failed to fetch node status from CockroachDB node" ;
395- "address" => %addr,
396460 "error" => %e
397461 ) ;
398462 }
@@ -705,28 +769,20 @@ impl PrometheusMetrics {
705769}
706770
707771/// CockroachDB Node ID
708- #[ derive(
709- Debug ,
710- Clone ,
711- Copy ,
712- PartialEq ,
713- Eq ,
714- Hash ,
715- PartialOrd ,
716- Ord ,
717- Serialize ,
718- Deserialize ,
719- ) ]
720- #[ serde( transparent) ]
721- pub struct NodeId ( pub i32 ) ;
772+ ///
773+ /// This field is stored internally as a String to avoid questions
774+ /// about size, signedness, etc - it can be treated as an arbitrary
775+ /// unique identifier.
776+ #[ derive( Debug , Clone , PartialEq , Eq , Hash , PartialOrd , Ord , Serialize ) ]
777+ pub struct NodeId ( pub String ) ;
722778
723779impl NodeId {
724- pub fn new ( id : i32 ) -> Self {
780+ pub fn new ( id : String ) -> Self {
725781 Self ( id)
726782 }
727783
728- pub fn as_i32 ( & self ) -> i32 {
729- self . 0
784+ pub fn as_str ( & self ) -> & str {
785+ & self . 0
730786 }
731787}
732788
@@ -737,10 +793,65 @@ impl std::fmt::Display for NodeId {
737793}
738794
739795impl std:: str:: FromStr for NodeId {
740- type Err = std:: num :: ParseIntError ;
796+ type Err = std:: convert :: Infallible ;
741797
742798 fn from_str ( s : & str ) -> Result < Self , Self :: Err > {
743- Ok ( Self ( s. parse ( ) ?) )
799+ Ok ( Self ( s. to_string ( ) ) )
800+ }
801+ }
802+
803+ // When parsing the underlying NodeId, we force it to be interpreted
804+ // as a String. Without this custom Deserialize implementation, we
805+ // encounter parsing errors when querying endpoints which return the
806+ // NodeId as an integer.
807+ impl < ' de > serde:: Deserialize < ' de > for NodeId {
808+ fn deserialize < D > ( deserializer : D ) -> Result < Self , D :: Error >
809+ where
810+ D : serde:: Deserializer < ' de > ,
811+ {
812+ use serde:: de:: { Error , Visitor } ;
813+ use std:: fmt;
814+
815+ struct NodeIdVisitor ;
816+
817+ impl < ' de > Visitor < ' de > for NodeIdVisitor {
818+ type Value = NodeId ;
819+
820+ fn expecting ( & self , formatter : & mut fmt:: Formatter ) -> fmt:: Result {
821+ formatter
822+ . write_str ( "a string or integer representing a node ID" )
823+ }
824+
825+ fn visit_str < E > ( self , value : & str ) -> Result < Self :: Value , E >
826+ where
827+ E : Error ,
828+ {
829+ Ok ( NodeId ( value. to_string ( ) ) )
830+ }
831+
832+ fn visit_string < E > ( self , value : String ) -> Result < Self :: Value , E >
833+ where
834+ E : Error ,
835+ {
836+ Ok ( NodeId ( value) )
837+ }
838+
839+ fn visit_i64 < E > ( self , value : i64 ) -> Result < Self :: Value , E >
840+ where
841+ E : Error ,
842+ {
843+ Ok ( NodeId ( value. to_string ( ) ) )
844+ }
845+
846+ fn visit_u64 < E > ( self , value : u64 ) -> Result < Self :: Value , E >
847+ where
848+ E : Error ,
849+ {
850+ Ok ( NodeId ( value. to_string ( ) ) )
851+ }
852+ }
853+
854+ deserializer. deserialize_any ( NodeIdVisitor )
744855 }
745856}
746857
0 commit comments