Skip to content

Commit 993c127

Browse files
committed
Improve client tracking logic on downed instance
1 parent 7c9c52f commit 993c127

File tree

1 file changed

+51
-88
lines changed

1 file changed

+51
-88
lines changed

lib/runtime/src/component/client.rs

Lines changed: 51 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,8 @@ use crate::pipeline::{
66
SingleIn,
77
};
88
use arc_swap::ArcSwap;
9-
use rand::Rng;
109
use std::collections::HashMap;
11-
use std::sync::RwLock;
12-
use std::sync::{
13-
atomic::{AtomicU64, Ordering},
14-
Arc, Mutex,
15-
};
16-
use std::time::Instant;
10+
use std::sync::Arc;
1711
use tokio::net::unix::pipe::Receiver;
1812

1913
use crate::{
@@ -48,10 +42,8 @@ pub struct Client {
4842
pub endpoint: Endpoint,
4943
// These are the remotes I know about from watching etcd
5044
pub instance_source: Arc<InstanceSource>,
51-
// These are the instances that are reported as down from sending rpc
52-
instance_inhibited: Arc<Mutex<HashMap<i64, Instant>>>,
53-
// The current active IDs
54-
instance_cache: Arc<ArcSwap<Vec<i64>>>,
45+
// These are the instance source ids less those reported as down from sending rpc
46+
instance_avail: Arc<ArcSwap<Vec<i64>>>,
5547
}
5648

5749
#[derive(Clone, Debug)]
@@ -60,16 +52,13 @@ pub enum InstanceSource {
6052
Dynamic(tokio::sync::watch::Receiver<Vec<Instance>>),
6153
}
6254

63-
// TODO: Avoid returning a full clone of `Vec<Instance>` everytime from Client
64-
// See instances() and instances_avail() methods
6555
impl Client {
6656
// Client will only talk to a single static endpoint
6757
pub(crate) async fn new_static(endpoint: Endpoint) -> Result<Self> {
6858
Ok(Client {
6959
endpoint,
7060
instance_source: Arc::new(InstanceSource::Static),
71-
instance_inhibited: Arc::new(Mutex::new(HashMap::new())),
72-
instance_cache: Arc::new(ArcSwap::from(Arc::new(vec![]))),
61+
instance_avail: Arc::new(ArcSwap::from(Arc::new(vec![]))),
7362
})
7463
}
7564

@@ -85,26 +74,12 @@ impl Client {
8574
let instance_source =
8675
Self::get_or_create_dynamic_instance_source(etcd_client, &endpoint).await?;
8776

88-
let cancel_token = endpoint.drt().primary_token();
8977
let client = Client {
9078
endpoint,
9179
instance_source,
92-
instance_inhibited: Arc::new(Mutex::new(HashMap::new())),
93-
instance_cache: Arc::new(ArcSwap::from(Arc::new(vec![]))),
80+
instance_avail: Arc::new(ArcSwap::from(Arc::new(vec![]))),
9481
};
95-
96-
let instance_source_c = client.instance_source.clone();
97-
let instance_inhibited_c = Arc::clone(&client.instance_inhibited);
98-
let instance_cache_c = Arc::clone(&client.instance_cache);
99-
tokio::task::spawn(async move {
100-
while !cancel_token.is_cancelled() {
101-
refresh_instances(&instance_source_c, &instance_inhibited_c, &instance_cache_c);
102-
tokio::select! {
103-
_ = cancel_token.cancelled() => {}
104-
_ = tokio::time::sleep(INSTANCE_REFRESH_PERIOD) => {}
105-
}
106-
}
107-
});
82+
client.monitor_instance_source();
10883
Ok(client)
10984
}
11085

@@ -119,13 +94,20 @@ impl Client {
11994

12095
/// Instances available from watching etcd
12196
pub fn instances(&self) -> Vec<Instance> {
122-
instances_inner(self.instance_source.as_ref())
97+
match self.instance_source.as_ref() {
98+
InstanceSource::Static => vec![],
99+
InstanceSource::Dynamic(watch_rx) => watch_rx.borrow().clone(),
100+
}
123101
}
124102

125103
pub fn instance_ids(&self) -> Vec<i64> {
126104
self.instances().into_iter().map(|ep| ep.id()).collect()
127105
}
128106

107+
pub fn instance_ids_avail(&self) -> arc_swap::Guard<Arc<Vec<i64>>> {
108+
self.instance_avail.load()
109+
}
110+
129111
/// Wait for at least one Instance to be available for this Endpoint
130112
pub async fn wait_for_instances(&self) -> Result<Vec<Instance>> {
131113
let mut instances: Vec<Instance> = vec![];
@@ -143,24 +125,51 @@ impl Client {
143125
Ok(instances)
144126
}
145127

146-
/// Instances available from watching etcd minus those reported as down
147-
pub fn instance_ids_avail(&self) -> arc_swap::Guard<Arc<Vec<i64>>> {
148-
self.instance_cache.load()
128+
/// Is this component know at startup and not discovered via etcd?
129+
pub fn is_static(&self) -> bool {
130+
matches!(self.instance_source.as_ref(), InstanceSource::Static)
149131
}
150132

151133
/// Mark an instance as down/unavailable
152134
pub fn report_instance_down(&self, instance_id: i64) {
153-
self.instance_inhibited
154-
.lock()
155-
.unwrap()
156-
.insert(instance_id, Instant::now());
135+
let filtered = self
136+
.instance_ids_avail()
137+
.iter()
138+
.filter_map(|&id| if id == instance_id { None } else { Some(id) })
139+
.collect::<Vec<_>>();
140+
self.instance_avail.store(Arc::new(filtered));
157141

158142
tracing::debug!("inhibiting instance {instance_id}");
159143
}
160144

161-
/// Is this component know at startup and not discovered via etcd?
162-
pub fn is_static(&self) -> bool {
163-
matches!(self.instance_source.as_ref(), InstanceSource::Static)
145+
/// Monitor the ETCD instance source and update instance_avail.
146+
fn monitor_instance_source(&self) {
147+
let cancel_token = self.endpoint.drt().primary_token();
148+
let client = self.clone();
149+
tokio::task::spawn(async move {
150+
let mut rx = match client.instance_source.as_ref() {
151+
InstanceSource::Static => {
152+
tracing::error!("Static instance source is not watchable");
153+
return;
154+
}
155+
InstanceSource::Dynamic(rx) => rx.clone(),
156+
};
157+
while !cancel_token.is_cancelled() {
158+
let instance_ids: Vec<i64> = rx
159+
.borrow_and_update()
160+
.iter()
161+
.map(|instance| instance.id())
162+
.collect();
163+
client.instance_avail.store(Arc::new(instance_ids));
164+
165+
tracing::debug!("instance source updated");
166+
167+
if let Err(err) = rx.changed().await {
168+
tracing::error!("The Sender is dropped: {}", err);
169+
cancel_token.cancel();
170+
}
171+
}
172+
});
164173
}
165174

166175
async fn get_or_create_dynamic_instance_source(
@@ -253,49 +262,3 @@ impl Client {
253262
Ok(instance_source)
254263
}
255264
}
256-
257-
/// Update the instance id cache
258-
fn refresh_instances(
259-
instance_source: &InstanceSource,
260-
instance_inhibited: &Arc<Mutex<HashMap<i64, Instant>>>,
261-
instance_cache: &Arc<ArcSwap<Vec<i64>>>,
262-
) {
263-
const ETCD_LEASE_TTL: u64 = 10; // seconds
264-
265-
// TODO: Can we get the remaining TTL from the lease for the instance?
266-
let now = Instant::now();
267-
268-
let instances = instances_inner(instance_source);
269-
let mut inhibited = instance_inhibited.lock().unwrap();
270-
271-
// 1. Remove inhibited instances that are no longer in `self.instances()`
272-
// 2. Remove inhibited instances that have expired
273-
// 3. Only return instances that are not inhibited after removals
274-
let mut new_inhibited = HashMap::<i64, Instant>::new();
275-
let filtered: Vec<i64> = instances
276-
.into_iter()
277-
.filter_map(|instance| {
278-
let id = instance.id();
279-
if let Some(&timestamp) = inhibited.get(&id) {
280-
if now.duration_since(timestamp).as_secs() > ETCD_LEASE_TTL {
281-
Some(id)
282-
} else {
283-
new_inhibited.insert(id, timestamp);
284-
None
285-
}
286-
} else {
287-
Some(id)
288-
}
289-
})
290-
.collect();
291-
292-
*inhibited = new_inhibited;
293-
instance_cache.store(Arc::new(filtered));
294-
}
295-
296-
fn instances_inner(instance_source: &InstanceSource) -> Vec<Instance> {
297-
match instance_source {
298-
InstanceSource::Static => vec![],
299-
InstanceSource::Dynamic(watch_rx) => watch_rx.borrow().clone(),
300-
}
301-
}

0 commit comments

Comments
 (0)