Skip to content

Commit

Permalink
fix(rest): make readiness check atomic
Browse files Browse the repository at this point in the history
The readiness check API depends on the agent-core service's Node gRPC endpoint.
The current RwLock lets multiple tokio threads probe the agent-core. This is not
ideal. This change makes the get operation atomic, so that agent-core state cache
updates could be performed by only a single thread, and the updated cache should
feed the next few ready calls.

Signed-off-by: Niladri Halder <niladri.halder26@gmail.com>
  • Loading branch information
niladrih committed Dec 9, 2024
1 parent c7e1cbb commit 8013e2d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 22 deletions.
1 change: 1 addition & 0 deletions control-plane/rest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ rustls = { version = "0.23.19", default-features = false }
rustls-pemfile = "2.2.0"
actix-web = { version = "4.9.0", features = ["rustls-0_23"] }
actix-service = "2.0.2"
tokio = { version = "1.41.0", features = ["sync"] }
opentelemetry = { version = "0.26.0" }
tracing-actix-web = { version = "0.7.14", features = ["opentelemetry_0_26"] }
tracing = "0.1.40"
Expand Down
40 changes: 18 additions & 22 deletions control-plane/rest/service/src/health/core_state.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use crate::v0::core_grpc;
use grpc::operations::node::traits::NodeOperations;
use std::{
sync::RwLock,
time::{Duration, Instant},
};
use std::time::{Duration, Instant};
use tokio::sync::Mutex;

/// This is a type to cache the liveness of the agent-core service.
/// This is meant to be wrapped inside an Arc and used across threads.
pub struct CachedCoreState {
state: RwLock<ServerState>,
state: Mutex<ServerState>,
cache_duration: Duration,
}

Expand All @@ -18,13 +16,23 @@ struct ServerState {
last_updated: Instant,
}

impl ServerState {
/// Update the state of the agent-core service, or assume it's unavailable if something
/// went wrong.
async fn update_or_assume_unavailable(&mut self) {
let new_value = core_grpc().node().probe(None).await.unwrap_or(false);
self.is_live = new_value;
self.last_updated = Instant::now();
}
}

impl CachedCoreState {
/// Create a new cache for serving readiness health checks based on agent-core health.
pub async fn new(cache_duration: Duration) -> Self {
let agent_core_is_live = core_grpc().node().probe(None).await.unwrap_or(false);

CachedCoreState {
state: RwLock::new(ServerState {
state: Mutex::new(ServerState {
is_live: agent_core_is_live,
last_updated: Instant::now(),
}),
Expand All @@ -35,24 +43,12 @@ impl CachedCoreState {
/// Get the cached state of the agent-core service, or assume it's unavailable if something
/// went wrong.
pub async fn get_or_assume_unavailable(&self) -> bool {
let should_update = {
let state = self.state.read().unwrap();
state.last_updated.elapsed() >= self.cache_duration
};
let mut state = self.state.lock().await;

if should_update {
self.update_or_assume_unavailable().await;
if state.last_updated.elapsed() >= self.cache_duration {
state.update_or_assume_unavailable().await;
}

self.state.read().unwrap().is_live
}

/// Update the state of the agent-core service, or assume it's unavailable if something
/// went wrong.
pub async fn update_or_assume_unavailable(&self) {
let new_value = core_grpc().node().probe(None).await.unwrap_or(false);
let mut state = self.state.write().unwrap();
state.is_live = new_value;
state.last_updated = Instant::now();
state.is_live
}
}

0 comments on commit 8013e2d

Please sign in to comment.