Skip to content

Commit

Permalink
agent: Fix a stop discovery channel issue
Browse files Browse the repository at this point in the history
The stop discovery channel is shared by all Configuration resources of a
given discovery handler endpoint, this channel was used without giving
any context of the targeted DiscoveryOperator, leading to a stop of
discovery for all Configurations when a Configuration is modified or
deleted.

This commit makes the stop discovery channel send the concerned
Configuration ID (namespace + name) on the channel, allowing to filter
out requests targeting another DiscoveryOperator, the channel is still
shared so the cases where we really want to stop discovery for all
Configurations still works.

Signed-off-by: Nicolas Belouin <nicolas.belouin@suse.com>
  • Loading branch information
diconico07 committed Jul 21, 2023
1 parent db62ed8 commit 7d2360d
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 27 deletions.
2 changes: 1 addition & 1 deletion agent/src/util/config_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{
};
use tokio::sync::{broadcast, mpsc, RwLock};

type ConfigId = (String, String);
pub type ConfigId = (String, String);
type ConfigMap = Arc<RwLock<HashMap<ConfigId, ConfigInfo>>>;

/// Information for managing a Configuration, such as all applied Instances of that Configuration
Expand Down
169 changes: 146 additions & 23 deletions agent/src/util/discovery_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::super::INSTANCE_COUNT_METRIC;
#[cfg(any(test, feature = "agent-full"))]
use super::embedded_discovery_handlers::get_discovery_handler;
use super::{
config_action::ConfigId,
constants::SHARED_INSTANCE_OFFLINE_GRACE_PERIOD_SECS,
device_plugin_builder::{DevicePluginBuilder, DevicePluginBuilderInterface},
device_plugin_service,
Expand Down Expand Up @@ -73,6 +74,13 @@ impl DiscoveryOperator {
instance_map,
}
}
fn get_config_id(&self) -> ConfigId {
(
self.config.metadata.namespace.clone().unwrap(),
self.config.metadata.name.clone().unwrap(),
)
}

