Skip to content

Commit 4abab20

Browse files
authored
refactor: Update inhibited instance removal logic (#1548)
1 parent 250ed73 commit 4abab20

File tree

1 file changed

+14
-14
lines changed

1 file changed

+14
-14
lines changed

lib/runtime/src/component/client.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub struct Client {
5858
// These are the remotes I know about from watching etcd
5959
pub instance_source: Arc<InstanceSource>,
6060
// These are the instances that are reported as down from sending rpc
61-
instance_inhibited: Arc<Mutex<HashMap<i64, u64>>>,
61+
instance_inhibited: Arc<Mutex<HashMap<i64, std::time::Instant>>>,
6262
}
6363

6464
#[derive(Clone, Debug)]
@@ -138,42 +138,42 @@ impl Client {
138138
pub async fn instances_avail(&self) -> Vec<Instance> {
139139
// TODO: Can we get the remaining TTL from the lease for the instance?
140140
const ETCD_LEASE_TTL: u64 = 10; // seconds
141-
let now = std::time::SystemTime::now()
142-
.duration_since(std::time::UNIX_EPOCH)
143-
.unwrap()
144-
.as_secs();
141+
let now = std::time::Instant::now();
145142

146143
let instances = self.instances();
147144
let mut inhibited = self.instance_inhibited.lock().await;
148145

149-
instances
146+
// 1. Remove inhibited instances that are no longer in `self.instances()`
147+
// 2. Remove inhibited instances that have expired
148+
// 3. Only return instances that are not inhibited after removals
149+
let mut new_inhibited = HashMap::<i64, std::time::Instant>::new();
150+
let filtered = instances
150151
.into_iter()
151152
.filter_map(|instance| {
152153
let id = instance.id();
153154
if let Some(&timestamp) = inhibited.get(&id) {
154-
// If the inhibition is stale, remove it and include the instance
155-
if now.saturating_sub(timestamp) > ETCD_LEASE_TTL {
155+
if now.duration_since(timestamp).as_secs() > ETCD_LEASE_TTL {
156156
tracing::debug!("instance {id} stale inhibition");
157-
inhibited.remove(&id);
158157
Some(instance)
159158
} else {
160159
tracing::debug!("instance {id} is inhibited");
160+
new_inhibited.insert(id, timestamp);
161161
None
162162
}
163163
} else {
164164
tracing::debug!("instance {id} not inhibited");
165165
Some(instance)
166166
}
167167
})
168-
.collect()
168+
.collect();
169+
170+
*inhibited = new_inhibited;
171+
filtered
169172
}
170173

171174
/// Mark an instance as down/unavailable
172175
pub async fn report_instance_down(&self, instance_id: i64) {
173-
let now = std::time::SystemTime::now()
174-
.duration_since(std::time::UNIX_EPOCH)
175-
.unwrap()
176-
.as_secs();
176+
let now = std::time::Instant::now();
177177

178178
let mut inhibited = self.instance_inhibited.lock().await;
179179
inhibited.insert(instance_id, now);

0 commit comments

Comments
 (0)