diff --git a/CHANGELOG.md b/CHANGELOG.md index d77cd9ce8..49bc234a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,18 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Fixed +- Bugfix: when scheduling a pod, `GroupAntiAffinityStrategy` should not skip nodes that are mapped by other pods from different role+group. ([#222]) + +### Added +- `identity.rs` a new module split out of `scheduler.rs` that bundles code for pod and node id management. +- `identity::PodIdentityFactory` trait and one implementation called `identity::LabeledPodIdentityFactory` + +### Removed +- BREAKING: `scheduler::PodToNodeMapping::from` ([#222]) + +[#222]: https://github.com/stackabletech/operator-rs/pull/222 + ## [0.2.2] - 2021-09-21 diff --git a/src/error.rs b/src/error.rs index 068c13474..29281fe13 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,6 +1,6 @@ +use crate::name_utils; use crate::product_config_utils; -use crate::{name_utils, scheduler}; -use std::collections::HashSet; +use std::collections::{BTreeMap, HashSet}; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -82,12 +82,6 @@ pub enum Error { )] RequiredFileMissing { search_path: Vec }, - #[error("Scheduler reported error: {source}")] - SchedulerError { - #[from] - source: scheduler::Error, - }, - #[error("ProductConfig Framework reported error: {source}")] ProductConfigError { #[from] @@ -102,6 +96,43 @@ pub enum Error { #[error("Error converting CRD byte array to UTF-8")] FromUtf8Error(#[from] std::string::FromUtf8Error), + #[error( + "Not enough nodes [{number_of_nodes}] available to schedule pods [{number_of_pods}]. Unscheduled pods: {unscheduled_pods:?}." + )] + NotEnoughNodesAvailable { + number_of_nodes: usize, + number_of_pods: usize, + unscheduled_pods: Vec, + }, + + #[error( + "PodIdentity could not be parsed: [{pod_id}]. This should not happen. Please open a ticket." 
+ )] + PodIdentityNotParseable { pod_id: String }, + + #[error("Cannot build PodIdentity from Pod without labels. Missing labels: {0:?}")] + PodWithoutLabelsNotSupported(Vec), + + #[error("Cannot build NodeIdentity from node without name.")] + NodeWithoutNameNotSupported, + + #[error("Cannot construct PodIdentity from empty id field.")] + PodIdentityFieldEmpty, + + #[error( + "Pod identity field [{field}] with value [{value}] does not match the expected value [{expected}]" + )] + UnexpectedPodIdentityField { + field: String, + value: String, + expected: String, + }, + + #[error("Forbidden separator [{separator}] found in pod identity fields [{invalid_fields:?}]")] + PodIdentityFieldWithInvalidSeparator { + separator: String, + invalid_fields: BTreeMap, + }, } pub type OperatorResult = std::result::Result; diff --git a/src/identity.rs b/src/identity.rs new file mode 100644 index 000000000..c5cfcd823 --- /dev/null +++ b/src/identity.rs @@ -0,0 +1,814 @@ +///! This module implements structs and traits for pod and node identity management. They are the +///! building blocks of pod scheduling as implemented in the scheduler module. +///! +///! Operators are expected to implement the [`PodIdentityFactory`] trait or use the implementation +///! provided here called [`LabeledPodIdentityFactory`]. +///! +///! Useful structs and their meaning: +///! * [`PodIdentity`] : identifies a pod from the set of all pods managed by an operator. +///! * [`NodeIdentity`] : identifies a node from the set of eligible nodes available to the operator. +///! * [`PodToNodeMapping`] : Describes the node where pods are assigned. 
+/// +use crate::error::Error; +use crate::labels; +use crate::role_utils::{EligibleNodesAndReplicas, EligibleNodesForRoleAndGroup}; +use k8s_openapi::api::core::v1::{Node, Pod}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use std::collections::hash_map::DefaultHasher; +use std::collections::{BTreeMap, HashMap}; +use std::convert::TryFrom; +use std::hash::{Hash, Hasher}; + +const POD_IDENTITY_FIELD_SEPARATOR: &str = ";"; +pub const REQUIRED_LABELS: [&str; 4] = [ + labels::APP_NAME_LABEL, + labels::APP_INSTANCE_LABEL, + labels::APP_COMPONENT_LABEL, + labels::APP_ROLE_GROUP_LABEL, +]; + +#[derive( + Clone, Debug, Default, Deserialize, Eq, Hash, JsonSchema, Ord, PartialEq, PartialOrd, Serialize, +)] +#[serde(rename_all = "camelCase")] +#[serde(try_from = "String")] +#[serde(into = "String")] +pub struct PodIdentity { + app: String, + instance: String, + role: String, + group: String, + id: String, +} + +impl PodIdentity { + pub fn new( + app: &str, + instance: &str, + role: &str, + group: &str, + id: &str, + ) -> Result { + Self::assert_forbidden_char(app, instance, role, group, id)?; + Ok(PodIdentity { + app: app.to_string(), + instance: instance.to_string(), + role: role.to_string(), + group: group.to_string(), + id: id.to_string(), + }) + } + + pub fn try_from_pod_and_id(pod: &Pod, id_label: &str) -> Result { + if id_label.is_empty() { + return Err(Error::PodIdentityFieldEmpty); + } + + match &pod.metadata.labels { + Some(labels) => { + let mut missing_labels = Vec::with_capacity(4); + let mut app = String::new(); + let mut instance = String::new(); + let mut role = String::new(); + let mut group = String::new(); + let mut id = String::new(); + + match labels.get(labels::APP_NAME_LABEL).cloned() { + Some(value) => app = value, + _ => missing_labels.push(String::from(labels::APP_NAME_LABEL)), + } + match labels.get(labels::APP_INSTANCE_LABEL).cloned() { + Some(value) => instance = value, + _ => 
missing_labels.push(String::from(labels::APP_INSTANCE_LABEL)), + } + match labels.get(labels::APP_COMPONENT_LABEL).cloned() { + Some(value) => role = value, + _ => missing_labels.push(String::from(labels::APP_COMPONENT_LABEL)), + } + match labels.get(labels::APP_ROLE_GROUP_LABEL).cloned() { + Some(value) => group = value, + _ => missing_labels.push(String::from(labels::APP_ROLE_GROUP_LABEL)), + } + match labels.get(id_label).cloned() { + Some(value) => id = value, + _ => missing_labels.push(String::from(id_label)), + } + + if missing_labels.is_empty() { + Ok(PodIdentity::new( + app.as_str(), + instance.as_str(), + role.as_str(), + group.as_str(), + id.as_str(), + )?) + } else { + Err(Error::PodWithoutLabelsNotSupported(missing_labels)) + } + } + _ => Err(Error::PodWithoutLabelsNotSupported( + REQUIRED_LABELS.iter().map(|s| String::from(*s)).collect(), + )), + } + } + + pub fn app(&self) -> &str { + self.app.as_ref() + } + pub fn instance(&self) -> &str { + self.instance.as_ref() + } + pub fn role(&self) -> &str { + self.role.as_ref() + } + pub fn group(&self) -> &str { + self.group.as_ref() + } + pub fn id(&self) -> &str { + self.id.as_ref() + } + + pub fn compute_hash(&self, hasher: &mut DefaultHasher) -> u64 { + self.hash(hasher); + hasher.finish() + } + + fn assert_forbidden_char( + app: &str, + instance: &str, + role: &str, + group: &str, + id: &str, + ) -> Result<(), Error> { + let mut invalid_fields = BTreeMap::new(); + if app.contains(POD_IDENTITY_FIELD_SEPARATOR) { + invalid_fields.insert(String::from("app"), String::from(app)); + } + if instance.contains(POD_IDENTITY_FIELD_SEPARATOR) { + invalid_fields.insert(String::from("instance"), String::from(instance)); + } + if role.contains(POD_IDENTITY_FIELD_SEPARATOR) { + invalid_fields.insert(String::from("role"), String::from(role)); + } + if group.contains(POD_IDENTITY_FIELD_SEPARATOR) { + invalid_fields.insert(String::from("group"), String::from(group)); + } + if id.contains(POD_IDENTITY_FIELD_SEPARATOR) { + 
invalid_fields.insert(String::from("id"), String::from(id)); + } + + if invalid_fields.is_empty() { + Ok(()) + } else { + Err(Error::PodIdentityFieldWithInvalidSeparator { + separator: String::from(POD_IDENTITY_FIELD_SEPARATOR), + invalid_fields, + }) + } + } +} +impl TryFrom for PodIdentity { + type Error = Error; + fn try_from(s: String) -> Result { + let split = s.split(POD_IDENTITY_FIELD_SEPARATOR).collect::>(); + if split.len() != 5 { + return Err(Error::PodIdentityNotParseable { pod_id: s }); + } + PodIdentity::new(split[0], split[1], split[2], split[3], split[4]) + } +} + +impl From for String { + fn from(pod_id: PodIdentity) -> Self { + [ + pod_id.app, + pod_id.instance, + pod_id.role, + pod_id.group, + pod_id.id, + ] + .join(POD_IDENTITY_FIELD_SEPARATOR) + } +} + +const DEFAULT_NODE_NAME: &str = ""; + +#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct NodeIdentity { + pub name: String, +} + +impl NodeIdentity { + pub fn new(name: &str) -> Self { + NodeIdentity { + name: String::from(name), + } + } +} + +impl TryFrom<&Pod> for NodeIdentity { + type Error = Error; + fn try_from(p: &Pod) -> Result { + let node = p + .spec + .as_ref() + .and_then(|s| s.node_name.clone()) + .ok_or(Error::NodeWithoutNameNotSupported)?; + + Ok(NodeIdentity::new(node.as_ref())) + } +} + +impl From for NodeIdentity { + fn from(node: Node) -> Self { + NodeIdentity { + name: node + .metadata + .name + .unwrap_or_else(|| DEFAULT_NODE_NAME.to_string()), + } + } +} + +#[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct PodToNodeMapping { + pub mapping: BTreeMap, +} + +impl PodToNodeMapping { + /// Return a mapping for `pods` given that the `id_factory` can convert them to pod identities. + /// Return an `Error` if any of the given pods cannot be converted to a pod identity or + /// is not mapped to a node. 
+ /// # Arguments + /// - `id_factory` : A factory that can build a `PodIdentity` from a `Pod`. + /// - `pods` : The pods to add to the mapping. + pub fn try_from(id_factory: &dyn PodIdentityFactory, pods: &[Pod]) -> Result { + let mut mapping = BTreeMap::default(); + + for (pod_id, pod) in id_factory.try_map(pods)?.iter().zip(pods) { + mapping.insert(pod_id.clone(), NodeIdentity::try_from(pod)?); + } + + Ok(PodToNodeMapping { mapping }) + } + + pub fn get(&self, pod_id: &PodIdentity) -> Option<&NodeIdentity> { + self.mapping.get(pod_id) + } + + pub fn insert(&mut self, pod_id: PodIdentity, node_id: NodeIdentity) -> Option { + self.mapping.insert(pod_id, node_id) + } + + /// Returns a map where entries are filtered by the given arguments. + /// # Arguments + /// - `app` : Application name. + /// - `instance` : Application instance name. + /// - `role` : Role name. + /// - `group` : Group name. + pub fn filter( + &self, + app: &str, + instance: &str, + role: &str, + group: &str, + ) -> BTreeMap { + self.mapping + .iter() + .filter_map(|(pod_id, node_id)| { + if pod_id.app() == app + && pod_id.instance() == instance + && pod_id.role() == role + && pod_id.group() == group + { + Some((pod_id.clone(), node_id.clone())) + } else { + None + } + }) + .collect() + } + + pub fn merge(&self, other: &Self) -> Self { + PodToNodeMapping { + mapping: self + .mapping + .clone() + .into_iter() + .chain(other.mapping.clone().into_iter()) + .collect(), + } + } + + /// Return true if the `node` is already mapped by a pod from `role` and `group`. + pub fn mapped_by(&self, node: &NodeIdentity, role: &str, group: &str) -> bool { + for (pod_id, mapped_node) in self.mapping.iter() { + if node == mapped_node && pod_id.role() == role && pod_id.group() == group { + return true; + } + } + false + } + + /// Given `pods` return all that are not mapped. 
+ pub fn missing(&self, pods: &[PodIdentity]) -> Vec { + let mut result = vec![]; + for p in pods { + if !self.mapping.contains_key(p) { + result.push(p.clone()) + } + } + result + } + + #[cfg(test)] + pub fn new(map: Vec<(PodIdentity, NodeIdentity)>) -> Self { + let mut result = BTreeMap::new(); + for (p, n) in map { + result.insert(p, n); + } + PodToNodeMapping { mapping: result } + } +} + +/// A pod identity generator that can be implemented by the operators. +/// +/// Implementations of this trait are responsible for: +/// - generating all pod identities expected by the service. +/// - map pods to their identities by implementing `try_map` +pub trait PodIdentityFactory: AsRef<[PodIdentity]> { + /// Returns a PodToNodeMapping for the given pods or an error if any pod could not be mapped. + fn try_map(&self, pods: &[Pod]) -> Result, Error>; +} + +/// An implementation of [`PodIdentityFactory`] where id's are incremented across all roles and groups +/// contained in `eligible_nodes`. +/// +/// This factory requires pods to be labeled with all `REQUIRED_LABELS` and with one additional "id label" +/// that can vary from operator to operator. +/// +/// See `generate_ids` for details. +pub struct LabeledPodIdentityFactory { + app: String, + instance: String, + id_label_name: String, + slice: Vec, +} + +impl LabeledPodIdentityFactory { + /// Build a new instance of this factory. + /// + /// See `Self::generate_ids` for implementation details. + /// + /// # Arguments + /// - `app` : Application name. + /// - `instance` : Application instance name. + /// - `eligible_nodes` : Eligible nodes (and pod replicas) grouped by role and group. + /// - `id_label_name` : Name of the pod's id label used to store the `id` field of `PodIdentity` + /// - `start` : The initial value when generating the `id` fields of pod identities. 
+ pub fn new( + app: &str, + instance: &str, + eligible_nodes: &EligibleNodesForRoleAndGroup, + id_label_name: &str, + start: usize, + ) -> Self { + LabeledPodIdentityFactory { + app: app.to_string(), + instance: instance.to_string(), + id_label_name: id_label_name.to_string(), + slice: Self::generate_ids(app, instance, eligible_nodes, start), + } + } + + /// Returns the given `pod_id` if its fields match those managed by `Self`. + /// This is a sanity check to make sure we don't mix pods that were not generated + /// using this factory. + fn fields_match(&self, pod_id: PodIdentity) -> Result { + if self.app != pod_id.app() { + return Err(Error::UnexpectedPodIdentityField { + field: "app".to_string(), + value: pod_id.app().to_string(), + expected: self.app.clone(), + }); + } + if self.instance != pod_id.instance() { + return Err(Error::UnexpectedPodIdentityField { + field: "instance".to_string(), + value: pod_id.instance().to_string(), + expected: self.instance.clone(), + }); + } + Ok(pod_id) + } + /// Returns a Vec of pod identities according to the replica per role+group pair from `eligible_nodes`. + /// + /// The `id` field is in the range from one (1) to the number of replicas per role+group. If no replicas + /// are defined, then the range goes from one (1) to the number of eligible groups. + /// + /// Given a `start` value of 1000, a role with two groups where the first group has two replicas and + /// the second has three replicas, the generated `id` fields of the pod identities are counted as follows: + /// + /// ```yaml + /// role_1: + /// - group_1: + /// - id: 1000 + /// - id: 1001 + /// - group_2: + /// - id: 1002 + /// - id: 1003 + /// - id: 1004 + /// ``` + /// + /// # Arguments + /// * `app_name` - Application name + /// * `instance` - Service instance + /// * `eligible_nodes` - Eligible nodes grouped by role and groups. + /// * `start` - The starting value for the id field. 
+ fn generate_ids( + app_name: &str, + instance: &str, + eligible_nodes: &EligibleNodesForRoleAndGroup, + start: usize, + ) -> Vec { + let mut generated_ids = vec![]; + let mut id = start; + // sorting role and group to keep the output consistent and make this + // function testable. + let sorted_nodes: BTreeMap<&String, &HashMap> = + eligible_nodes.iter().collect(); + for (role_name, groups) in sorted_nodes { + let sorted_groups: BTreeMap<&String, &EligibleNodesAndReplicas> = groups + .iter() + .collect::>(); + for (group_name, eligible_nodes) in sorted_groups { + let ids_per_group = eligible_nodes + .replicas + .map(usize::from) + .unwrap_or_else(|| eligible_nodes.nodes.len()); + for _ in 0..ids_per_group { + generated_ids.push(PodIdentity { + app: app_name.to_string(), + instance: instance.to_string(), + role: role_name.clone(), + group: group_name.clone(), + id: id.to_string(), + }); + id += 1; + } + } + } + + generated_ids + } +} + +impl AsRef<[PodIdentity]> for LabeledPodIdentityFactory { + fn as_ref(&self) -> &[PodIdentity] { + self.slice.as_ref() + } +} + +impl PodIdentityFactory for LabeledPodIdentityFactory { + /// Returns a `PodToNodeMapping` for the given `pods`. + /// Returns an `error::Error` if any of the pods doesn't have the expected labels + /// or if any of the labels are invalid. A label is invalid if it doesn't match + /// the corresponding field in `Self` like `app` or `instance`. + /// # Arguments + /// - `pods` : A pod slice. 
+ fn try_map(&self, pods: &[Pod]) -> Result, Error> { + let mut result = vec![]; + + for pod in pods { + let pod_id = PodIdentity::try_from_pod_and_id(pod, self.id_label_name.as_ref())?; + result.push(self.fields_match(pod_id)?); + } + Ok(result) + } +} + +#[cfg(test)] +pub mod tests { + + use super::*; + use crate::builder::{ObjectMetaBuilder, PodBuilder}; + use crate::role_utils::EligibleNodesAndReplicas; + use rstest::*; + use std::collections::{BTreeMap, HashMap}; + + pub const APP_NAME: &str = "app"; + pub const INSTANCE: &str = "simple"; + + #[rstest] + #[case(&[], "", Err(Error::PodIdentityFieldEmpty))] + #[case(&[], "ID_LABEL", + Err(Error::PodWithoutLabelsNotSupported([ + labels::APP_NAME_LABEL, + labels::APP_INSTANCE_LABEL, + labels::APP_COMPONENT_LABEL, + labels::APP_ROLE_GROUP_LABEL, + "ID_LABEL" + ].iter().map(|s| String::from(*s)).collect())))] + #[case::no_app_and_id_label(&[ + (labels::APP_INSTANCE_LABEL, "myinstance"), + (labels::APP_COMPONENT_LABEL, "myrole"), + (labels::APP_ROLE_GROUP_LABEL, "mygroup")], + "ID_LABEL", + Err(Error::PodWithoutLabelsNotSupported([ + labels::APP_NAME_LABEL, + "ID_LABEL" + ].iter().map(|s| String::from(*s)).collect())))] + #[case(&[(labels::APP_NAME_LABEL, "myapp"), + (labels::APP_INSTANCE_LABEL, "myinstance"), + (labels::APP_COMPONENT_LABEL, "myrole"), + (labels::APP_ROLE_GROUP_LABEL, "mygroup"), + ("ID_LABEL", "123")], + "ID_LABEL", + Ok(PodIdentity{ + app: "myapp".to_string(), + instance: "myinstance".to_string(), + role: "myrole".to_string(), + group: "mygroup".to_string(), + id: "123".to_string()}))] + fn test_identity_pod_identity_try_from_pod_and_id( + #[case] labels: &[(&str, &str)], + #[case] id: &str, + #[case] expected: Result, + ) { + let labels_map: BTreeMap = labels + .iter() + .map(|t| (t.0.to_string(), t.1.to_string())) + .collect(); + let pod = PodBuilder::new() + .metadata( + ObjectMetaBuilder::new() + .generate_name("pod1") + .namespace("default") + .with_labels(labels_map) + .build() + .unwrap(), + 
) + .build() + .unwrap(); + + let got = PodIdentity::try_from_pod_and_id(&pod, id); + + // Cannot compare `SchedulerResult`s directly because `crate::error::Error` doesn't implement `PartialEq` + match (&got, &expected) { + (Ok(g), Ok(e)) => assert_eq!(g, e), + (Err(ge), Err(re)) => assert_eq!(format!("{:?}", ge), format!("{:?}", re)), + _ => panic!("got: {:?}\nexpected: {:?}", got, expected), + } + } + + #[rstest] + #[case(0, vec![], vec![])] + #[case::generate_one_id(0, vec![("role", "group", 0, 1)], vec![PodIdentity::new(APP_NAME, INSTANCE, "role", "group", "0").unwrap()])] + #[case::generate_one_id_starting_at_1000(1000, vec![("role", "group", 0, 1)], vec![PodIdentity::new(APP_NAME, INSTANCE, "role", "group", "1000").unwrap()])] + #[case::generate_five_ids(1, + vec![ + ("master", "default", 0, 2), + ("worker", "default", 0, 2), + ("history", "default", 0, 1), + ], + vec![ + PodIdentity::new(APP_NAME, INSTANCE, "history", "default", "1").unwrap(), + PodIdentity::new(APP_NAME, INSTANCE, "master", "default", "2").unwrap(), + PodIdentity::new(APP_NAME, INSTANCE, "master", "default", "3").unwrap(), + PodIdentity::new(APP_NAME, INSTANCE, "worker", "default", "4").unwrap(), + PodIdentity::new(APP_NAME, INSTANCE, "worker", "default", "5").unwrap(), + ] + )] + #[case::generate_two_roles(10, + vec![ + ("role1", "default", 0, 2), + ("role2", "default", 0, 1), + ], + vec![ + PodIdentity::new(APP_NAME, INSTANCE, "role1", "default", "10").unwrap(), + PodIdentity::new(APP_NAME, INSTANCE, "role1", "default", "11").unwrap(), + PodIdentity::new(APP_NAME, INSTANCE, "role2", "default", "12").unwrap(), + ] + )] + fn test_identity_labeled_factory_as_slice( + #[case] start: usize, + #[case] nodes_and_replicas: Vec<(&str, &str, usize, usize)>, + #[case] expected: Vec, + ) { + let eligible_nodes_and_replicas = build_eligible_nodes_and_replicas(nodes_and_replicas); + let factory = LabeledPodIdentityFactory::new( + APP_NAME, + INSTANCE, + &eligible_nodes_and_replicas, + "ID_LABEL", + 
start, + ); + let got = factory.as_ref(); + assert_eq!(got, expected.as_slice()); + } + + #[rstest] + #[case(0, vec![], vec![], Ok(vec![]))] + #[case(1000, + vec![("role", "group", 1, 1)], + vec![ + ("node_1", vec![ + (labels::APP_NAME_LABEL, APP_NAME), + (labels::APP_INSTANCE_LABEL, INSTANCE), + (labels::APP_COMPONENT_LABEL, "role"), + (labels::APP_ROLE_GROUP_LABEL, "group"), + ("ID_LABEL", "1000")]),], + Ok(vec![ + PodIdentity::new(APP_NAME, INSTANCE, "role", "group", "1000").unwrap(), + ]))] + #[case(1000, + vec![("master", "default", 1, 1)], + vec![ + ("node_1", vec![ + (labels::APP_NAME_LABEL, "this-pod-belongs-to-another-app"), + (labels::APP_INSTANCE_LABEL, INSTANCE), + (labels::APP_COMPONENT_LABEL, "master"), + (labels::APP_ROLE_GROUP_LABEL, "default"), + ("ID_LABEL", "1000")]),], + Err(Error::UnexpectedPodIdentityField { + field: APP_NAME.to_string(), + value: "this-pod-belongs-to-another-app".to_string(), + expected: APP_NAME.to_string() + }))] + fn test_identity_labeled_factory_try_from( + #[case] start: usize, + #[case] nodes_and_replicas: Vec<(&str, &str, usize, usize)>, + #[case] pod_labels: Vec<(&str, Vec<(&str, &str)>)>, + #[case] expected: Result, Error>, + ) { + let eligible_nodes_and_replicas = build_eligible_nodes_and_replicas(nodes_and_replicas); + let pods = build_pods(pod_labels); + let factory = LabeledPodIdentityFactory::new( + APP_NAME, + INSTANCE, + &eligible_nodes_and_replicas, + "ID_LABEL", + start, + ); + let got = factory.try_map(pods.as_slice()); + + // Cannot compare `SchedulerResult`s directly because `crate::error::Error` doesn't implement `PartialEq` + match (&got, &expected) { + (Ok(g), Ok(e)) => assert_eq!(g, e), + (Err(ge), Err(re)) => assert_eq!(format!("{:?}", ge), format!("{:?}", re)), + _ => panic!("got: {:?}\nexpected: {:?}", got, expected), + } + } + + #[rstest] + #[case(1, vec![], vec![], Ok(vec![]))] + #[case(1, + vec![("role", "group", 1, 1)], + vec![ + ("node_1", vec![ + (labels::APP_NAME_LABEL, APP_NAME), + 
(labels::APP_INSTANCE_LABEL, INSTANCE), + (labels::APP_COMPONENT_LABEL, "role"), + (labels::APP_ROLE_GROUP_LABEL, "group"), + ("ID_LABEL", "1000")]),], + Ok(vec![("node_1", PodIdentity::new(APP_NAME, INSTANCE, "role", "group", "1000").unwrap())]) + )] + fn test_identity_pod_mapping_try_from( + #[case] start: usize, + #[case] nodes_and_replicas: Vec<(&str, &str, usize, usize)>, + #[case] pod_labels: Vec<(&str, Vec<(&str, &str)>)>, + #[case] expected: Result, Error>, + ) { + let pods = build_pods(pod_labels); + let eligible_nodes_and_replicas = build_eligible_nodes_and_replicas(nodes_and_replicas); + let factory = LabeledPodIdentityFactory::new( + APP_NAME, + INSTANCE, + &eligible_nodes_and_replicas, + "ID_LABEL", + start, + ); + + let got = PodToNodeMapping::try_from(&factory, pods.as_slice()); + + // Cannot compare `SchedulerResult`s directly because `crate::error::Error` doesn't implement `PartialEq` + match (&got, &expected) { + (Ok(g), Ok(e)) => assert_eq!( + g, + &PodToNodeMapping { + mapping: e + .iter() + .map(|(node, id)| ( + id.clone(), + NodeIdentity { + name: node.to_string() + } + )) + .collect() + } + ), + (Err(ge), Err(re)) => assert_eq!(format!("{:?}", ge), format!("{:?}", re)), + _ => panic!("got: {:?}\nexpected: {:?}", got, expected), + } + } + + #[rstest] + #[case(vec![], vec![], vec![])] + #[case( + vec![], + vec![ ("node_1", APP_NAME, INSTANCE, "role1", "group1", "50")], + vec![PodIdentity::new(APP_NAME, INSTANCE, "role1", "group1", "50").unwrap()])] + #[case( + vec![PodIdentity::new(APP_NAME, INSTANCE, "role1", "group1", "51").unwrap()], + vec![ ("node_1", APP_NAME, INSTANCE, "role1", "group1", "50")], + vec![ + PodIdentity::new(APP_NAME, INSTANCE, "role1", "group1", "50").unwrap(), + PodIdentity::new(APP_NAME, INSTANCE, "role1", "group1", "51").unwrap(), + ])] + fn test_identity_pod_mapping_missing( + #[case] expected: Vec, + #[case] mapping_node_pod_id: Vec<(&str, &str, &str, &str, &str, &str)>, + #[case] pod_ids: Vec, + ) { + let mapping = 
build_mapping(mapping_node_pod_id); + let got = mapping.missing(pod_ids.as_slice()); + + assert_eq!(got, expected); + } + + pub fn build_pods(node_and_labels: Vec<(&str, Vec<(&str, &str)>)>) -> Vec { + let mut result = vec![]; + + for (node_name, pod_labels) in node_and_labels { + let labels_map: BTreeMap = pod_labels + .iter() + .map(|t| (t.0.to_string(), t.1.to_string())) + .collect(); + + result.push( + PodBuilder::new() + .metadata( + ObjectMetaBuilder::new() + .namespace("default") + .with_labels(labels_map) + .build() + .unwrap(), + ) + .node_name(node_name) + .build() + .unwrap(), + ); + } + result + } + + pub fn build_eligible_nodes_and_replicas( + nodes_and_replicas: Vec<(&str, &str, usize, usize)>, + ) -> EligibleNodesForRoleAndGroup { + let mut eligible_nodes: HashMap> = + HashMap::new(); + for (role, group, _node_count, replicas) in nodes_and_replicas { + eligible_nodes + .entry(String::from(role)) + .and_modify(|r| { + r.insert( + String::from(group), + EligibleNodesAndReplicas { + nodes: vec![], + replicas: Some(replicas as u16), + }, + ); + }) + .or_insert_with(|| { + vec![( + group.to_string(), + EligibleNodesAndReplicas { + nodes: vec![], + replicas: Some(replicas as u16), + }, + )] + .into_iter() + .collect() + }); + } + eligible_nodes + } + + pub fn build_mapping( + node_pod_id: Vec<(&str, &str, &str, &str, &str, &str)>, + ) -> PodToNodeMapping { + let mut mapping: BTreeMap = BTreeMap::default(); + for (node_name, app, instance, role, group, id) in node_pod_id { + mapping.insert( + PodIdentity::new(app, instance, role, group, id).unwrap(), + NodeIdentity { + name: String::from(node_name), + }, + ); + } + PodToNodeMapping { mapping } + } +} diff --git a/src/lib.rs b/src/lib.rs index fe211357b..0be6a4f59 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ pub mod controller_utils; pub mod crd; pub mod error; pub mod finalizer; +pub mod identity; pub mod k8s_errors; pub mod k8s_utils; pub mod krustlet; diff --git a/src/scheduler.rs 
b/src/scheduler.rs index e40e37619..915f5f95b 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -20,109 +20,26 @@ //! that pod id's are "stable" and have a semantic known to the calling operator. //! //! +use std::cell::RefCell; +use std::collections::hash_map::DefaultHasher; use std::collections::{BTreeMap, HashSet}; use std::fmt::{Debug, Display, Formatter}; +use std::ops::DerefMut; -use crate::client::Client; -use crate::error::OperatorResult; -use crate::labels; -use crate::role_utils::EligibleNodesForRoleAndGroup; -use k8s_openapi::api::core::v1::{Node, Pod}; use kube::api::Resource; -use schemars::JsonSchema; use serde::de::DeserializeOwned; -use serde::{Deserialize, Serialize}; use serde_json::json; -use std::cell::RefCell; -use std::collections::btree_map::Iter; -use std::collections::hash_map::DefaultHasher; -use std::convert::TryFrom; -use std::hash::{Hash, Hasher}; -use std::ops::DerefMut; -use tracing::{error, warn}; - -const DEFAULT_NODE_NAME: &str = ""; -const SEMICOLON: &str = ";"; - -#[derive(Debug, thiserror::Error, PartialEq)] -pub enum Error { - #[error( - "Not enough nodes [{number_of_nodes}] available to schedule pods [{number_of_pods}]. Unscheduled pods: {unscheduled_pods:?}." - )] - NotEnoughNodesAvailable { - number_of_nodes: usize, - number_of_pods: usize, - unscheduled_pods: Vec, - }, - - #[error("PodIdentity could not be parsed: {pod_id:?}. This should not happen. Please open a ticket.")] - PodIdentityNotParseable { pod_id: String }, -} - -/// Returns a Vec of pod identities according to the replica per role+group pair from `eligible_nodes`. -/// # Arguments -/// * `app_name` - Application name -/// * `instance` - Service instance -/// * `eligible_nodes` - Eligible nodes grouped by role and groups. 
-pub fn generate_ids( - app_name: &str, - instance: &str, - eligible_nodes: &EligibleNodesForRoleAndGroup, -) -> Vec { - let mut id: u16 = 1; - let mut generated_ids = vec![]; - for (role_name, groups) in eligible_nodes { - for (group_name, eligible_nodes) in groups { - let ids_per_group = eligible_nodes - .replicas - .map(usize::from) - .unwrap_or_else(|| eligible_nodes.nodes.len()); - for _ in 1..ids_per_group + 1 { - generated_ids.push(PodIdentity { - app: app_name.to_string(), - instance: instance.to_string(), - role: role_name.clone(), - group: group_name.clone(), - id: id.to_string(), - }); - id += 1; - } - } - } - - generated_ids -} -#[derive( - Clone, Debug, Default, Deserialize, Eq, Hash, JsonSchema, Ord, PartialEq, PartialOrd, Serialize, -)] -#[serde(rename_all = "camelCase")] -#[serde(try_from = "String")] -#[serde(into = "String")] -pub struct PodIdentity { - app: String, - instance: String, - role: String, - group: String, - id: String, -} - -#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct NodeIdentity { - pub name: String, -} - -#[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct PodToNodeMapping { - mapping: BTreeMap, -} +use crate::client::Client; +use crate::error::{Error, OperatorResult}; +use crate::identity::{NodeIdentity, PodIdentity, PodIdentityFactory, PodToNodeMapping}; +use crate::role_utils::EligibleNodesForRoleAndGroup; +use k8s_openapi::api::core::v1::Pod; pub trait PodPlacementHistory { - fn find(&self, pod_id: &PodIdentity) -> Option<&NodeIdentity>; - fn find_all(&self, pods: &[&PodIdentity]) -> Vec> { - pods.iter().map(|p| self.find(*p)).collect() + fn find(&self, pod_id: &PodIdentity) -> Option; + fn find_all(&self, pods: &[PodIdentity]) -> Vec> { + pods.iter().map(|p| self.find(p)).collect() } fn update(&mut self, pod_id: &PodIdentity, node_id: &NodeIdentity); @@ 
-158,14 +75,14 @@ pub trait Scheduler { /// Implementations may return an error if not all pods can be mapped to nodes. /// /// # Arguments: - /// * `pods` - The list of desired pods. Should contain both already mapped as well as new pods. + /// * `pod_id_factory` - A factory object for all pod identities required by the service. /// * `nodes` - Currently available nodes in the system grouped by role and group. - /// * `mapped_pods` - Pods that are already mapped to nodes. + /// * `pods` - Pods that are already mapped to nodes. fn schedule( &mut self, - pods: &[PodIdentity], + pod_id_factory: &dyn PodIdentityFactory, nodes: &RoleGroupEligibleNodes, - mapped_pods: &PodToNodeMapping, + pods: &[Pod], ) -> SchedulerResult; } @@ -181,8 +98,8 @@ pub trait PodPlacementStrategy { /// * `preferred_nodes` - Optional nodes to prioritize during placement (if not None) fn place( &self, - pods: &[&PodIdentity], - preferred_nodes: &[Option<&NodeIdentity>], + pods: &[PodIdentity], + preferred_nodes: &[Option], ) -> Vec>; } @@ -275,142 +192,9 @@ impl SchedulerState { } } -impl TryFrom for PodIdentity { - type Error = Error; - fn try_from(s: String) -> Result { - let split = s.split(SEMICOLON).collect::>(); - if split.len() != 5 { - return Err(Error::PodIdentityNotParseable { pod_id: s }); - } - Ok(PodIdentity::new( - split[0], split[1], split[2], split[3], split[4], - )) - } -} - -impl From for String { - fn from(pod_id: PodIdentity) -> Self { - [ - pod_id.app, - pod_id.instance, - pod_id.role, - pod_id.group, - pod_id.id, - ] - .join(SEMICOLON) - } -} - -impl PodToNodeMapping { - pub fn from(pods: &[Pod], id_label_name: Option<&str>) -> Self { - let mut pod_node_mapping = PodToNodeMapping::default(); - - for pod in pods { - if let Some(labels) = &pod.metadata.labels { - let app = labels.get(labels::APP_NAME_LABEL); - let instance = labels.get(labels::APP_INSTANCE_LABEL); - let role = labels.get(labels::APP_COMPONENT_LABEL); - let group = labels.get(labels::APP_ROLE_GROUP_LABEL); - 
let id = id_label_name.and_then(|n| labels.get(n)); - pod_node_mapping.insert( - PodIdentity { - app: app.cloned().unwrap_or_default(), - instance: instance.cloned().unwrap_or_default(), - role: role.cloned().unwrap_or_default(), - group: group.cloned().unwrap_or_default(), - id: id.cloned().unwrap_or_default(), - }, - NodeIdentity { - name: pod.spec.as_ref().map(|s| s.node_name.as_ref()).map_or_else( - || DEFAULT_NODE_NAME.to_string(), - |name| name.unwrap().clone(), - ), - }, - ); - } - } - pod_node_mapping - } - - pub fn iter(&self) -> Iter<'_, PodIdentity, NodeIdentity> { - self.mapping.iter() - } - - pub fn get_filtered(&self, role: &str, group: &str) -> BTreeMap { - let mut filtered = BTreeMap::new(); - for (pod_id, node_id) in &self.mapping { - if pod_id.role == *role && pod_id.group == *group { - filtered.insert(pod_id.clone(), node_id.clone()); - } - } - filtered - } - - pub fn get(&self, pod_id: &PodIdentity) -> Option<&NodeIdentity> { - self.mapping.get(pod_id) - } - - pub fn insert(&mut self, pod_id: PodIdentity, node_id: NodeIdentity) -> Option { - self.mapping.insert(pod_id, node_id) - } - - pub fn filter(&self, id: &PodIdentity) -> Vec { - self.mapping - .iter() - .filter_map(|(pod_id, node_id)| { - if pod_id.app == id.app - && pod_id.instance == id.instance - && pod_id.role == id.role - && pod_id.group == id.group - { - Some(node_id.clone()) - } else { - None - } - }) - .collect() - } - - pub fn merge(&self, other: &Self) -> Self { - let mut temp = self.mapping.clone(); - temp.extend(other.clone().mapping); - PodToNodeMapping { mapping: temp } - } - - /// Find the pod that is currently mapped onto `node`. - pub fn mapped_by(&self, node: &NodeIdentity) -> Option<&PodIdentity> { - for (pod_id, mapped_node) in self.mapping.iter() { - if node == mapped_node { - return Some(pod_id); - } - } - None - } - - /// Given `pods` return all that are not mapped. 
- pub fn missing<'a>(&self, pods: &'a [PodIdentity]) -> Vec<&'a PodIdentity> { - let mut result = vec![]; - for p in pods { - if !self.mapping.contains_key(p) { - result.push(p) - } - } - result - } - - #[cfg(test)] - pub fn new(map: Vec<(PodIdentity, NodeIdentity)>) -> Self { - let mut result = BTreeMap::new(); - for (p, n) in map { - result.insert(p.clone(), n.clone()); - } - PodToNodeMapping { mapping: result } - } -} - impl PodPlacementHistory for K8SUnboundedHistory<'_> { - fn find(&self, pod_id: &PodIdentity) -> Option<&NodeIdentity> { - self.history.get(pod_id) + fn find(&self, pod_id: &PodIdentity) -> Option { + self.history.get(pod_id).cloned() } /// @@ -419,7 +203,7 @@ impl PodPlacementHistory for K8SUnboundedHistory<'_> { fn update(&mut self, pod_id: &PodIdentity, node_id: &NodeIdentity) { if let Some(history_node_id) = self.find(pod_id) { // found but different - if history_node_id != node_id { + if history_node_id != *node_id { self.history.insert(pod_id.clone(), node_id.clone()); self.modified = true; } @@ -437,17 +221,6 @@ impl Display for NodeIdentity { } } -impl From for NodeIdentity { - fn from(node: Node) -> Self { - NodeIdentity { - name: node - .metadata - .name - .unwrap_or_else(|| DEFAULT_NODE_NAME.to_string()), - } - } -} - /// Implements scheduler with memory. Once a Pod with a given identifier is scheduled on a node, /// it will always be rescheduled to this node as long as it exists. impl<'a, H> StickyScheduler<'a, H> @@ -474,27 +247,27 @@ where /// /// The nodes where unscheduled pods are mapped are selected by the configured strategy. /// # Arguments: - /// * `pods` - all pod ids required by the service. + /// * `pod_id_factory` - a provider for all pod ides required by the system. /// * `nodes` - all eligible nodes available in the system - /// * `mapped_pods` - existing pod to node mapping + /// * `pods` - existing pods that are mapped to nodes. 
fn schedule( &mut self, - pods: &[PodIdentity], + pod_id_factory: &dyn PodIdentityFactory, nodes: &RoleGroupEligibleNodes, - mapped_pods: &PodToNodeMapping, + pods: &[Pod], ) -> SchedulerResult { - let unscheduled_pods = mapped_pods.missing(pods); + let mapping = PodToNodeMapping::try_from(pod_id_factory, pods)?; + let unscheduled_pods = mapping.missing(pod_id_factory.as_ref()); let history_nodes = self.history.find_all(unscheduled_pods.as_slice()); - - let strategy = self.strategy(nodes, mapped_pods); + let strategy = self.strategy(nodes, &mapping); let selected_nodes = strategy.place(unscheduled_pods.as_slice(), history_nodes.as_slice()); self.update_history_and_result( - unscheduled_pods, - selected_nodes.to_vec(), + unscheduled_pods.as_slice(), + selected_nodes.as_slice(), pods.len(), nodes.count_unique_node_ids(), - mapped_pods, + &mapping, ) } } @@ -527,8 +300,8 @@ where /// * `current_mapping` - existing pod to node mapping fn update_history_and_result( &mut self, - pods: Vec<&PodIdentity>, - nodes: Vec>, + pods: &[PodIdentity], + nodes: &[Option], number_of_pods: usize, number_of_nodes: usize, current_mapping: &PodToNodeMapping, @@ -537,15 +310,15 @@ where let mut result_err = vec![]; let mut result_ok = BTreeMap::new(); - for (pod, opt_node) in pods.iter().zip(&nodes) { + for (pod, opt_node) in pods.iter().zip(nodes) { match opt_node { Some(node) => { // Found a node to schedule on so update the result - result_ok.insert((*pod).clone(), node.clone()); + result_ok.insert(pod.clone(), node.clone()); // and update the history if needed. 
self.history.update(pod, node); } - None => result_err.push((*pod).clone()), // No node available for this pod + None => result_err.push(format!("{:?}", pod.clone())), // No node available for this pod } } @@ -596,7 +369,7 @@ impl RoleGroupEligibleNodes { fn preferred_node_or( &self, pod: &PodIdentity, - preferred: Option<&NodeIdentity>, + preferred: Option, default: F, ) -> Option where @@ -604,12 +377,12 @@ impl RoleGroupEligibleNodes { { match self .node_set - .get(&pod.role) - .and_then(|role| role.get(&pod.group)) + .get(&pod.role().to_string()) + .and_then(|role| role.get(&pod.group().to_string())) { Some(nodes) if !nodes.is_empty() => { if let Some(node_id) = preferred { - let tmp = nodes.iter().find(|n| *n == node_id); + let tmp = nodes.iter().find(|n| n == &&node_id); if tmp.is_some() { return tmp.cloned(); } @@ -624,7 +397,7 @@ impl RoleGroupEligibleNodes { fn preferred_node_or_last( &self, pod: &PodIdentity, - preferred: Option<&NodeIdentity>, + preferred: Option, ) -> Option { self.preferred_node_or(pod, preferred, |_pod, nodes| nodes.last().cloned()) } @@ -638,7 +411,7 @@ impl RoleGroupEligibleNodes { } /// - /// Count the total number of unique node identities in the `matching_nodes` + /// Count the total number of unique node identities. 
/// pub fn count_unique_node_ids(&self) -> usize { self.node_set @@ -648,11 +421,6 @@ impl RoleGroupEligibleNodes { .collect::>() .len() } - - #[cfg(test)] - fn get_nodes_mut(&mut self, role: &str, group: &str) -> Option<&mut Vec> { - self.node_set.get_mut(role).and_then(|g| g.get_mut(group)) - } } impl<'a> GroupAntiAffinityStrategy<'a> { @@ -666,19 +434,21 @@ impl<'a> GroupAntiAffinityStrategy<'a> { pub fn select_node_for_pod( &self, pod_id: &PodIdentity, - preferred_node: Option<&NodeIdentity>, + preferred_node: Option, ) -> Option { let mut borrowed_nodes = self.eligible_nodes.borrow_mut(); // Find a node to schedule on (it might be the node from history) - while let Some(next_node) = borrowed_nodes.preferred_node_or_last(pod_id, preferred_node) { - borrowed_nodes.remove_eligible_node( - &next_node, - pod_id.role.as_str(), - pod_id.group.as_str(), - ); - // check that the node is not already in use - if self.existing_mapping.mapped_by(&next_node).is_none() { + while let Some(next_node) = + borrowed_nodes.preferred_node_or_last(pod_id, preferred_node.clone()) + { + borrowed_nodes.remove_eligible_node(&next_node, pod_id.role(), pod_id.group()); + + // check that the node is not already in use *by a pod from the same role+group* + if !self + .existing_mapping + .mapped_by(&next_node, pod_id.role(), pod_id.group()) + { return Some(next_node); } } @@ -692,83 +462,18 @@ impl PodPlacementStrategy for GroupAntiAffinityStrategy<'_> { /// might not reflect the reality between calls. 
fn place( &self, - pods: &[&PodIdentity], - preferred_nodes: &[Option<&NodeIdentity>], + pod_identities: &[PodIdentity], + preferred_nodes: &[Option], ) -> Vec> { - assert_eq!(pods.len(), preferred_nodes.len()); - pods.iter() + assert_eq!(pod_identities.len(), preferred_nodes.len()); + pod_identities + .iter() .zip(preferred_nodes.iter()) - .map(|(pod, preferred_node)| self.select_node_for_pod(*pod, *preferred_node)) + .map(|(pod, preferred_node)| self.select_node_for_pod(pod, preferred_node.clone())) .collect() } } -impl PodIdentity { - pub fn new(app: &str, instance: &str, role: &str, group: &str, id: &str) -> Self { - Self::warn_forbidden_char(app, instance, role, group, id); - PodIdentity { - app: app.to_string(), - instance: instance.to_string(), - role: role.to_string(), - group: group.to_string(), - id: id.to_string(), - } - } - - pub fn app(&self) -> &str { - self.app.as_ref() - } - pub fn instance(&self) -> &str { - self.instance.as_ref() - } - pub fn role(&self) -> &str { - self.role.as_ref() - } - pub fn group(&self) -> &str { - self.group.as_ref() - } - pub fn id(&self) -> &str { - self.id.as_ref() - } - - pub fn compute_hash(&self, hasher: &mut DefaultHasher) -> u64 { - self.hash(hasher); - hasher.finish() - } - - fn warn_forbidden_char(app: &str, instance: &str, role: &str, group: &str, id: &str) { - if app.contains(SEMICOLON) { - warn!( - "Found forbidden character [{}] in application name: {}", - SEMICOLON, app - ); - } - if instance.contains(SEMICOLON) { - warn!( - "Found forbidden character [{}] in instance name: {}", - SEMICOLON, instance - ); - } - if role.contains(SEMICOLON) { - warn!( - "Found forbidden character [{}] in role name: {}", - SEMICOLON, role - ); - } - if group.contains(SEMICOLON) { - warn!( - "Found forbidden character [{}] in group name: {}", - SEMICOLON, group - ); - } - if id.contains(SEMICOLON) { - warn!( - "Found forbidden character [{}] in pod id: {}", - SEMICOLON, id - ); - } - } -} impl<'a> HashingStrategy<'a> { pub fn 
new(eligible_nodes: &'a RoleGroupEligibleNodes) -> Self { Self { @@ -780,7 +485,7 @@ impl<'a> HashingStrategy<'a> { fn select_node_for_pod( &self, pod: &PodIdentity, - preferred_node: Option<&NodeIdentity>, + preferred_node: Option, ) -> Option { self.eligible_nodes .preferred_node_or(pod, preferred_node, |pod, nodes| { @@ -794,45 +499,32 @@ impl<'a> HashingStrategy<'a> { impl PodPlacementStrategy for HashingStrategy<'_> { fn place( &self, - pods: &[&PodIdentity], - preferred_nodes: &[Option<&NodeIdentity>], + pods: &[PodIdentity], + preferred_nodes: &[Option], ) -> Vec> { assert_eq!(pods.len(), preferred_nodes.len()); pods.iter() .zip(preferred_nodes.iter()) - .map(|(pod, preferred_node)| self.select_node_for_pod(*pod, *preferred_node)) + .map(|(pod, preferred_node)| self.select_node_for_pod(pod, preferred_node.clone())) .collect() } } #[cfg(test)] mod tests { - use super::*; use rstest::*; - const APP_NAME: &str = "app"; - const INSTANCE: &str = "simple"; + use crate::identity; + //use crate::identity::tests; - #[derive(Default)] - struct TestHistory { - pub history: PodToNodeMapping, - } + use super::*; - fn generate_ids(how_many: usize) -> Vec { - (0..how_many) - .map(|index| PodIdentity { - app: APP_NAME.to_string(), - instance: INSTANCE.to_string(), - role: format!("ROLE_{}", index % 2), - group: format!("GROUP_{}", index % 2), - id: format!("POD_{}", index), - }) - .collect() - } + #[derive(Default)] + struct TestHistory {} impl PodPlacementHistory for TestHistory { - fn find(&self, pod_id: &PodIdentity) -> Option<&NodeIdentity> { - self.history.get(pod_id) + fn find(&self, _pod_id: &PodIdentity) -> Option { + None } fn update(&mut self, _pod_id: &PodIdentity, _node_id: &NodeIdentity) { @@ -840,235 +532,160 @@ mod tests { } } - fn generate_eligible_nodes(available_node_count: usize) -> RoleGroupEligibleNodes { - let mut node_set: BTreeMap>> = BTreeMap::new(); - for index in 0..available_node_count { - let role_name = format!("ROLE_{}", index % 2); - let 
group_name = format!("GROUP_{}", index % 2); - let node = NodeIdentity { - name: format!("NODE_{}", index), - }; - if let Some(role) = node_set.get_mut(&role_name) { - if let Some(group) = role.get_mut(&group_name) { - group.push(node); - } else { - role.insert(group_name, vec![node]); - } - } else { - let mut new_group = BTreeMap::new(); - new_group.insert(group_name, vec![node]); - node_set.insert(role_name, new_group); - } - } - RoleGroupEligibleNodes { node_set } - } - - fn generate_current_mapping( - scheduled_pods: &[PodIdentity], - mut available_nodes: RoleGroupEligibleNodes, - ) -> PodToNodeMapping { - let mut current_mapping = BTreeMap::new(); - - for pod_id in scheduled_pods { - let nodes = available_nodes - .get_nodes_mut(&pod_id.role, &pod_id.group) - .unwrap(); - current_mapping.insert(pod_id.clone(), nodes.pop().unwrap().clone()); - } - - PodToNodeMapping { - mapping: current_mapping, - } - } - #[rstest] - #[case::nothing_to_place(1, 1, 1, &[], &[])] - #[case::not_enough_nodes(1, 0, 0, &[None], &[None])] - #[case::place_one_pod(1, 0, 1, &[None], &[Some(NodeIdentity { name: "NODE_0".to_string() })])] - #[case::place_one_pod_on_preferred(1, 0, 5, &[Some(NodeIdentity { name: "NODE_2".to_string() })], &[Some(NodeIdentity { name: "NODE_2".to_string() })])] - #[case::place_three_pods(3, 0, 5, &[None, None, None], - &[Some(NodeIdentity { name: "NODE_4".to_string() }), - Some(NodeIdentity { name: "NODE_3".to_string() }), - Some(NodeIdentity { name: "NODE_2".to_string() })])] - #[case::place_three_pods_one_on_preferred(3, 0, 5, &[None, Some(NodeIdentity { name: "NODE_1".to_string() }), None], - &[Some(NodeIdentity { name: "NODE_4".to_string() }), - Some(NodeIdentity { name: "NODE_1".to_string() }), - Some(NodeIdentity { name: "NODE_2".to_string() })])] + #[case(vec![], vec![], vec![], vec![], vec![])] + #[case::place_one_pod_id( + vec![("master", "default", vec!["node_1"])], + vec![], + vec![PodIdentity::new(identity::tests::APP_NAME, 
identity::tests::INSTANCE, "master", "default", "10").unwrap()], + vec![None], + vec![Some(NodeIdentity::new("node_1"))])] + #[case::place_two_pods_on_the_same_node( + vec![("master", "default", vec!["node_1"]), ("worker", "default", vec!["node_1"])], + vec![], + vec![ + PodIdentity::new(identity::tests::APP_NAME, identity::tests::INSTANCE, "master", "default", "10").unwrap(), + PodIdentity::new(identity::tests::APP_NAME, identity::tests::INSTANCE, "worker", "default", "11").unwrap()], + vec![None, None], + vec![Some(NodeIdentity::new("node_1")), Some(NodeIdentity::new("node_1"))])] + #[case::place_five_pods_on_three_nodes( + vec![ + ("master", "default", vec!["node_1", "node_2", "node_3"]), + ("worker", "default", vec!["node_1", "node_2", "node_3"]), + ("history", "default", vec!["node_1", "node_2", "node_3"]),], + vec![], + vec![ + PodIdentity::new(identity::tests::APP_NAME, identity::tests::INSTANCE, "master", "default", "10").unwrap(), + PodIdentity::new(identity::tests::APP_NAME, identity::tests::INSTANCE, "master", "default", "11").unwrap(), + PodIdentity::new(identity::tests::APP_NAME, identity::tests::INSTANCE, "worker", "default", "12").unwrap(), + PodIdentity::new(identity::tests::APP_NAME, identity::tests::INSTANCE, "worker", "default", "13").unwrap(), + PodIdentity::new(identity::tests::APP_NAME, identity::tests::INSTANCE, "worker", "default", "14").unwrap(),], + vec![None, None, None, None, None], + vec![ + Some(NodeIdentity::new("node_3")), + Some(NodeIdentity::new("node_2")), + Some(NodeIdentity::new("node_3")), + Some(NodeIdentity::new("node_2")), + Some(NodeIdentity::new("node_1")), + ])] + #[case::no_node_to_place_pod( + vec![], + vec![], + vec![PodIdentity::new(identity::tests::APP_NAME, identity::tests::INSTANCE, "master", "default", "10").unwrap()], + vec![None], + vec![None])] + #[case::not_enough_nodes_for_two_pods( + vec![ + ("master", "default", vec!["node_1"]), + ], + vec![], + vec![ + PodIdentity::new(identity::tests::APP_NAME, 
identity::tests::INSTANCE, "master", "default", "10").unwrap(), + PodIdentity::new(identity::tests::APP_NAME, identity::tests::INSTANCE, "master", "default", "11").unwrap(),], + vec![None, None], + vec![Some(NodeIdentity::new("node_1")), None])] + #[case::current_mapping_occupies_node( + vec![ + ("master", "default", vec!["node_1"]), + ], + vec![("node_1", identity::tests::APP_NAME, identity::tests::INSTANCE, "master", "default", "10")], + vec![ + PodIdentity::new(identity::tests::APP_NAME, identity::tests::INSTANCE, "master", "default", "11").unwrap(),], + vec![None], + vec![None])] + #[case::place_pod_on_preferred_node( + vec![("master", "default", vec!["node_1", "node_2", "node_3", "node_4"]),], + vec![], + vec![PodIdentity::new(identity::tests::APP_NAME, identity::tests::INSTANCE, "master", "default", "10").unwrap(),], + vec![Some(NodeIdentity::new("node_2"))], + vec![Some(NodeIdentity::new("node_2"))])] fn test_scheduler_group_anti_affinity( - #[case] wanted_pod_count: usize, - #[case] scheduled_pods_count: usize, - #[case] available_node_count: usize, - #[case] preferred_nodes: &[Option], - #[case] expected: &[Option], + #[case] role_group_nodes: Vec<(&str, &str, Vec<&str>)>, + #[case] current_mapping_node_pod_id: Vec<(&str, &str, &str, &str, &str, &str)>, + #[case] pod_identities: Vec, + #[case] preferred_nodes: Vec>, + #[case] expected: Vec>, ) { - let wanted_pods = generate_ids(wanted_pod_count); - let eligible_nodes = generate_eligible_nodes(available_node_count); + let current_mapping = identity::tests::build_mapping(current_mapping_node_pod_id); + let nodes = build_role_group_nodes(role_group_nodes); - let scheduled_pods: Vec<_> = wanted_pods - .iter() - .take(scheduled_pods_count) - .cloned() - .collect(); - let current_mapping = generate_current_mapping(&scheduled_pods, eligible_nodes.clone()); + let strategy = GroupAntiAffinityStrategy::new(nodes, ¤t_mapping); - let vec_preferred_nodes: Vec> = - preferred_nodes.iter().map(|o| o.as_ref()).collect(); - 
let strategy = GroupAntiAffinityStrategy::new(eligible_nodes, ¤t_mapping); - let got = strategy.place( - current_mapping.missing(wanted_pods.as_slice()).as_slice(), - vec_preferred_nodes.as_slice(), - ); - assert_eq!(got, expected.to_vec()); - } + let got = strategy.place(pod_identities.as_slice(), preferred_nodes.as_slice()); - #[rstest] - #[case::nothing_to_place(1, 1, 1, &[], &[])] - #[case::not_enough_nodes(1, 0, 0, &[None], &[None])] - #[case::place_one_pod(1, 0, 1, &[None], &[Some(NodeIdentity { name: "NODE_0".to_string() })])] - #[case::place_one_pod_on_preferred(1, 0, 5, &[Some(NodeIdentity { name: "NODE_2".to_string() })], &[Some(NodeIdentity { name: "NODE_2".to_string() })])] - #[case::place_three_pods(3, 0, 5, &[None, None, None], - &[Some(NodeIdentity { name: "NODE_0".to_string() }), - Some(NodeIdentity { name: "NODE_1".to_string() }), - Some(NodeIdentity { name: "NODE_0".to_string() })])] - #[case::place_three_pods_one_on_preferred(3, 0, 5, &[Some(NodeIdentity { name: "NODE_2".to_string() }), Some(NodeIdentity { name: "NODE_3".to_string() }), None], - &[Some(NodeIdentity { name: "NODE_2".to_string() }), - Some(NodeIdentity { name: "NODE_3".to_string() }), - Some(NodeIdentity { name: "NODE_0".to_string() })])] - fn test_scheduler_hashing_strategy( - #[case] wanted_pod_count: usize, - #[case] scheduled_pods_count: usize, - #[case] available_node_count: usize, - #[case] preferred_nodes: &[Option], - #[case] expected: &[Option], - ) { - let wanted_pods = generate_ids(wanted_pod_count); - let eligible_nodes = generate_eligible_nodes(available_node_count); - - let scheduled_pods: Vec<_> = wanted_pods - .iter() - .take(scheduled_pods_count) - .cloned() - .collect(); - let current_mapping = generate_current_mapping(&scheduled_pods, eligible_nodes.clone()); - - let vec_preferred_nodes: Vec> = - preferred_nodes.iter().map(|o| o.as_ref()).collect(); - let strategy = HashingStrategy::new(&eligible_nodes); - let got = strategy.place( - 
current_mapping.missing(wanted_pods.as_slice()).as_slice(), - vec_preferred_nodes.as_slice(), - ); assert_eq!(got, expected.to_vec()); } #[rstest] - #[case(1, None, "", "", None)] - #[case(0, Some(NodeIdentity{name: "NODE_2".to_string()}), "ROLE_0", "GROUP_0", None)] - #[case(3, Some(NodeIdentity{name: "NODE_2".to_string()}), "ROLE_1", "GROUP_1", Some(NodeIdentity{name: "NODE_1".to_string()}))] // node not found, use first! - #[case(4, Some(NodeIdentity{name: "NODE_2".to_string()}), "ROLE_0", "GROUP_0", Some(NodeIdentity{name: "NODE_2".to_string()}))] // node found, use it! - fn test_scheduler_preferred_node_or_last( - #[case] eligible_node_count: usize, - #[case] opt_node_id: Option, - #[case] role: &str, - #[case] group: &str, - #[case] expected: Option, - ) { - let eligible_nodes = generate_eligible_nodes(eligible_node_count); - let pod = PodIdentity { - role: role.to_string(), - group: group.to_string(), - ..PodIdentity::default() - }; - let got = eligible_nodes.preferred_node_or_last(&pod, opt_node_id.as_ref()); - - assert_eq!(got, expected); - } - - #[rstest] - #[case(0, 0)] - #[case(3, 3)] + #[case(0, vec![],)] + #[case(2, vec![ + ("role1", "group1", vec!["node_1", "node_2"]), + ("role1", "group2", vec!["node_1"]), + ],)] + #[case(4, vec![("role1", "group1", vec!["node_1", "node_2", "node_3", "node_4"]),],)] fn test_scheduler_count_unique_node_ids( - #[case] eligible_node_count: usize, #[case] expected: usize, + #[case] role_group_nodes: Vec<(&str, &str, Vec<&str>)>, ) { - let eligible_nodes = generate_eligible_nodes(eligible_node_count); - assert_eq!(expected, eligible_nodes.count_unique_node_ids()); + let nodes = build_role_group_nodes(role_group_nodes); + assert_eq!(expected, nodes.count_unique_node_ids()); } #[rstest] - #[case::no_missing_pods(1, 1, 1, vec![])] - #[case::missing_one_pod(1, 0, 1, vec![PodIdentity { app: "app".to_string(), instance: "simple".to_string(), role: "ROLE_0".to_string(), group: "GROUP_0".to_string(), id: "POD_0".to_string() }])] 
- fn test_scheduler_pod_to_node_mapping_missing( - #[case] wanted_pod_count: usize, - #[case] scheduled_pods_count: usize, - #[case] available_node_count: usize, - #[case] expected: Vec, + #[case(None, PodIdentity::default(), None, vec![])] + #[case::last( + Some(NodeIdentity::new("node_3")), + PodIdentity::new(identity::tests::APP_NAME, identity::tests::INSTANCE, "role1", "group1", "1").unwrap(), + None, + vec![("role1", "group1", vec!["node_1", "node_2", "node_3"])])] + #[case::preferred( + Some(NodeIdentity::new("node_2")), + PodIdentity::new(identity::tests::APP_NAME, identity::tests::INSTANCE, "role1", "group1", "1").unwrap(), + Some(NodeIdentity::new("node_2")), + vec![("role1", "group1", vec!["node_1", "node_2", "node_3"])])] + fn test_scheduler_preferred_node_or_last( + #[case] expected: Option, + #[case] pod_id: PodIdentity, + #[case] preferred: Option, + #[case] role_group_nodes: Vec<(&str, &str, Vec<&str>)>, ) { - let wanted_pods = generate_ids(wanted_pod_count); - let available_nodes = generate_eligible_nodes(available_node_count); - let scheduled_pods: Vec<_> = wanted_pods - .iter() - .take(scheduled_pods_count) - .cloned() - .collect(); - - let mapping = generate_current_mapping(&scheduled_pods, available_nodes); - - let got = mapping.missing(wanted_pods.as_slice()); - let expected_refs: Vec<&PodIdentity> = expected.iter().collect(); - assert_eq!(got, expected_refs); + let nodes = build_role_group_nodes(role_group_nodes); + let got = nodes.preferred_node_or_last(&pod_id, preferred); + assert_eq!(got, expected); } - #[rstest] - #[case::one_pod_is_scheduled(1, 1, - Ok(SchedulerState { - current_mapping: PodToNodeMapping::default(), - remaining_mapping: - PodToNodeMapping::new(vec![ - (PodIdentity { app: "app".to_string(), instance: "simple".to_string(), role: "ROLE_0".to_string(), group: "GROUP_0".to_string(), id: "POD_0".to_string() }, NodeIdentity { name: "NODE_0".to_string() }), - ])}, - ))] - #[case::pod_cannot_be_scheduled(1, 0, - 
Err(Error::NotEnoughNodesAvailable { - number_of_nodes: 0, - number_of_pods: 1, - unscheduled_pods: vec![ - PodIdentity { - app: "app".to_string(), - instance: "simple".to_string(), - role: "ROLE_0".to_string(), - group: "GROUP_0".to_string(), - id: "POD_0".to_string() }] }))] - fn test_scheduler_update_history_and_result( - #[case] pod_count: usize, - #[case] node_count: usize, - #[case] expected: SchedulerResult, - ) { - let pods = generate_ids(pod_count); - let nodes = (0..pod_count) - .map(|i| { - if i < node_count { - Some(NodeIdentity { - name: format!("NODE_{}", i), - }) - } else { - None - } - }) - .collect(); - let current_mapping = PodToNodeMapping::default(); - let mut history = TestHistory::default(); - - let mut scheduler = StickyScheduler::new(&mut history, ScheduleStrategy::GroupAntiAffinity); - - let got = scheduler.update_history_and_result( - pods.iter().collect::>(), - nodes, - pod_count, - node_count, - ¤t_mapping, - ); - - assert_eq!(got, expected); + fn build_role_group_nodes( + eligible_nodes: Vec<(&str, &str, Vec<&str>)>, + ) -> RoleGroupEligibleNodes { + let mut node_set: BTreeMap>> = BTreeMap::new(); + for (role, group, node_names) in eligible_nodes { + node_set + .entry(String::from(role)) + .and_modify(|r| { + r.insert( + String::from(group), + node_names + .iter() + .map(|nn| NodeIdentity { + name: nn.to_string(), + }) + .collect(), + ); + }) + .or_insert_with(|| { + vec![( + String::from(group), + node_names + .iter() + .map(|nn| NodeIdentity { + name: nn.to_string(), + }) + .collect(), + )] + .into_iter() + .collect() + }); + } + RoleGroupEligibleNodes { node_set } } }