Skip to content

Commit 8bfe51b

Browse files
chore: add DynamicRouteProviderBuilder
1 parent 32bc62e commit 8bfe51b

File tree

1 file changed

+124
-95
lines changed

1 file changed

+124
-95
lines changed

ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs

Lines changed: 124 additions & 95 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@ use arc_swap::ArcSwap;
1010
use candid::Principal;
1111
use reqwest::Client;
1212
use tokio::{
13+
runtime::Handle,
1314
sync::{mpsc, watch},
1415
time::timeout,
1516
};
@@ -71,24 +72,19 @@ pub struct DynamicRouteProvider<S> {
7172
token: CancellationToken,
7273
}
7374

74-
impl<S> RouteProvider for DynamicRouteProvider<S>
75-
where
76-
S: RoutingSnapshot + 'static,
77-
{
78-
fn route(&self) -> Result<Url, AgentError> {
79-
let snapshot = self.routing_snapshot.load();
80-
let node = snapshot.next().ok_or_else(|| {
81-
AgentError::RouteProviderError("No healthy API nodes found.".to_string())
82-
})?;
83-
Ok(node.to_routing_url())
84-
}
75+
/// A builder for the `DynamicRouteProvider`.
76+
pub struct DynamicRouteProviderBuilder<S> {
77+
fetcher: Arc<dyn Fetch>,
78+
fetch_period: Duration,
79+
fetch_retry_interval: Duration,
80+
checker: Arc<dyn HealthCheck>,
81+
check_period: Duration,
82+
routing_snapshot: AtomicSwap<S>,
83+
seeds: Vec<Node>,
8584
}
8685

87-
impl<S> DynamicRouteProvider<S>
88-
where
89-
S: RoutingSnapshot + 'static,
90-
{
91-
/// Creates a new instance of `DynamicRouteProvider`.
86+
impl<S> DynamicRouteProviderBuilder<S> {
87+
/// Creates a new instance of the builder.
9288
pub fn new(snapshot: S, seeds: Vec<Node>, http_client: Client) -> Self {
9389
let fetcher = Arc::new(NodesFetcher::new(
9490
http_client.clone(),
@@ -103,35 +99,75 @@ where
10399
check_period: HEALTH_CHECK_PERIOD,
104100
seeds,
105101
routing_snapshot: Arc::new(ArcSwap::from_pointee(snapshot)),
106-
tracker: TaskTracker::new(),
107-
token: CancellationToken::new(),
108102
}
109103
}
110104

111-
/// Sets the fetcher for fetching the latest nodes topology.
105+
/// Sets the fetcher of the nodes in the topology.
112106
pub fn with_fetcher(mut self, fetcher: Arc<dyn Fetch>) -> Self {
113107
self.fetcher = fetcher;
114108
self
115109
}
116110

117-
/// Sets the periodicity of fetching the latest nodes topology.
111+
/// Sets the fetching periodicity.
118112
pub fn with_fetch_period(mut self, period: Duration) -> Self {
119113
self.fetch_period = period;
120114
self
121115
}
122116

123-
/// Sets the interval for retrying fetching the nodes in case of error.
117+
/// Sets the node health checker.
124118
pub fn with_checker(mut self, checker: Arc<dyn HealthCheck>) -> Self {
125119
self.checker = checker;
126120
self
127121
}
128122

129-
/// Sets the periodicity of checking the health of the nodes.
123+
/// Sets the periodicity of node health checking.
130124
pub fn with_check_period(mut self, period: Duration) -> Self {
131125
self.check_period = period;
132126
self
133127
}
134128

129+
/// Builds an instance of the `DynamicRouteProvider`.
130+
pub async fn build(self) -> DynamicRouteProvider<S>
131+
where
132+
S: RoutingSnapshot + 'static,
133+
{
134+
let route_provider = DynamicRouteProvider {
135+
fetcher: self.fetcher,
136+
fetch_period: self.fetch_period,
137+
fetch_retry_interval: self.fetch_retry_interval,
138+
checker: self.checker,
139+
check_period: self.check_period,
140+
routing_snapshot: self.routing_snapshot,
141+
tracker: TaskTracker::new(),
142+
seeds: self.seeds,
143+
token: CancellationToken::new(),
144+
};
145+
146+
if let Err(err) = route_provider.run().await {
147+
error!("{DYNAMIC_ROUTE_PROVIDER}: started in unhealthy state: {err:?}");
148+
}
149+
150+
route_provider
151+
}
152+
}
153+
154+
impl<S> RouteProvider for DynamicRouteProvider<S>
155+
where
156+
S: RoutingSnapshot + 'static,
157+
{
158+
fn route(&self) -> Result<Url, AgentError> {
159+
let snapshot = self.routing_snapshot.load();
160+
let node = snapshot.next().ok_or_else(|| {
161+
AgentError::RouteProviderError("No healthy API nodes found.".to_string())
162+
})?;
163+
Ok(node.to_routing_url())
164+
}
165+
}
166+
167+
impl<S> DynamicRouteProvider<S>
168+
where
169+
S: RoutingSnapshot + 'static,
170+
{
135171
/// Starts two background tasks:
136172
/// - Task1: NodesFetchActor
137173
/// - Periodically fetches existing API nodes (gets latest nodes topology) and sends discovered nodes to HealthManagerActor.
@@ -202,16 +238,26 @@ where
202238
);
203239

204240
(found_healthy_seeds).then_some(()).ok_or(anyhow!(
205-
"No healthy seeds found, they may become healthy later ..."
241+
"No healthy seeds found within {TIMEOUT_AWAIT_HEALTHY_SEED:?}, they may become healthy later ..."
206242
))
207243
}
244+
}
208245

209-
/// Kill all running tasks.
210-
pub async fn stop(&self) {
246+
// Gracefully stop the inner spawned tasks running in the background.
247+
impl<S> Drop for DynamicRouteProvider<S> {
248+
fn drop(&mut self) {
211249
self.token.cancel();
212250
self.tracker.close();
213-
self.tracker.wait().await;
214-
warn!("{DYNAMIC_ROUTE_PROVIDER}: gracefully stopped");
251+
let tracker = self.tracker.clone();
252+
// Awaiting task completion requires a runtime; if none is available, only an error is logged.
253+
if let Ok(handle) = Handle::try_current() {
254+
let _ = handle.spawn(async move {
255+
tracker.wait().await;
256+
warn!("{DYNAMIC_ROUTE_PROVIDER}: stopped gracefully");
257+
});
258+
} else {
259+
error!("{DYNAMIC_ROUTE_PROVIDER}: no runtime available, cannot stop the spawned tasks");
260+
}
215261
}
216262
}
217263

@@ -228,7 +274,7 @@ mod tests {
228274
use crate::{
229275
agent::http_transport::{
230276
dynamic_routing::{
231-
dynamic_route_provider::{DynamicRouteProvider, IC0_SEED_DOMAIN},
277+
dynamic_route_provider::{DynamicRouteProviderBuilder, IC0_SEED_DOMAIN},
232278
node::Node,
233279
snapshot::round_robin_routing::RoundRobinRoutingSnapshot,
234280
test_utils::{
@@ -266,14 +312,15 @@ mod tests {
266312
// Configure RouteProvider
267313
let snapshot = RoundRobinRoutingSnapshot::new();
268314
let client = Client::builder().build().unwrap();
269-
let route_provider = Arc::new(
270-
DynamicRouteProvider::new(snapshot, vec![node_1.clone()], client)
315+
let route_provider =
316+
DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client)
271317
.with_fetcher(fetcher.clone())
272318
.with_checker(checker.clone())
273319
.with_fetch_period(fetch_interval)
274-
.with_check_period(check_interval),
275-
);
276-
route_provider.run().await.expect("no healthy seeds found");
320+
.with_check_period(check_interval)
321+
.build()
322+
.await;
323+
let route_provider = Arc::new(route_provider);
277324

278325
// This time span is required for the snapshot to be fully updated with the new nodes and their health info.
279326
let snapshot_update_duration = fetch_interval + 2 * check_interval;
@@ -341,9 +388,6 @@ mod tests {
341388
tokio::time::sleep(snapshot_update_duration).await;
342389
let routed_domains = route_n_times(3, Arc::clone(&route_provider));
343390
assert_routed_domains(routed_domains, vec![node_2.domain()], 3);
344-
345-
// Teardown.
346-
route_provider.stop().await;
347391
}
348392

349393
#[tokio::test]
@@ -364,19 +408,18 @@ mod tests {
364408
// Configure RouteProvider
365409
let snapshot = RoundRobinRoutingSnapshot::new();
366410
let client = Client::builder().build().unwrap();
367-
let route_provider = Arc::new(
368-
DynamicRouteProvider::new(snapshot, vec![node_1.clone(), node_2.clone()], client)
369-
.with_fetcher(fetcher.clone())
370-
.with_checker(checker.clone())
371-
.with_fetch_period(fetch_interval)
372-
.with_check_period(check_interval),
373-
);
374-
assert!(route_provider
375-
.run()
376-
.await
377-
.unwrap_err()
378-
.to_string()
379-
.contains("No healthy seeds found, they may become healthy later ..."));
411+
let route_provider = DynamicRouteProviderBuilder::new(
412+
snapshot,
413+
vec![node_1.clone(), node_2.clone()],
414+
client,
415+
)
416+
.with_fetcher(fetcher)
417+
.with_checker(checker.clone())
418+
.with_fetch_period(fetch_interval)
419+
.with_check_period(check_interval)
420+
.build()
421+
.await;
422+
let route_provider = Arc::new(route_provider);
380423

381424
// Test 1: calls to route() return an error, as no healthy seeds exist.
382425
for _ in 0..4 {
@@ -393,9 +436,6 @@ mod tests {
393436
tokio::time::sleep(3 * check_interval).await;
394437
let routed_domains = route_n_times(6, Arc::clone(&route_provider));
395438
assert_routed_domains(routed_domains, vec![node_1.domain(), node_2.domain()], 3);
396-
397-
// Teardown.
398-
route_provider.stop().await;
399439
}
400440

401441
#[tokio::test]
@@ -415,14 +455,15 @@ mod tests {
415455
// Configure RouteProvider
416456
let snapshot = RoundRobinRoutingSnapshot::new();
417457
let client = Client::builder().build().unwrap();
418-
let route_provider = Arc::new(
419-
DynamicRouteProvider::new(snapshot, vec![node_1.clone()], client)
420-
.with_fetcher(fetcher.clone())
458+
let route_provider =
459+
DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client)
460+
.with_fetcher(fetcher)
421461
.with_checker(checker.clone())
422462
.with_fetch_period(fetch_interval)
423-
.with_check_period(check_interval),
424-
);
425-
route_provider.run().await.expect("no healthy seeds found");
463+
.with_check_period(check_interval)
464+
.build()
465+
.await;
466+
let route_provider = Arc::new(route_provider);
426467

427468
// Test 1: multiple route() calls return a single domain=ic0.app, as the seed is healthy.
428469
tokio::time::sleep(2 * check_interval).await;
@@ -439,9 +480,6 @@ mod tests {
439480
AgentError::RouteProviderError("No healthy API nodes found.".to_string())
440481
);
441482
}
442-
443-
// Teardown.
444-
route_provider.stop().await;
445483
}
446484

447485
#[tokio::test]
@@ -461,19 +499,14 @@ mod tests {
461499
// Configure RouteProvider
462500
let snapshot = RoundRobinRoutingSnapshot::new();
463501
let client = Client::builder().build().unwrap();
464-
let route_provider = Arc::new(
465-
DynamicRouteProvider::new(snapshot, vec![node_1.clone()], client)
466-
.with_fetcher(fetcher.clone())
467-
.with_checker(checker.clone())
502+
let route_provider =
503+
DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client)
504+
.with_fetcher(fetcher)
505+
.with_checker(checker)
468506
.with_fetch_period(fetch_interval)
469-
.with_check_period(check_interval),
470-
);
471-
assert!(route_provider
472-
.run()
473-
.await
474-
.unwrap_err()
475-
.to_string()
476-
.contains("No healthy seeds found, they may become healthy later ..."));
507+
.with_check_period(check_interval)
508+
.build()
509+
.await;
477510

478511
// Test: calls to route() return an error, as no healthy seeds exist.
479512
for _ in 0..4 {
@@ -484,9 +517,6 @@ mod tests {
484517
AgentError::RouteProviderError("No healthy API nodes found.".to_string())
485518
);
486519
}
487-
488-
// Teardown.
489-
route_provider.stop().await;
490520
}
491521

492522
#[tokio::test]
@@ -507,14 +537,18 @@ mod tests {
507537
// Configure RouteProvider
508538
let snapshot = RoundRobinRoutingSnapshot::new();
509539
let client = Client::builder().build().unwrap();
510-
let route_provider = Arc::new(
511-
DynamicRouteProvider::new(snapshot, vec![node_1.clone(), node_2.clone()], client)
512-
.with_fetcher(fetcher.clone())
513-
.with_checker(checker.clone())
514-
.with_fetch_period(fetch_interval)
515-
.with_check_period(check_interval),
516-
);
517-
route_provider.run().await.expect("no healthy seeds found");
540+
let route_provider = DynamicRouteProviderBuilder::new(
541+
snapshot,
542+
vec![node_1.clone(), node_2.clone()],
543+
client,
544+
)
545+
.with_fetcher(fetcher)
546+
.with_checker(checker.clone())
547+
.with_fetch_period(fetch_interval)
548+
.with_check_period(check_interval)
549+
.build()
550+
.await;
551+
let route_provider = Arc::new(route_provider);
518552

519553
// Test 1: calls to route() return only a healthy seed ic0.app.
520554
let routed_domains = route_n_times(3, Arc::clone(&route_provider));
@@ -525,9 +559,6 @@ mod tests {
525559
tokio::time::sleep(2 * check_interval).await;
526560
let routed_domains = route_n_times(6, Arc::clone(&route_provider));
527561
assert_routed_domains(routed_domains, vec![node_1.domain(), node_2.domain()], 3);
528-
529-
// Teardown.
530-
route_provider.stop().await;
531562
}
532563

533564
#[tokio::test]
@@ -548,14 +579,15 @@ mod tests {
548579
// Configure RouteProvider
549580
let snapshot = RoundRobinRoutingSnapshot::new();
550581
let client = Client::builder().build().unwrap();
551-
let route_provider = Arc::new(
552-
DynamicRouteProvider::new(snapshot, vec![node_1.clone()], client)
582+
let route_provider =
583+
DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client)
553584
.with_fetcher(fetcher.clone())
554585
.with_checker(checker.clone())
555586
.with_fetch_period(fetch_interval)
556-
.with_check_period(check_interval),
557-
);
558-
route_provider.run().await.expect("no healthy seeds found");
587+
.with_check_period(check_interval)
588+
.build()
589+
.await;
590+
let route_provider = Arc::new(route_provider);
559591

560592
// This time span is required for the snapshot to be fully updated with the new nodes topology and health info.
561593
let snapshot_update_duration = fetch_interval + 2 * check_interval;
@@ -579,8 +611,5 @@ mod tests {
579611
vec![node_1.domain(), node_2.domain(), node_3.domain()],
580612
2,
581613
);
582-
583-
// Teardown.
584-
route_provider.stop().await;
585614
}
586615
}

0 commit comments

Comments
 (0)