From 7d2360d5f03b92d61756ac3115fe271859b56d93 Mon Sep 17 00:00:00 2001 From: Nicolas Belouin Date: Tue, 23 May 2023 10:56:32 +0200 Subject: [PATCH] agent: Fix a stop discovery channel issue The stop discovery channel is shared by all Configuration resources of a given discovery handler endpoint. This channel was used without giving any context about the targeted DiscoveryOperator, leading to a stop of discovery for all Configurations when a single Configuration is modified or deleted. This commit makes the stop discovery channel carry the ID (namespace + name) of the concerned Configuration, allowing each receiver to filter out requests targeting another DiscoveryOperator. The channel is still shared, so the cases where we really want to stop discovery for all Configurations still work. Signed-off-by: Nicolas Belouin --- agent/src/util/config_action.rs | 2 +- agent/src/util/discovery_operator.rs | 169 +++++++++++++++++++++++---- agent/src/util/registration.rs | 8 +- 3 files changed, 152 insertions(+), 27 deletions(-) diff --git a/agent/src/util/config_action.rs b/agent/src/util/config_action.rs index 237a0a8a5..dc286d5fb 100644 --- a/agent/src/util/config_action.rs +++ b/agent/src/util/config_action.rs @@ -24,7 +24,7 @@ use std::{ }; use tokio::sync::{broadcast, mpsc, RwLock}; -type ConfigId = (String, String); +pub type ConfigId = (String, String); type ConfigMap = Arc>>; /// Information for managing a Configuration, such as all applied Instances of that Configuration diff --git a/agent/src/util/discovery_operator.rs b/agent/src/util/discovery_operator.rs index 63c552e17..fe29e51b7 100644 --- a/agent/src/util/discovery_operator.rs +++ b/agent/src/util/discovery_operator.rs @@ -2,6 +2,7 @@ use super::super::INSTANCE_COUNT_METRIC; #[cfg(any(test, feature = "agent-full"))] use super::embedded_discovery_handlers::get_discovery_handler; use super::{ + config_action::ConfigId, constants::SHARED_INSTANCE_OFFLINE_GRACE_PERIOD_SECS, device_plugin_builder::{DevicePluginBuilder, 
DevicePluginBuilderInterface}, device_plugin_service, @@ -73,6 +74,13 @@ impl DiscoveryOperator { instance_map, } } + fn get_config_id(&self) -> ConfigId { + ( + self.config.metadata.namespace.clone().unwrap(), + self.config.metadata.name.clone().unwrap(), + ) + } + /// Returns discovery_handler_map field. Allows the struct to be mocked. #[allow(dead_code)] pub fn get_discovery_handler_map(&self) -> RegisteredDiscoveryHandlerMap { @@ -95,7 +103,8 @@ impl DiscoveryOperator { discovery_handler_map.get_mut(&self.config.spec.discovery_handler.name) { for (endpoint, dh_details) in discovery_handler_details_map.clone() { - match dh_details.close_discovery_handler_connection.send(()) { + // Send with the config_id so we stop discovery for this Configuration only. + match dh_details.close_discovery_handler_connection.send(Some(self.get_config_id())) { Ok(_) => trace!("stop_all_discovery - discovery client for {} discovery handler at endpoint {:?} told to stop", self.config.spec.discovery_handler.name, endpoint), Err(e) => error!("stop_all_discovery - discovery client {} discovery handler at endpoint {:?} could not receive stop message with error {:?}", self.config.spec.discovery_handler.name, endpoint, e) } @@ -206,13 +215,26 @@ impl DiscoveryOperator { ) -> anyhow::Result<()> { // clone objects for thread let discovery_operator = Arc::new(self.clone()); - let stop_discovery_receiver: &mut tokio::sync::broadcast::Receiver<()> = + let stop_discovery_receiver: &mut tokio::sync::broadcast::Receiver> = &mut dh_details.close_discovery_handler_connection.subscribe(); loop { // Wait for either new discovery results or a message to stop discovery tokio::select! 
{ - _ = stop_discovery_receiver.recv() => { - trace!("internal_do_discover - received message to stop discovery for endpoint {:?} serving protocol {}", dh_details.endpoint, discovery_operator.get_config().spec.discovery_handler.name); + result = stop_discovery_receiver.recv() => { + // Stop is triggered if the current config_id (to only stop this task) or None (to stop all tasks of this handler) is sent. + if let Ok(Some(config_id)) = result { + if config_id != self.get_config_id() { + trace!("internal_do_discover - received message to stop discovery for another configuration, ignoring it."); + continue; + } + } + trace!( + "internal_do_discover({}::{}) - received message to stop discovery for endpoint {:?} serving protocol {}", + self.config.metadata.namespace.as_ref().unwrap(), + self.config.metadata.name.as_ref().unwrap(), + dh_details.endpoint, + discovery_operator.get_config().spec.discovery_handler.name, + ); break; }, result = stream.get_message() => { @@ -689,11 +711,22 @@ pub mod start_discovery { sleep_duration = Duration::from_millis(100); } - if tokio::time::timeout(sleep_duration, stop_discovery_receiver.recv()) - .await - .is_ok() + if let Ok(result) = + tokio::time::timeout(sleep_duration, stop_discovery_receiver.recv()).await { - trace!("do_discover_on_discovery_handler - received message to stop discovery for {} Discovery Handler at endpoint {:?}", dh_details.name, dh_details.endpoint); + // Stop is triggered if the current config_id (to only stop this task) or None (to stop all tasks of this handler) is sent. 
+ if let Ok(Some(config_id)) = result { + if config_id != discovery_operator.get_config_id() { + trace!("do_discover_on_discovery_handler - received message to stop discovery for another configuration, ignoring it."); + continue; + } + } + let (config_namespace, config_name) = discovery_operator.get_config_id(); + trace!( + "do_discover_on_discovery_handler({}::{}) - received message to stop discovery for {} Discovery Handler at endpoint {:?}", + config_namespace, config_name, + dh_details.name, dh_details.endpoint, + ); break; } } @@ -820,6 +853,10 @@ pub mod tests { let ctx = MockDiscoveryOperator::new_context(); let discovery_handler_map_clone = discovery_handler_map.clone(); let config_clone = config.clone(); + let config_id = ( + config.metadata.namespace.clone().unwrap(), + config.metadata.name.clone().unwrap(), + ); let instance_map_clone = instance_map.clone(); ctx.expect().return_once(move |_, _, _| { // let mut discovery_handler_status_seq = Sequence::new(); @@ -830,6 +867,8 @@ pub mod tests { .returning(move || config_clone.clone()); mock.expect_get_instance_map() .returning(move || instance_map_clone.clone()); + mock.expect_get_config_id() + .returning(move || config_id.clone()); mock }); MockDiscoveryOperator::new(discovery_handler_map, config, instance_map) @@ -847,7 +886,9 @@ pub mod tests { // Add discovery handler to registered discovery handler map let dh_details_map = match registered_dh_map.lock().unwrap().clone().get_mut(dh_name) { Some(dh_details_map) => { - dh_details_map.insert(endpoint.clone(), discovery_handler_details); + if !dh_details_map.contains_key(endpoint) { + dh_details_map.insert(endpoint.clone(), discovery_handler_details); + } dh_details_map.clone() } None => { @@ -878,8 +919,8 @@ pub mod tests { fn setup_test_do_discover( config_name: &str, - ) -> (MockDiscoveryOperator, RegisteredDiscoveryHandlerMap) { - let discovery_handler_map = Arc::new(std::sync::Mutex::new(HashMap::new())); + discovery_handler_map: 
RegisteredDiscoveryHandlerMap, + ) -> MockDiscoveryOperator { add_discovery_handler_to_map( "debugEcho", &DiscoveryHandlerEndpoint::Uds("socket.sock".to_string()), @@ -892,12 +933,12 @@ pub mod tests { let config_yaml = std::fs::read_to_string(path_to_config).expect("Unable to read file"); let mut config: Configuration = serde_yaml::from_str(&config_yaml).unwrap(); config.metadata.name = Some(config_name.to_string()); - let discovery_operator = create_mock_discovery_operator( - discovery_handler_map.clone(), + config.metadata.namespace = Some(config_name.to_string()); + create_mock_discovery_operator( + discovery_handler_map, config, Arc::new(tokio::sync::RwLock::new(HashMap::new())), - ); - (discovery_operator, discovery_handler_map) + ) } #[test] @@ -923,6 +964,73 @@ pub mod tests { assert_eq!(first_shared_video_digest, second_shared_video_digest); } + #[tokio::test] + async fn test_internal_do_discover_stop_one() { + let mock_kube_interface1: Arc = Arc::new(MockKubeInterface::new()); + let mock_kube_interface2 = mock_kube_interface1.clone(); + let dh_name = "debugEcho"; + let discovery_handler_map = Arc::new(std::sync::Mutex::new(HashMap::new())); + let endpoint = DiscoveryHandlerEndpoint::Uds("socket.sock".to_string()); + add_discovery_handler_to_map(dh_name, &endpoint, false, discovery_handler_map.clone()); + let dh_details1 = discovery_handler_map + .lock() + .unwrap() + .get(dh_name) + .unwrap() + .get(&endpoint) + .unwrap() + .clone(); + let dh_details2 = dh_details1.clone(); + + let (_tx1, mut rx1) = mpsc::channel(2); + let (_tx2, mut rx2) = mpsc::channel(2); + + let config1: Configuration = serde_yaml::from_str( + std::fs::read_to_string("../test/yaml/config-a.yaml") + .expect("Unable to read file") + .as_str(), + ) + .unwrap(); + let discovery_operator1 = Arc::new(DiscoveryOperator::new( + discovery_handler_map.clone(), + config1, + Arc::new(tokio::sync::RwLock::new(HashMap::new())), + )); + let local_do1 = discovery_operator1.clone(); + let discover1 = 
tokio::spawn(async move { + discovery_operator1 + .internal_do_discover(mock_kube_interface1, &dh_details1, &mut rx1) + .await + .unwrap() + }); + + let config2: Configuration = serde_yaml::from_str( + std::fs::read_to_string("../test/yaml/config-b.yaml") + .expect("Unable to read file") + .as_str(), + ) + .unwrap(); + let discovery_operator2 = Arc::new(DiscoveryOperator::new( + discovery_handler_map, + config2, + Arc::new(tokio::sync::RwLock::new(HashMap::new())), + )); + let discover2 = tokio::spawn(async move { + discovery_operator2 + .internal_do_discover(mock_kube_interface2, &dh_details2, &mut rx2) + .await + .unwrap() + }); + tokio::time::sleep(Duration::from_millis(100)).await; // Make sure they had time to launch + local_do1.stop_all_discovery().await; + assert!(tokio::time::timeout(Duration::from_millis(100), discover1) + .await + .is_ok()); + assert!(tokio::time::timeout(Duration::from_millis(100), discover2) + .await + .is_err()); + } + #[tokio::test] async fn test_stop_all_discovery() { let dh_name = "debugEcho"; @@ -973,7 +1081,9 @@ pub mod tests { #[tokio::test] async fn test_start_discovery_termination() { let _ = env_logger::builder().is_test(true).try_init(); - let mut start_discovery_components = start_discovery_setup("config-a", true).await; + let discovery_handler_map = Arc::new(std::sync::Mutex::new(HashMap::new())); + let mut start_discovery_components = + start_discovery_setup("config-a", true, discovery_handler_map).await; start_discovery_components .running_receiver .recv() @@ -1000,8 +1110,11 @@ pub mod tests { #[tokio::test] async fn test_start_discovery_same_discovery_handler() { let _ = env_logger::builder().is_test(true).try_init(); - let mut start_discovery_components_a = start_discovery_setup("config-a", false).await; - let mut start_discovery_components_b = start_discovery_setup("config-b", false).await; + let discovery_handler_map = Arc::new(std::sync::Mutex::new(HashMap::new())); + let mut start_discovery_components_a = + 
start_discovery_setup("config-a", false, discovery_handler_map.clone()).await; + let mut start_discovery_components_b = + start_discovery_setup("config-b", false, discovery_handler_map.clone()).await; start_discovery_components_a .running_receiver @@ -1022,9 +1135,13 @@ pub mod tests { start_discovery_handle: tokio::task::JoinHandle<()>, } - async fn start_discovery_setup(config_name: &str, terminate: bool) -> StartDiscoveryComponents { - let (mut mock_discovery_operator, discovery_handler_map) = - setup_test_do_discover(config_name); + async fn start_discovery_setup( + config_name: &str, + terminate: bool, + discovery_handler_map: RegisteredDiscoveryHandlerMap, + ) -> StartDiscoveryComponents { + let mut mock_discovery_operator = + setup_test_do_discover(config_name, discovery_handler_map.clone()); let (running_sender, running_receiver) = tokio::sync::broadcast::channel::<()>(1); mock_discovery_operator .expect_get_stream() @@ -1032,6 +1149,7 @@ pub mod tests { running_sender.clone().send(()).unwrap(); None }); + mock_discovery_operator .expect_delete_offline_instances() .times(1) @@ -1047,11 +1165,15 @@ pub mod tests { .unwrap() .clone() .close_discovery_handler_connection; + let local_config_id = mock_discovery_operator.get_config_id(); mock_discovery_operator .expect_stop_all_discovery() .times(1) .returning(move || { - stop_dh_discovery_sender.clone().send(()).unwrap(); + stop_dh_discovery_sender + .clone() + .send(Some(local_config_id.clone())) + .unwrap(); }); } let (mut finished_discovery_sender, finished_discovery_receiver) = @@ -1083,7 +1205,8 @@ pub mod tests { #[tokio::test] async fn test_do_discover_completed_internal_connection() { let _ = env_logger::builder().is_test(true).try_init(); - let (mut mock_discovery_operator, _) = setup_test_do_discover("config-a"); + let discovery_handler_map = Arc::new(std::sync::Mutex::new(HashMap::new())); + let mut mock_discovery_operator = setup_test_do_discover("config-a", discovery_handler_map); let mut 
get_stream_seq = Sequence::new(); // First time cannot get stream mock_discovery_operator diff --git a/agent/src/util/registration.rs b/agent/src/util/registration.rs index c0fdbae6a..4d5bd2204 100644 --- a/agent/src/util/registration.rs +++ b/agent/src/util/registration.rs @@ -1,3 +1,4 @@ +use super::config_action::ConfigId; use super::constants::CLOSE_DISCOVERY_HANDLER_CONNECTION_CHANNEL_CAPACITY; #[cfg(any(test, feature = "agent-full"))] use super::constants::ENABLE_DEBUG_ECHO_LABEL; @@ -51,7 +52,7 @@ pub struct DiscoveryDetails { pub shared: bool, /// Channel over which the Registration service tells a DiscoveryOperator client to close a connection with a /// `DiscoveryHandler`, if any. A broadcast channel is used so both the sending and receiving ends can be cloned. - pub close_discovery_handler_connection: broadcast::Sender<()>, + pub close_discovery_handler_connection: broadcast::Sender>, } /// This maps the endpoint string and endpoint type of a `RegisterDiscoveryHandlerRequest` into a @@ -121,12 +122,13 @@ impl Registration for AgentRegistration { if let Some(dh_details) = register_request_map.get(&dh_endpoint) { // Check if DH at that endpoint is already registered but changed request if dh_details.shared != req.shared || dh_details.endpoint != dh_endpoint { - // Stop current discovery with this DH if any. A receiver may not exist if + // Stop all (using None argument) current discovery with this DH if any. + // A receiver may not exist if: // 1) no configuration has been applied that uses this DH or // 2) a connection cannot be made with the DH's endpoint dh_details .close_discovery_handler_connection - .send(()) + .send(None) .unwrap_or_default(); } else { // Already registered. Return early.