/// Returns discovery_handler_map field. Allows the struct to be mocked.
#[allow(dead_code)]
pub fn get_discovery_handler_map(&self) -> RegisteredDiscoveryHandlerMap {
Expand All @@ -95,7 +103,8 @@ impl DiscoveryOperator {
discovery_handler_map.get_mut(&self.config.spec.discovery_handler.name)
{
for (endpoint, dh_details) in discovery_handler_details_map.clone() {
match dh_details.close_discovery_handler_connection.send(()) {
// Send with the config_id so we stop discovery for this Configuration only.
match dh_details.close_discovery_handler_connection.send(Some(self.get_config_id())) {
Ok(_) => trace!("stop_all_discovery - discovery client for {} discovery handler at endpoint {:?} told to stop", self.config.spec.discovery_handler.name, endpoint),
Err(e) => error!("stop_all_discovery - discovery client {} discovery handler at endpoint {:?} could not receive stop message with error {:?}", self.config.spec.discovery_handler.name, endpoint, e)
}
Expand Down Expand Up @@ -206,13 +215,26 @@ impl DiscoveryOperator {
) -> anyhow::Result<()> {
// clone objects for thread
let discovery_operator = Arc::new(self.clone());
let stop_discovery_receiver: &mut tokio::sync::broadcast::Receiver<()> =
let stop_discovery_receiver: &mut tokio::sync::broadcast::Receiver<Option<ConfigId>> =
&mut dh_details.close_discovery_handler_connection.subscribe();
loop {
// Wait for either new discovery results or a message to stop discovery
tokio::select! {
_ = stop_discovery_receiver.recv() => {
trace!("internal_do_discover - received message to stop discovery for endpoint {:?} serving protocol {}", dh_details.endpoint, discovery_operator.get_config().spec.discovery_handler.name);
result = stop_discovery_receiver.recv() => {
// Stop is triggered if the current config_id (to only stop this task) or None (to stop all tasks of this handler) is sent.
if let Ok(Some(config_id)) = result {
if config_id != self.get_config_id() {
trace!("internal_do_discover - received message to stop discovery for another configuration, ignoring it.");
continue;
}
}
trace!(
"internal_do_discover({}::{}) - received message to stop discovery for endpoint {:?} serving protocol {}",
self.config.metadata.namespace.as_ref().unwrap(),
self.config.metadata.name.as_ref().unwrap(),
dh_details.endpoint,
discovery_operator.get_config().spec.discovery_handler.name,
);
break;
},
result = stream.get_message() => {
Expand Down Expand Up @@ -689,11 +711,22 @@ pub mod start_discovery {
sleep_duration = Duration::from_millis(100);
}

if tokio::time::timeout(sleep_duration, stop_discovery_receiver.recv())
.await
.is_ok()
if let Ok(result) =
tokio::time::timeout(sleep_duration, stop_discovery_receiver.recv()).await
{
trace!("do_discover_on_discovery_handler - received message to stop discovery for {} Discovery Handler at endpoint {:?}", dh_details.name, dh_details.endpoint);
// Stop is triggered if the current config_id (to only stop this task) or None (to stop all tasks of this handler) is sent.
if let Ok(Some(config_id)) = result {
if config_id != discovery_operator.get_config_id() {
trace!("do_discover_on_discovery_handler - received message to stop discovery for another configuration, ignoring it.");
continue;
}
}
let (config_namespace, config_name) = discovery_operator.get_config_id();
trace!(
"do_discover_on_discovery_handler({}::{}) - received message to stop discovery for {} Discovery Handler at endpoint {:?}",
config_namespace, config_name,
dh_details.name, dh_details.endpoint,
);
break;
}
}
Expand Down Expand Up @@ -820,6 +853,10 @@ pub mod tests {
let ctx = MockDiscoveryOperator::new_context();
let discovery_handler_map_clone = discovery_handler_map.clone();
let config_clone = config.clone();
let config_id = (
config.metadata.namespace.clone().unwrap(),
config.metadata.namespace.clone().unwrap(),
);
let instance_map_clone = instance_map.clone();
ctx.expect().return_once(move |_, _, _| {
// let mut discovery_handler_status_seq = Sequence::new();
Expand All @@ -830,6 +867,8 @@ pub mod tests {
.returning(move || config_clone.clone());
mock.expect_get_instance_map()
.returning(move || instance_map_clone.clone());
mock.expect_get_config_id()
.returning(move || config_id.clone());
mock
});
MockDiscoveryOperator::new(discovery_handler_map, config, instance_map)
Expand All @@ -847,7 +886,9 @@ pub mod tests {
// Add discovery handler to registered discovery handler map
let dh_details_map = match registered_dh_map.lock().unwrap().clone().get_mut(dh_name) {
Some(dh_details_map) => {
dh_details_map.insert(endpoint.clone(), discovery_handler_details);
if !dh_details_map.contains_key(endpoint) {
dh_details_map.insert(endpoint.clone(), discovery_handler_details);
}
dh_details_map.clone()
}
None => {
Expand Down Expand Up @@ -878,8 +919,8 @@ pub mod tests {

fn setup_test_do_discover(
config_name: &str,
) -> (MockDiscoveryOperator, RegisteredDiscoveryHandlerMap) {
let discovery_handler_map = Arc::new(std::sync::Mutex::new(HashMap::new()));
discovery_handler_map: RegisteredDiscoveryHandlerMap,
) -> MockDiscoveryOperator {
add_discovery_handler_to_map(
"debugEcho",
&DiscoveryHandlerEndpoint::Uds("socket.sock".to_string()),
Expand All @@ -892,12 +933,12 @@ pub mod tests {
let config_yaml = std::fs::read_to_string(path_to_config).expect("Unable to read file");
let mut config: Configuration = serde_yaml::from_str(&config_yaml).unwrap();
config.metadata.name = Some(config_name.to_string());
let discovery_operator = create_mock_discovery_operator(
discovery_handler_map.clone(),
config.metadata.namespace = Some(config_name.to_string());
create_mock_discovery_operator(
discovery_handler_map,
config,
Arc::new(tokio::sync::RwLock::new(HashMap::new())),
);
(discovery_operator, discovery_handler_map)
)
}

#[test]
Expand All @@ -923,6 +964,73 @@ pub mod tests {
assert_eq!(first_shared_video_digest, second_shared_video_digest);
}

#[tokio::test]
async fn test_internal_do_discover_stop_one() {
let mock_kube_interface1: Arc<dyn k8s::KubeInterface> = Arc::new(MockKubeInterface::new());
let mock_kube_interface2 = mock_kube_interface1.clone();
let dh_name = "debugEcho";
let discovery_handler_map = Arc::new(std::sync::Mutex::new(HashMap::new()));
let endpoint = DiscoveryHandlerEndpoint::Uds("socket.sock".to_string());
add_discovery_handler_to_map(dh_name, &endpoint, false, discovery_handler_map.clone());
let dh_details1 = discovery_handler_map
.lock()
.unwrap()
.get(dh_name)
.unwrap()
.get(&endpoint)
.unwrap()
.clone();
let dh_details2 = dh_details1.clone();

let (_tx1, mut rx1) = mpsc::channel(2);
let (_tx2, mut rx2) = mpsc::channel(2);

let config1: Configuration = serde_yaml::from_str(
std::fs::read_to_string("../test/yaml/config-a.yaml")
.expect("Unable to read file")
.as_str(),
)
.unwrap();
let discovery_operator1 = Arc::new(DiscoveryOperator::new(
discovery_handler_map.clone(),
config1,
Arc::new(tokio::sync::RwLock::new(HashMap::new())),
));
let local_do1 = discovery_operator1.clone();
let discover1 = tokio::spawn(async move {
discovery_operator1
.internal_do_discover(mock_kube_interface1, &dh_details1, &mut rx1)
.await
.unwrap()
});

let config2: Configuration = serde_yaml::from_str(
std::fs::read_to_string("../test/yaml/config-b.yaml")
.expect("Unable to read file")
.as_str(),
)
.unwrap();
let discovery_operator2 = Arc::new(DiscoveryOperator::new(
discovery_handler_map,
config2,
Arc::new(tokio::sync::RwLock::new(HashMap::new())),
));
let discover2 = tokio::spawn(async move {
discovery_operator2
.internal_do_discover(mock_kube_interface2, &dh_details2, &mut rx2)
.await
.unwrap()
});
tokio::time::sleep(Duration::from_millis(100)).await; // Make sure they had time to launch
local_do1.stop_all_discovery().await;
assert!(tokio::time::timeout(Duration::from_millis(100), discover1)
.await
.is_ok());
assert!(tokio::time::timeout(Duration::from_millis(100), discover2)
.await
.is_err());
}

#[tokio::test]
async fn test_stop_all_discovery() {
let dh_name = "debugEcho";
Expand Down Expand Up @@ -973,7 +1081,9 @@ pub mod tests {
#[tokio::test]
async fn test_start_discovery_termination() {
let _ = env_logger::builder().is_test(true).try_init();
let mut start_discovery_components = start_discovery_setup("config-a", true).await;
let discovery_handler_map = Arc::new(std::sync::Mutex::new(HashMap::new()));
let mut start_discovery_components =
start_discovery_setup("config-a", true, discovery_handler_map).await;
start_discovery_components
.running_receiver
.recv()
Expand All @@ -1000,8 +1110,11 @@ pub mod tests {
#[tokio::test]
async fn test_start_discovery_same_discovery_handler() {
let _ = env_logger::builder().is_test(true).try_init();
let mut start_discovery_components_a = start_discovery_setup("config-a", false).await;
let mut start_discovery_components_b = start_discovery_setup("config-b", false).await;
let discovery_handler_map = Arc::new(std::sync::Mutex::new(HashMap::new()));
let mut start_discovery_components_a =
start_discovery_setup("config-a", false, discovery_handler_map.clone()).await;
let mut start_discovery_components_b =
start_discovery_setup("config-b", false, discovery_handler_map.clone()).await;

start_discovery_components_a
.running_receiver
Expand All @@ -1022,16 +1135,21 @@ pub mod tests {
start_discovery_handle: tokio::task::JoinHandle<()>,
}

async fn start_discovery_setup(config_name: &str, terminate: bool) -> StartDiscoveryComponents {
let (mut mock_discovery_operator, discovery_handler_map) =
setup_test_do_discover(config_name);
async fn start_discovery_setup(
config_name: &str,
terminate: bool,
discovery_handler_map: RegisteredDiscoveryHandlerMap,
) -> StartDiscoveryComponents {
let mut mock_discovery_operator =
setup_test_do_discover(config_name, discovery_handler_map.clone());
let (running_sender, running_receiver) = tokio::sync::broadcast::channel::<()>(1);
mock_discovery_operator
.expect_get_stream()
.returning(move |_| {
running_sender.clone().send(()).unwrap();
None
});

mock_discovery_operator
.expect_delete_offline_instances()
.times(1)
Expand All @@ -1047,11 +1165,15 @@ pub mod tests {
.unwrap()
.clone()
.close_discovery_handler_connection;
let local_config_id = mock_discovery_operator.get_config_id();
mock_discovery_operator
.expect_stop_all_discovery()
.times(1)
.returning(move || {
stop_dh_discovery_sender.clone().send(()).unwrap();
stop_dh_discovery_sender
.clone()
.send(Some(local_config_id.clone()))
.unwrap();
});
}
let (mut finished_discovery_sender, finished_discovery_receiver) =
Expand Down Expand Up @@ -1083,7 +1205,8 @@ pub mod tests {
#[tokio::test]
async fn test_do_discover_completed_internal_connection() {
let _ = env_logger::builder().is_test(true).try_init();
let (mut mock_discovery_operator, _) = setup_test_do_discover("config-a");
let discovery_handler_map = Arc::new(std::sync::Mutex::new(HashMap::new()));
let mut mock_discovery_operator = setup_test_do_discover("config-a", discovery_handler_map);
let mut get_stream_seq = Sequence::new();
// First time cannot get stream
mock_discovery_operator
Expand Down
8 changes: 5 additions & 3 deletions agent/src/util/registration.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::config_action::ConfigId;
use super::constants::CLOSE_DISCOVERY_HANDLER_CONNECTION_CHANNEL_CAPACITY;
#[cfg(any(test, feature = "agent-full"))]
use super::constants::ENABLE_DEBUG_ECHO_LABEL;
Expand Down Expand Up @@ -51,7 +52,7 @@ pub struct DiscoveryDetails {
pub shared: bool,
/// Channel over which the Registration service tells a DiscoveryOperator client to close a connection with a
/// `DiscoveryHandler`, if any. A broadcast channel is used so both the sending and receiving ends can be cloned.
pub close_discovery_handler_connection: broadcast::Sender<()>,
pub close_discovery_handler_connection: broadcast::Sender<Option<ConfigId>>,
}

/// This maps the endpoint string and endpoint type of a `RegisterDiscoveryHandlerRequest` into a
Expand Down Expand Up @@ -121,12 +122,13 @@ impl Registration for AgentRegistration {
if let Some(dh_details) = register_request_map.get(&dh_endpoint) {
// Check if DH at that endpoint is already registered but changed request
if dh_details.shared != req.shared || dh_details.endpoint != dh_endpoint {
// Stop current discovery with this DH if any. A receiver may not exist if
// Stop all (using None argument) current discovery with this DH if any.
// A receiver may not exist if:
// 1) no configuration has been applied that uses this DH or
// 2) a connection cannot be made with the DH's endpoint
dh_details
.close_discovery_handler_connection
.send(())
.send(None)
.unwrap_or_default();
} else {
// Already registered. Return early.
Expand Down

0 comments on commit 7d2360d

Please sign in to comment.