@@ -16,48 +16,50 @@ use crate::agent::http_transport::dynamic_routing::{
1616 messages:: { FetchedNodes , NodeHealthState } ,
1717 node:: Node ,
1818 snapshot:: routing_snapshot:: RoutingSnapshot ,
19- type_aliases:: { GlobalShared , ReceiverMpsc , ReceiverWatch , SenderMpsc } ,
19+ type_aliases:: { AtomicSwap , ReceiverMpsc , ReceiverWatch , SenderMpsc } ,
2020} ;
2121
/// Buffer size used for channels in this module.
// NOTE(review): the channel construction site is outside the visible hunks —
// presumably this is the capacity of the bounded mpsc channels; confirm.
const CHANNEL_BUFFER: usize = 128;
2323
24- ///
24+ /// A trait representing a health check of the node.
2525#[ async_trait]
2626pub trait HealthCheck : Send + Sync + Debug {
27- ///
27+ /// Checks the health of the node.
2828 async fn check ( & self , node : & Node ) -> anyhow:: Result < HealthCheckStatus > ;
2929}
3030
/// A struct representing the health check status of a node.
///
/// A status that carries a latency is considered healthy; a status without one
/// is unhealthy. The `Default` value has no latency and is therefore unhealthy.
#[derive(Clone, PartialEq, Debug, Default)]
pub struct HealthCheckStatus {
    /// Latency recorded for the health check, or `None` if there is none.
    latency: Option<Duration>,
}

impl HealthCheckStatus {
    /// Creates a new `HealthCheckStatus` instance from an optional latency.
    pub fn new(latency: Option<Duration>) -> Self {
        Self { latency }
    }

    /// Returns `true` if the node is healthy, i.e. a latency is present.
    pub fn is_healthy(&self) -> bool {
        self.latency.is_some()
    }

