diff --git a/agent/src/rpc/synchronizer.rs b/agent/src/rpc/synchronizer.rs index d6b7db8be32..b797d3c730c 100644 --- a/agent/src/rpc/synchronizer.rs +++ b/agent/src/rpc/synchronizer.rs @@ -489,7 +489,7 @@ pub struct Synchronizer { pub agent_id: Arc>, pub status: Arc>, - agent_state: AgentState, + agent_state: Arc, session: Arc, // 策略模块和NPB带宽检测会用到 @@ -515,7 +515,7 @@ impl Synchronizer { pub fn new( runtime: Arc, session: Arc, - agent_state: AgentState, + agent_state: Arc, version_info: &'static VersionInfo, agent_id: AgentId, controller_ip: String, @@ -877,23 +877,13 @@ impl Synchronizer { status_guard.first = false; drop(status_guard); - let (agent_state, cvar) = &**agent_state; - if !user_config.global.common.enabled - || exception_handler.has(Exception::SystemLoadCircuitBreaker) - || exception_handler.has(Exception::FreeMemExceeded) - { - *agent_state.lock().unwrap() = - trident::State::Disabled(Some((user_config, resp.dynamic_config.unwrap()))); - } else { - *agent_state.lock().unwrap() = trident::State::ConfigChanged(ChangedConfig { - user_config, - blacklist, - vm_mac_addrs: macs, - gateway_vmac_addrs, - tap_types: resp.capture_network_types, - }); - } - cvar.notify_one(); + agent_state.update_config(ChangedConfig { + user_config, + blacklist, + vm_mac_addrs: macs, + gateway_vmac_addrs, + tap_types: resp.capture_network_types, + }); } fn grpc_failed_log(grpc_failed_count: &mut usize, detail: String) { @@ -1031,9 +1021,7 @@ impl Synchronizer { // channel closed Ok(None) => return, Err(_) => { - let (ts, cvar) = &*agent_state; - *ts.lock().unwrap() = trident::State::Disabled(None); - cvar.notify_one(); + agent_state.disable(); warn!("as max escape time expired, deepflow-agent restart..."); // 与控制器失联的时间超过设置的逃逸时间,这里直接重启主要有两个原因: // 1. 如果仅是停用系统无法回收全部的内存资源 @@ -1437,17 +1425,10 @@ impl Synchronizer { max_memory.store(user_config.global.limits.max_memory, Ordering::Relaxed); let new_sync_interval = user_config.global.communication.proactive_request_interval; - let (agent_state, cvar) = &*agent_state; - if !user_config.global.common.enabled { - *agent_state.lock().unwrap() = - trident::State::Disabled(Some((user_config, dynamic_config))); - } else { - *agent_state.lock().unwrap() = trident::State::ConfigChanged(ChangedConfig { - user_config, - ..Default::default() - }); - } - cvar.notify_one(); + agent_state.update_config(ChangedConfig { + user_config, + ..Default::default() + }); if sync_interval != new_sync_interval { sync_interval = new_sync_interval; @@ -1569,9 +1550,7 @@ impl Synchronizer { } else { match Self::upgrade(&running, &session, &revision, &id).await { Ok(_) => { - let (ts, cvar) = &*agent_state; - *ts.lock().unwrap() = trident::State::Terminated; - cvar.notify_one(); + agent_state.terminate(); warn!("agent upgrade is successful and restarts normally, deepflow-agent restart..."); crate::utils::notify_exit(NORMAL_EXIT_WITH_RESTART); return; diff --git a/agent/src/trident.rs b/agent/src/trident.rs index 37009f61545..62be2c3e90b 100644 --- a/agent/src/trident.rs +++ b/agent/src/trident.rs @@ -128,7 +128,7 @@ use public::{ buffer::BatchedBox, debug::QueueDebugger, packet::MiniPacket, - proto::agent::{self, DynamicConfig, Exception, PacketCaptureType, SocketType}, + proto::agent::{self, Exception, PacketCaptureType, SocketType}, queue::{self, DebugSender}, utils::net::{get_route_src_ip, Link, MacAddr}, LeakyBucket, @@ -156,20 +156,121 @@ pub enum RunningMode { Standalone, } -#[derive(Debug)] +#[derive(Copy, Clone, Debug)] +struct InnerState { + enabled: bool, + melted_down: bool, +} + +impl Default for InnerState { + fn default() -> Self { + Self { + enabled: false, + melted_down: true, + } + } +} + +impl From for State { + fn from(state: InnerState) -> Self { + if state.enabled && !state.melted_down { + State::Running + } else { + State::Disabled + } + } +} + +#[derive(Debug, PartialEq, Eq)] pub enum State { Running, - ConfigChanged(ChangedConfig), Terminated, - Disabled(Option<(UserConfig, DynamicConfig)>), // Requires user config and dynamic config to update platform config + Disabled, } -impl State { - fn unwrap_config(self) -> ChangedConfig { - match self { - Self::ConfigChanged(c) => c, - _ => panic!("{:?} not config type", &self), +#[derive(Default)] +pub struct AgentState { + // terminated is outside of Mutex because during termination, state will be locked in main thread, + // and the main thread will try to stop other threads, in which may lock and update agent state, + // causing a deadlock. Checking terminated state before locking inner state will avoid this deadlock. + terminated: AtomicBool, + state: Mutex<(InnerState, Option)>, + notifier: Condvar, +} + +impl AgentState { + pub fn enable(&self) { + if self.terminated.load(Ordering::Relaxed) { + return; + } + let mut sg = self.state.lock().unwrap(); + let old_state: State = sg.0.into(); + sg.0.enabled = true; + let new_state: State = sg.0.into(); + if old_state != new_state { + info!("Agent state changed from {old_state:?} to {new_state:?} (enabled: {} melted_down: {})", sg.0.enabled, sg.0.melted_down); + self.notifier.notify_one(); + } + } + + pub fn disable(&self) { + if self.terminated.load(Ordering::Relaxed) { + return; + } + let mut sg = self.state.lock().unwrap(); + let old_state: State = sg.0.into(); + sg.0.enabled = false; + let new_state: State = sg.0.into(); + if old_state != new_state { + info!("Agent state changed from {old_state:?} to {new_state:?} (enabled: {} melted_down: {})", sg.0.enabled, sg.0.melted_down); + self.notifier.notify_one(); + } + } + + pub fn melt_down(&self) { + if self.terminated.load(Ordering::Relaxed) { + return; + } + let mut sg = self.state.lock().unwrap(); + let old_state: State = sg.0.into(); + sg.0.melted_down = true; + let new_state: State = sg.0.into(); + if old_state != new_state { + info!("Agent state changed from {old_state:?} to {new_state:?} (enabled: {} melted_down: {})", sg.0.enabled, sg.0.melted_down); + self.notifier.notify_one(); + } + } + + pub fn recover(&self) { + if self.terminated.load(Ordering::Relaxed) { + return; + } + let mut sg = self.state.lock().unwrap(); + let old_state: State = sg.0.into(); + sg.0.melted_down = false; + let new_state: State = sg.0.into(); + if old_state != new_state { + info!("Agent state changed from {old_state:?} to {new_state:?} (enabled: {} melted_down: {})", sg.0.enabled, sg.0.melted_down); + self.notifier.notify_one(); + } + } + + pub fn terminate(&self) { + if !self.terminated.swap(true, Ordering::Relaxed) { + // log only the first time + info!("Agent state changed to {:?}", State::Terminated); + } + self.notifier.notify_one(); + } + + pub fn update_config(&self, config: ChangedConfig) { + if self.terminated.load(Ordering::Relaxed) { + return; } + let mut sg = self.state.lock().unwrap(); + sg.0.enabled = config.user_config.global.common.enabled; + sg.1.replace(config); + self.notifier.notify_one(); } } @@ -184,6 +285,21 @@ pub struct VersionInfo { pub revision: &'static str, } +impl VersionInfo { + pub fn brief_tag(&self) -> String { + format!( + "{}|{}|{}", + match self.name { + "deepflow-agent-ce" => "CE", + "deepflow-agent-ee" => "EE", + _ => panic!("{:?} unknown deepflow-agent edition", &self.name), + }, + self.branch, + self.commit_id + ) + } +} + impl fmt::Display for VersionInfo { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( @@ -211,8 +327,6 @@ CompileTime: {}", } } -pub type AgentState = Arc<(Mutex, Condvar)>; - #[derive(Clone, Debug)] pub struct AgentId { pub ip: IpAddr, @@ -281,7 +395,7 @@ impl SenderEncoder { } } pub struct Trident { - state: AgentState, + state: Arc, handle: Option>, #[cfg(target_os = "linux")] pid_file: Option, @@ -441,7 +555,7 @@ impl Trident { ); info!("static_config {:#?}", config); - let state = Arc::new((Mutex::new(State::Running), Condvar::new())); + let state = Arc::new(AgentState::default()); let state_thread = state.clone(); let config_path = match agent_mode { RunningMode::Managed => None, @@ -478,7 +592,7 @@ impl Trident { } fn run( - state: AgentState, + state: Arc, ctrl_ip: IpAddr, ctrl_mac: MacAddr, mut config_handler: ConfigHandler, @@ -491,6 +605,7 @@ impl Trident { ntp_diff: Arc, ) -> Result<()> { info!("==================== Launching DeepFlow-Agent ===================="); + info!("Brief tag: {}", version_info.brief_tag()); info!("Environment variables: {:?}", get_env()); if running_in_container() { @@ -660,6 +775,7 @@ impl Trident { let log_dir = log_dir.parent().unwrap().to_str().unwrap(); let guard = match Guard::new( config_handler.environment(), + state.clone(), log_dir.to_string(), exception_handler.clone(), cgroup_mount_path, @@ -678,7 +794,6 @@ impl Trident { return Err(anyhow!(e)); } }; - guard.start(); let monitor = Monitor::new( stats_collector.clone(), @@ -739,15 +854,36 @@ impl Trident { platform_synchronizer.start(); } - let (state, cond) = &*state; - let mut state_guard = state.lock().unwrap(); + let mut state_guard = state.state.lock().unwrap(); let mut components: Option = None; let mut first_run = true; + let mut config_initialized = false; loop { - match &mut *state_guard { - State::Running => { - state_guard = cond.wait(state_guard).unwrap(); + if state.terminated.load(Ordering::Relaxed) { + if let Some(mut c) = components { + c.stop(); + guard.stop(); + monitor.stop(); + domain_name_listener.stop(); + platform_synchronizer.stop(); + #[cfg(target_os = "linux")] + { + api_watcher.stop(); + libvirt_xml_extractor.stop(); + } + if let Some(cg_controller) = cgroups_controller { + if let Err(e) = cg_controller.stop() { + info!("stop cgroups controller failed, {:?}", e); + } + } + } + return Ok(()); + } + + match State::from(state_guard.0) { + State::Running if state_guard.1.is_none() => { + state_guard = state.notifier.wait(state_guard).unwrap(); #[cfg(target_os = "linux")] if config_handler .candidate_config @@ -758,36 +894,19 @@ impl Trident { } else { api_watcher.stop(); } - continue; - } - State::Terminated => { - if let Some(mut c) = components { - c.stop(); - guard.stop(); - monitor.stop(); - domain_name_listener.stop(); - platform_synchronizer.stop(); - #[cfg(target_os = "linux")] - { - api_watcher.stop(); - libvirt_xml_extractor.stop(); - } - if let Some(cg_controller) = cgroups_controller { - if let Err(e) = cg_controller.stop() { - info!("stop cgroups controller failed, {:?}", e); - } - } + if let Some(ref mut c) = components { + c.start(); } - return Ok(()); + continue; } - State::Disabled(config) => { + State::Disabled => { if let Some(ref mut c) = components { c.stop(); } - if let Some(c) = config.take() { + if let Some(cfg) = state_guard.1.take() { let agent_id = synchronizer.agent_id.read().clone(); let callbacks = config_handler.on_config( - c.0, + cfg.user_config, &exception_handler, None, #[cfg(target_os = "linux")] @@ -825,15 +944,19 @@ impl Trident { stats_collector .set_min_interval(config_handler.candidate_config.stats.interval); } + + if !config_initialized { + // start guard on receiving first config to ensure + // the meltdown thresholds are set by the config + guard.start(); + config_initialized = true; + } } - state_guard = cond.wait(state_guard).unwrap(); + state_guard = state.notifier.wait(state_guard).unwrap(); continue; } _ => (), } - let mut new_state = State::Running; - mem::swap(&mut new_state, &mut *state_guard); - mem::drop(state_guard); let ChangedConfig { user_config, @@ -841,7 +964,8 @@ impl Trident { vm_mac_addrs, gateway_vmac_addrs, tap_types, - } = new_state.unwrap_config(); + } = state_guard.1.take().unwrap(); + mem::drop(state_guard); // TODO At present, all changes in user_config will not cause the agent to restart, // hot update needs to be implemented and this judgment should be removed @@ -997,18 +1121,21 @@ impl Trident { } } } - state_guard = state.lock().unwrap(); + + if !config_initialized { + // start guard on receiving first config to ensure + // the meltdown thresholds are set by the config + guard.start(); + config_initialized = true; + } + + state_guard = state.state.lock().unwrap(); } } pub fn stop(&mut self) { info!("Gracefully stopping"); - let (state, cond) = &*self.state; - - let mut state_guard = state.lock().unwrap(); - *state_guard = State::Terminated; - cond.notify_one(); - mem::drop(state_guard); + self.state.terminate(); self.handle.take().unwrap().join().unwrap(); info!("Gracefully stopped"); } diff --git a/agent/src/utils/guard.rs b/agent/src/utils/guard.rs index af74ed09752..be5a45503b7 100644 --- a/agent/src/utils/guard.rs +++ b/agent/src/utils/guard.rs @@ -43,6 +43,7 @@ use crate::common::{ use crate::config::handler::EnvironmentAccess; use crate::exception::ExceptionHandler; use crate::rpc::get_timestamp; +use crate::trident::AgentState; use crate::utils::{cgroups::is_kernel_available_for_cgroups, environment::running_in_container}; use public::proto::agent::{Exception, PacketCaptureType, SysMemoryMetric, SystemLoadMetric}; @@ -131,6 +132,7 @@ impl SystemLoadGuard { pub struct Guard { config: EnvironmentAccess, + state: Arc, log_dir: String, thread: Mutex>>, running: Arc<(Mutex, Condvar)>, @@ -146,6 +148,7 @@ pub struct Guard { impl Guard { pub fn new( config: EnvironmentAccess, + state: Arc, log_dir: String, exception_handler: ExceptionHandler, cgroup_mount_path: String, @@ -158,6 +161,7 @@ impl Guard { }; Ok(Self { config, + state, log_dir, thread: Mutex::new(None), running: Arc::new((Mutex::new(false), Condvar::new())), @@ -322,16 +326,17 @@ impl Guard { pub fn start(&self) { { - let (started, _) = &*self.running; - let mut started = started.lock().unwrap(); - if *started { + let (running, _) = &*self.running; + let mut running = running.lock().unwrap(); + if *running { return; } - *started = true; + *running = true; } let config = self.config.clone(); - let running = self.running.clone(); + let running_state = self.running.clone(); + let state = self.state.clone(); let exception_handler = self.exception_handler.clone(); let log_dir = self.log_dir.clone(); let mut over_memory_limit = false; // Higher than the limit does not meet expectations @@ -486,13 +491,19 @@ impl Guard { } } - let (running, timer) = &*running; - let mut running = running.lock().unwrap(); - if !*running { + if exception_handler.has(Exception::SystemLoadCircuitBreaker) || exception_handler.has(Exception::FreeMemExceeded) { + state.melt_down(); + } else { + state.recover(); + } + + let (running, notifier) = &*running_state; + let mut rg = running.lock().unwrap(); + if !*rg { break; } - running = timer.wait_timeout(running, config.guard_interval).unwrap().0; - if !*running { + rg = notifier.wait_timeout(rg, config.guard_interval).unwrap().0; + if !*rg { break; } } @@ -504,15 +515,15 @@ impl Guard { } pub fn stop(&self) { - let (stopped, timer) = &*self.running; + let (running, notifier) = &*self.running; { - let mut stopped = stopped.lock().unwrap(); - if !*stopped { + let mut running = running.lock().unwrap(); + if !*running { return; } - *stopped = false; + *running = false; } - timer.notify_one(); + notifier.notify_one(); if let Some(thread) = self.thread.lock().unwrap().take() { let _ = thread.join();