Skip to content

Commit

Permalink
Add Configuration device plugin (#627)
Browse files Browse the repository at this point in the history
* add configuration device plugin

Signed-off-by: Johnson Shih <jshih@microsoft.com>

* update version

Signed-off-by: Johnson Shih <jshih@microsoft.com>

* optimize dynamic vdev id selection, only report one additional free vdev

Signed-off-by: Johnson Shih <jshih@microsoft.com>

* write usage information to annotation

Signed-off-by: Johnson Shih <jshih@microsoft.com>

* remove uniqueDevices from Configuration CRD

Signed-off-by: Johnson Shih <jshih@microsoft.com>

* add unit tests

Signed-off-by: Johnson Shih <jshih@microsoft.com>

* cargo fmt

Signed-off-by: Johnson Shih <jshih@microsoft.com>

* address PR comments

Signed-off-by: Johnson Shih <jshih@microsoft.com>

* rename InstanceConfig to DevicePluginContext

Signed-off-by: Johnson Shih <jshih@microsoft.com>

* update comments

Signed-off-by: Johnson Shih <jshih@microsoft.com>

* address review comments

Signed-off-by: Johnson Shih <jshih@microsoft.com>

* update minor version

Signed-off-by: Johnson Shih <jshih@microsoft.com>

---------

Signed-off-by: Johnson Shih <jshih@microsoft.com>
  • Loading branch information
johnsonshih authored Aug 4, 2023
1 parent 2c6b1e2 commit 42a7615
Show file tree
Hide file tree
Showing 21 changed files with 2,714 additions and 1,225 deletions.
28 changes: 14 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion agent/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "agent"
version = "0.11.6"
version = "0.12.0"
authors = ["Kate Goldenring <kate.goldenring@microsoft.com>", "<bfjelds@microsoft.com>"]
edition = "2018"
rust-version = "1.68.1"
Expand Down
61 changes: 37 additions & 24 deletions agent/src/util/config_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::{
DISCOVERY_OPERATOR_STOP_DISCOVERY_CHANNEL_CAPACITY,
},
device_plugin_service,
device_plugin_service::InstanceMap,
device_plugin_service::DevicePluginContext,
discovery_operator::start_discovery::{start_discovery, DiscoveryOperator},
registration::RegisteredDiscoveryHandlerMap,
};
Expand Down Expand Up @@ -32,7 +32,7 @@ type ConfigMap = Arc<RwLock<HashMap<ConfigId, ConfigInfo>>>;
#[derive(Debug)]
pub struct ConfigInfo {
/// Device plugin context of the Configuration, which holds the map of all of its Instances
instance_map: InstanceMap,
device_plugin_context: Arc<RwLock<DevicePluginContext>>,
/// Sends notification to a `DiscoveryOperator` that it should stop all discovery for its Configuration.
/// This signals it to tell each of its subtasks to stop discovery.
/// A broadcast channel is used so both the sending and receiving ends can be cloned.
Expand Down Expand Up @@ -252,13 +252,13 @@ async fn handle_config_add(
config.metadata.name.clone().unwrap(),
);
// Create a new device plugin context for this config and add it to the config map
let instance_map: InstanceMap = Arc::new(RwLock::new(HashMap::new()));
let device_plugin_context = Arc::new(RwLock::new(DevicePluginContext::default()));
let (stop_discovery_sender, _): (broadcast::Sender<()>, broadcast::Receiver<()>) =
broadcast::channel(DISCOVERY_OPERATOR_STOP_DISCOVERY_CHANNEL_CAPACITY);
let (mut finished_discovery_sender, finished_discovery_receiver) =
mpsc::channel(DISCOVERY_OPERATOR_FINISHED_DISCOVERY_CHANNEL_CAPACITY);
let config_info = ConfigInfo {
instance_map: instance_map.clone(),
device_plugin_context: device_plugin_context.clone(),
stop_discovery_sender: stop_discovery_sender.clone(),
finished_discovery_receiver,
last_generation: config.metadata.generation,
Expand All @@ -269,7 +269,7 @@ async fn handle_config_add(
// Keep discovering instances until the config is deleted, signaled by a message from handle_config_delete
tokio::spawn(async move {
let discovery_operator =
DiscoveryOperator::new(discovery_handler_map, config, instance_map);
DiscoveryOperator::new(discovery_handler_map, config, device_plugin_context);
start_discovery(
discovery_operator,
new_discovery_handler_sender,
Expand Down Expand Up @@ -335,17 +335,29 @@ async fn handle_config_delete(
}

// Get the device plugin context for the Configuration and then remove the Configuration from the ConfigMap
let instance_map: InstanceMap;
let device_plugin_context;
{
let mut config_map_locked = config_map.write().await;
instance_map = config_map_locked
device_plugin_context = config_map_locked
.get(&config_id)
.unwrap()
.instance_map
.device_plugin_context
.clone();
config_map_locked.remove(&config_id);
}
delete_all_instances_in_map(kube_interface, instance_map, config_id).await?;
delete_all_instances_in_device_plugin_context(
kube_interface,
device_plugin_context.clone(),
config_id,
)
.await?;
if let Some(sender) = &device_plugin_context
.read()
.await
.usage_update_message_sender
{
sender.send(device_plugin_service::ListAndWatchMessageKind::End)?;
}
Ok(())
}

Expand Down Expand Up @@ -375,13 +387,13 @@ async fn should_recreate_config(
}

/// This shuts down all of a Configuration's Instances and terminates the associated Device Plugins
pub async fn delete_all_instances_in_map(
pub async fn delete_all_instances_in_device_plugin_context(
kube_interface: &dyn k8s::KubeInterface,
instance_map: InstanceMap,
device_plugin_context: Arc<RwLock<DevicePluginContext>>,
(namespace, name): ConfigId,
) -> anyhow::Result<()> {
let mut instance_map_locked = instance_map.write().await;
let instances_to_delete_map = instance_map_locked.clone();
let mut device_plugin_context_locked = device_plugin_context.write().await;
let instances_to_delete_map = device_plugin_context_locked.clone().instances;
for (instance_name, instance_info) in instances_to_delete_map {
trace!(
"handle_config_delete - found Instance {} associated with deleted config {:?} ... sending message to end list_and_watch",
Expand All @@ -392,7 +404,9 @@ pub async fn delete_all_instances_in_map(
.list_and_watch_message_sender
.send(device_plugin_service::ListAndWatchMessageKind::End)
.unwrap();
instance_map_locked.remove(&instance_name);
device_plugin_context_locked
.instances
.remove(&instance_name);
try_delete_instance(kube_interface, &instance_name, namespace.as_str()).await?;
}
Ok(())
Expand All @@ -401,9 +415,8 @@ pub async fn delete_all_instances_in_map(
#[cfg(test)]
mod config_action_tests {
use super::super::{
device_plugin_service,
device_plugin_service::{InstanceConnectivityStatus, InstanceMap},
discovery_operator::tests::build_instance_map,
device_plugin_service, device_plugin_service::InstanceConnectivityStatus,
discovery_operator::tests::build_device_plugin_context,
};
use super::*;
use akri_shared::{akri::configuration::Configuration, k8s::MockKubeInterface};
Expand Down Expand Up @@ -468,7 +481,7 @@ mod config_action_tests {
let mut list_and_watch_message_receivers = Vec::new();
let mut visible_discovery_results = Vec::new();
let mut mock = MockKubeInterface::new();
let instance_map: InstanceMap = build_instance_map(
let device_plugin_context = build_device_plugin_context(
&config,
&mut visible_discovery_results,
&mut list_and_watch_message_receivers,
Expand All @@ -482,7 +495,7 @@ mod config_action_tests {
config_id.clone(),
ConfigInfo {
stop_discovery_sender,
instance_map: instance_map.clone(),
device_plugin_context: device_plugin_context.clone(),
finished_discovery_receiver,
last_generation: config.metadata.generation,
},
Expand Down Expand Up @@ -518,7 +531,7 @@ mod config_action_tests {
futures::future::join_all(tasks).await;

// Assert that all instances have been removed from the device plugin context
assert_eq!(instance_map.read().await.len(), 0);
assert_eq!(device_plugin_context.read().await.instances.len(), 0);
}

#[tokio::test]
Expand All @@ -534,7 +547,7 @@ mod config_action_tests {
let mut list_and_watch_message_receivers = Vec::new();
let mut visible_discovery_results = Vec::new();
let mut mock = MockKubeInterface::new();
let instance_map: InstanceMap = build_instance_map(
let device_plugin_context = build_device_plugin_context(
&config,
&mut visible_discovery_results,
&mut list_and_watch_message_receivers,
Expand All @@ -548,7 +561,7 @@ mod config_action_tests {
config_id.clone(),
ConfigInfo {
stop_discovery_sender,
instance_map: instance_map.clone(),
device_plugin_context: device_plugin_context.clone(),
finished_discovery_receiver,
last_generation: config.metadata.generation,
},
Expand Down Expand Up @@ -583,7 +596,7 @@ mod config_action_tests {
futures::future::join_all(tasks).await;

// Assert that all instances have been removed from the device plugin context
assert_eq!(instance_map.read().await.len(), 0);
assert_eq!(device_plugin_context.read().await.instances.len(), 0);
}

// Tests that when a Configuration is updated,
Expand Down Expand Up @@ -640,7 +653,7 @@ mod config_action_tests {
let (_, finished_discovery_receiver) = mpsc::channel(2);

let config_info = ConfigInfo {
instance_map: Arc::new(RwLock::new(HashMap::new())),
device_plugin_context: Arc::new(RwLock::new(DevicePluginContext::default())),
stop_discovery_sender: stop_discovery_sender.clone(),
finished_discovery_receiver,
last_generation: Some(1),
Expand Down
Loading

0 comments on commit 42a7615

Please sign in to comment.