    /// Returns the latency of the health check, if any.
    pub fn latency(&self) -> Option<Duration> {
        self.latency
    }
}
5053
51- ///
54+ /// A struct implementing the `HealthCheck` for the nodes.
5255#[ derive( Debug ) ]
5356pub struct HealthChecker {
5457 http_client : Client ,
5558 timeout : Duration ,
5659}
5760
58- ///
5961impl HealthChecker {
60- ///
62+ /// Creates a new `HealthChecker` instance.
6163 pub fn new ( http_client : Client , timeout : Duration ) -> Self {
6264 Self {
6365 http_client,
@@ -96,16 +98,22 @@ impl HealthCheck for HealthChecker {
9698
/// The name of the health check actor.
// NOTE(review): usage is outside the visible hunks — presumably a log-message prefix; confirm.
const HEALTH_CHECK_ACTOR: &str = "HealthCheckActor";
98100
101+ /// A struct performing the health check of the node and sending the health status to the listener.
99102struct HealthCheckActor {
103+ /// The health checker.
100104 checker : Arc < dyn HealthCheck > ,
105+ /// The period of the health check.
101106 period : Duration ,
107+ /// The node to check.
102108 node : Node ,
109+ /// The sender channel (listener) to send the health status.
103110 sender_channel : SenderMpsc < NodeHealthState > ,
111+ /// The cancellation token of the actor.
104112 token : CancellationToken ,
105113}
106114
107115impl HealthCheckActor {
108- pub fn new (
116+ fn new (
109117 checker : Arc < dyn HealthCheck > ,
110118 period : Duration ,
111119 node : Node ,
@@ -121,7 +129,8 @@ impl HealthCheckActor {
121129 }
122130 }
123131
124- pub async fn run ( self ) {
132+ /// Runs the actor.
133+ async fn run ( self ) {
125134 let mut interval = time:: interval ( self . period ) ;
126135 loop {
127136 tokio:: select! {
@@ -143,33 +152,46 @@ impl HealthCheckActor {
143152 }
144153}
145154
/// The name of the health manager actor, used as a prefix in its log messages.
pub const HEALTH_MANAGER_ACTOR: &str = "HealthManagerActor";
148157
149- ///
158+ /// A struct managing the health checks of the nodes.
159+ /// It receives the fetched nodes from the `NodesFetchActor` and starts the health checks for them.
160+ /// It also receives the health status of the nodes from the `HealthCheckActor/s` and updates the routing snapshot.
150161pub struct HealthManagerActor < S > {
162+ /// The health checker.
151163 checker : Arc < dyn HealthCheck > ,
164+ /// The period of the health check.
152165 period : Duration ,
153- nodes_snapshot : GlobalShared < S > ,
166+ /// The routing snapshot, storing the nodes.
167+ routing_snapshot : AtomicSwap < S > ,
168+ /// The receiver channel to listen to the fetched nodes messages.
154169 fetch_receiver : ReceiverWatch < FetchedNodes > ,
170+ /// The sender channel to send the health status of the nodes back to HealthManagerActor.
155171 check_sender : SenderMpsc < NodeHealthState > ,
172+ /// The receiver channel to receive the health status of the nodes from the `HealthCheckActor/s`.
156173 check_receiver : ReceiverMpsc < NodeHealthState > ,
174+ /// The sender channel to send the initialization status to DynamicRouteProvider (used only once in the init phase).
157175 init_sender : SenderMpsc < bool > ,
176+ /// The cancellation token of the actor.
158177 token : CancellationToken ,
178+ /// The cancellation token for all the health checks.
159179 nodes_token : CancellationToken ,
180+ /// The task tracker of the health checks, waiting for the tasks to exit (graceful termination).
160181 nodes_tracker : TaskTracker ,
182+ /// The flag indicating if this actor is initialized with healthy nodes.
161183 is_initialized : bool ,
162184}
163185
164186impl < S > HealthManagerActor < S >
165187where
166188 S : RoutingSnapshot ,
167189{
168- ///
190+ /// Creates a new `HealthManagerActor` instance.
169191 pub fn new (
170192 checker : Arc < dyn HealthCheck > ,
171193 period : Duration ,
172- nodes_snapshot : GlobalShared < S > ,
194+ routing_snapshot : AtomicSwap < S > ,
173195 fetch_receiver : ReceiverWatch < FetchedNodes > ,
174196 init_sender : SenderMpsc < bool > ,
175197 token : CancellationToken ,
@@ -179,7 +201,7 @@ where
179201 Self {
180202 checker,
181203 period,
182- nodes_snapshot ,
204+ routing_snapshot ,
183205 fetch_receiver,
184206 check_sender,
185207 check_receiver,
@@ -191,11 +213,11 @@ where
191213 }
192214 }
193215
194- ///
216+ /// Runs the actor.
195217 pub async fn run ( mut self ) {
196218 loop {
197219 tokio:: select! {
198- // Check if a new array of fetched nodes appeared in the channel from NodesFetchService .
220+ // Process a new array of fetched nodes from NodesFetchActor, if it appeared in the channel.
199221 result = self . fetch_receiver. changed( ) => {
200222 if let Err ( err) = result {
201223 error!( "{HEALTH_MANAGER_ACTOR}: nodes fetch sender has been dropped: {err:?}" ) ;
@@ -206,7 +228,7 @@ where
206228 let Some ( FetchedNodes { nodes } ) = self . fetch_receiver. borrow_and_update( ) . clone( ) else { continue } ;
207229 self . handle_fetch_update( nodes) . await ;
208230 }
209- // Receive health check messages from all running NodeHealthChecker /s.
231+ // Receive health check messages from all running HealthCheckActor /s.
210232 Some ( msg) = self . check_receiver. recv( ) => {
211233 self . handle_health_update( msg) . await ;
212234 }
@@ -221,13 +243,13 @@ where
221243 }
222244
223245 async fn handle_health_update ( & mut self , msg : NodeHealthState ) {
224- let current_snapshot = self . nodes_snapshot . load_full ( ) ;
246+ let current_snapshot = self . routing_snapshot . load_full ( ) ;
225247 let mut new_snapshot = ( * current_snapshot) . clone ( ) ;
226248 if let Err ( err) = new_snapshot. update_node ( & msg. node , msg. health . clone ( ) ) {
227249 error ! ( "{HEALTH_MANAGER_ACTOR}: failed to update snapshot: {err:?}" ) ;
228250 return ;
229251 }
230- self . nodes_snapshot . store ( Arc :: new ( new_snapshot) ) ;
252+ self . routing_snapshot . store ( Arc :: new ( new_snapshot) ) ;
231253 if !self . is_initialized && msg. health . is_healthy ( ) {
232254 self . is_initialized = true ;
233255 // If TIMEOUT_AWAIT_HEALTHY_SEED has been exceeded, the receiver was dropped and send would thus fail. We ignore the failure.
@@ -244,11 +266,11 @@ where
244266 return ;
245267 }
246268 debug ! ( "{HEALTH_MANAGER_ACTOR}: fetched nodes received {:?}" , nodes) ;
247- let current_snapshot = self . nodes_snapshot . load_full ( ) ;
269+ let current_snapshot = self . routing_snapshot . load_full ( ) ;
248270 let mut new_snapshot = ( * current_snapshot) . clone ( ) ;
249271 // If the snapshot has changed, store it and restart all node's health checks.
250272 if let Ok ( true ) = new_snapshot. sync_nodes ( & nodes) {
251- self . nodes_snapshot . store ( Arc :: new ( new_snapshot) ) ;
273+ self . routing_snapshot . store ( Arc :: new ( new_snapshot) ) ;
252274 self . stop_all_checks ( ) . await ;
253275 self . start_checks ( nodes. to_vec ( ) ) ;
254276 }
0 commit comments