diff --git a/control-plane/agents/src/bin/core/controller/scheduling/volume.rs b/control-plane/agents/src/bin/core/controller/scheduling/volume.rs index e3a1ef513..968e9485b 100644 --- a/control-plane/agents/src/bin/core/controller/scheduling/volume.rs +++ b/control-plane/agents/src/bin/core/controller/scheduling/volume.rs @@ -61,6 +61,11 @@ impl GetSuitablePools { move_repl, } } + + /// Get the volume spec. + pub(crate) fn spec(&self) -> &VolumeSpec { + &self.spec + } } /// The context to select suitable pools for volume replica creation. diff --git a/control-plane/agents/src/bin/core/controller/scheduling/volume_policy/mod.rs b/control-plane/agents/src/bin/core/controller/scheduling/volume_policy/mod.rs index 91ffb57dd..58a8b308f 100644 --- a/control-plane/agents/src/bin/core/controller/scheduling/volume_policy/mod.rs +++ b/control-plane/agents/src/bin/core/controller/scheduling/volume_policy/mod.rs @@ -68,12 +68,15 @@ impl DefaultBasePolicy { } } -/// Return true if all the keys present in volume's pool/node inclusion matches with the pool/node -/// labels otherwise returns false. +/// Return true if all the keys present in the volume's pool/node +/// inclusion/affinity match the pool/node labels; otherwise, return false. pub(crate) fn qualifies_label_criteria( vol_pool_inc_labels: HashMap, + vol_pool_affinity: HashMap, pool_labels: &HashMap, ) -> bool { + let mut satisfy_inclusion = true; + let mut satisfy_affinity = true; for (vol_inc_key, vol_inc_value) in vol_pool_inc_labels.iter() { match pool_labels.get(vol_inc_key) { Some(pool_val) => { @@ -81,13 +84,27 @@ pub(crate) fn qualifies_label_criteria( continue; } if pool_val != vol_inc_value { - return false; + satisfy_inclusion = false; + break; } } None => { - return false; + satisfy_inclusion = false; + break; } } } - true + + for (vol_affinity_key, _) in vol_pool_affinity.iter() { + match pool_labels.get(vol_affinity_key) { + Some(_) => { + continue; + } + None => { + satisfy_affinity = false; + break; + } + } + } + satisfy_inclusion && satisfy_affinity } diff --git a/control-plane/agents/src/bin/core/controller/scheduling/volume_policy/node.rs b/control-plane/agents/src/bin/core/controller/scheduling/volume_policy/node.rs index 857e85396..7e948da1c 100644 --- a/control-plane/agents/src/bin/core/controller/scheduling/volume_policy/node.rs +++ b/control-plane/agents/src/bin/core/controller/scheduling/volume_policy/node.rs @@ -74,6 +74,7 @@ impl NodeFilters { pub(crate) fn topology(request: &GetSuitablePoolsContext, item: &PoolItem) -> bool { let volume_node_topology_inclusion_labels: HashMap; let volume_node_topology_exclusion_labels: HashMap; + let volume_pool_afffinty: HashMap; match &request.topology { None => return true, Some(topology) => match &topology.node { @@ -84,11 +85,13 @@ impl NodeFilters { // present, otherwise selection of any pool is allowed. if !labelled_topology.inclusion.is_empty() || !labelled_topology.exclusion.is_empty() + || !labelled_topology.affinity.is_empty() { volume_node_topology_inclusion_labels = labelled_topology.inclusion.clone(); volume_node_topology_exclusion_labels = labelled_topology.exclusion.clone(); + volume_pool_afffinty = labelled_topology.affinity.clone(); } else { return true; } @@ -101,11 +104,15 @@ impl NodeFilters { // We will reach this part of code only if the volume has inclusion/exclusion labels. match request.registry().specs().node(&item.pool.node) { Ok(spec) => { - qualifies_label_criteria(volume_node_topology_inclusion_labels, spec.labels()) - && qualifies_label_criteria( - volume_node_topology_exclusion_labels, - spec.labels(), - ) + qualifies_label_criteria( + volume_node_topology_inclusion_labels, + volume_pool_afffinty.clone(), + spec.labels(), + ) && qualifies_label_criteria( + volume_node_topology_exclusion_labels, + volume_pool_afffinty, + spec.labels(), + ) } Err(_) => false, } diff --git a/control-plane/agents/src/bin/core/controller/scheduling/volume_policy/pool.rs b/control-plane/agents/src/bin/core/controller/scheduling/volume_policy/pool.rs index 6dd80ef93..2882f6ed3 100644 --- a/control-plane/agents/src/bin/core/controller/scheduling/volume_policy/pool.rs +++ b/control-plane/agents/src/bin/core/controller/scheduling/volume_policy/pool.rs @@ -78,6 +78,7 @@ impl PoolBaseFilters { /// Should only attempt to use pools having specific creation label if topology has it. pub(crate) fn topology(request: &GetSuitablePoolsContext, item: &PoolItem) -> bool { let volume_pool_topology_inclusion_labels: HashMap; + let volume_pool_afffinty: HashMap; match request.topology.clone() { None => return true, Some(topology) => match topology.pool { @@ -86,8 +87,11 @@ impl PoolBaseFilters { PoolTopology::Labelled(labelled_topology) => { // The labels in Volume Pool Topology should match the pool labels if // present, otherwise selection of any pool is allowed. - if !labelled_topology.inclusion.is_empty() { - volume_pool_topology_inclusion_labels = labelled_topology.inclusion + if !labelled_topology.inclusion.is_empty() + || !labelled_topology.affinity.is_empty() + { + volume_pool_topology_inclusion_labels = labelled_topology.inclusion; + volume_pool_afffinty = labelled_topology.affinity; } else { return true; } @@ -100,9 +104,11 @@ impl PoolBaseFilters { match request.registry().specs().pool(&item.pool.id) { Ok(spec) => match spec.labels { None => false, - Some(pool_labels) => { - qualifies_label_criteria(volume_pool_topology_inclusion_labels, &pool_labels) - } + Some(pool_labels) => qualifies_label_criteria( + volume_pool_topology_inclusion_labels, + volume_pool_afffinty, + &pool_labels, + ), }, Err(_) => false, } diff --git a/control-plane/agents/src/bin/core/controller/scheduling/volume_policy/simple.rs b/control-plane/agents/src/bin/core/controller/scheduling/volume_policy/simple.rs index e58a91b0c..1ce73e19f 100644 --- a/control-plane/agents/src/bin/core/controller/scheduling/volume_policy/simple.rs +++ b/control-plane/agents/src/bin/core/controller/scheduling/volume_policy/simple.rs @@ -281,6 +281,7 @@ mod tests { .cycle() .take(replicas) .collect::>(), + None, ); PoolItem::new(node_state, pool, ag_replica_count) } diff --git a/control-plane/agents/src/bin/core/node/wrapper.rs b/control-plane/agents/src/bin/core/node/wrapper.rs index 2ee2ad9f6..207c8d0c0 100644 --- a/control-plane/agents/src/bin/core/node/wrapper.rs +++ b/control-plane/agents/src/bin/core/node/wrapper.rs @@ -526,7 +526,7 @@ impl NodeWrapper { } }) .collect::>(); - PoolWrapper::new(pool_state.pool, replicas) + PoolWrapper::new(pool_state.pool, None, replicas) }) .collect() } @@ -554,7 +554,7 @@ impl NodeWrapper { } }) .collect(); - Some(PoolWrapper::new(pool_state.pool, replicas)) + Some(PoolWrapper::new(pool_state.pool, None, replicas)) } None => None, } diff --git a/control-plane/agents/src/bin/core/pool/wrapper.rs b/control-plane/agents/src/bin/core/pool/wrapper.rs index 06c46ea32..6c2d6c087 100644 --- a/control-plane/agents/src/bin/core/pool/wrapper.rs +++ b/control-plane/agents/src/bin/core/pool/wrapper.rs @@ -3,7 +3,7 @@ use stor_port::types::v0::{ transport::{CtrlPoolState, PoolState, PoolStatus, Protocol, Replica, ReplicaId}, }; -use std::{cmp::Ordering, ops::Deref}; +use std::{cmp::Ordering, collections::HashMap, ops::Deref}; /// Wrapper over a `Pool` state and all the replicas /// with Ord traits to aid pool selection for volume replicas (legacy). @@ -11,6 +11,7 @@ use std::{cmp::Ordering, ops::Deref}; pub(crate) struct PoolWrapper { state: PoolState, replicas: Vec, + labels: Option>, /// The accrued size/capacity of all replicas which means the pool usage could grow up to this /// size if < pool capacity. If this size is > pool capacity, then we can say the pool is /// overcommited by the size difference. @@ -27,7 +28,11 @@ impl Deref for PoolWrapper { impl PoolWrapper { /// New Pool wrapper with the pool and replicas. - pub(crate) fn new(mut pool: PoolState, replicas: Vec) -> Self { + pub(crate) fn new( + mut pool: PoolState, + labels: Option>, + replicas: Vec, + ) -> Self { let free_space = if pool.capacity >= pool.used { pool.capacity - pool.used } else { @@ -49,11 +54,16 @@ impl PoolWrapper { Self { state: pool, replicas, + labels, committed, free_space, } } + /// Get the labels. + pub(crate) fn labels(&self) -> Option> { + self.labels.clone() + } /// Get all the replicas. pub(crate) fn replicas(&self) -> &Vec { &self.replicas @@ -116,6 +126,11 @@ impl PoolWrapper { self.state.status = PoolStatus::Unknown; } + /// Set labels in PoolWrapper. + pub(crate) fn set_labels(&mut self, labels: Option>) { + self.labels = labels; + } + /// Add replica to list. #[allow(dead_code)] pub(crate) fn add_replica(&mut self, replica: &Replica) { diff --git a/control-plane/agents/src/bin/core/tests/deserializer.rs b/control-plane/agents/src/bin/core/tests/deserializer.rs index bcf4e8871..22bd58a06 100644 --- a/control-plane/agents/src/bin/core/tests/deserializer.rs +++ b/control-plane/agents/src/bin/core/tests/deserializer.rs @@ -112,6 +112,7 @@ fn test_deserialization_v1_to_v2() { ); labels }, + affinity: HashMap::new(), })), }), sequencer: Default::default(), diff --git a/control-plane/agents/src/bin/core/volume/clone_operations.rs b/control-plane/agents/src/bin/core/volume/clone_operations.rs index 828888830..406a51c26 100644 --- a/control-plane/agents/src/bin/core/volume/clone_operations.rs +++ b/control-plane/agents/src/bin/core/volume/clone_operations.rs @@ -4,7 +4,7 @@ use crate::{ resources::{ operations::{ResourceCloning, ResourceLifecycle, ResourceLifecycleExt}, operations_helper::{GuardedOperationsHelper, SpecOperationsHelper}, - OperationGuardArc, TraceStrLog, + OperationGuardArc, ResourceUid, TraceStrLog, }, scheduling::{volume::CloneVolumeSnapshot, ResourceFilter}, }, @@ -119,6 +119,7 @@ impl CreateVolumeExeVal for SnapshotCloneOp<'_> { #[async_trait::async_trait] impl CreateVolumeExe for SnapshotCloneOp<'_> { type Candidates = Vec; + type Replicas = Vec; async fn setup<'a>(&'a self, context: &mut Context<'a>) -> Result { let clonable_snapshots = self.cloneable_snapshot(context).await?; @@ -137,7 +138,7 @@ impl CreateVolumeExe for SnapshotCloneOp<'_> { &'a self, context: &mut Context<'a>, clone_replicas: Self::Candidates, - ) -> Vec { + ) -> Result, SvcError> { let mut replicas = Vec::new(); let volume_replicas = context.volume.as_ref().num_replicas as usize; // todo: need to add new replica and do full rebuild, if clonable snapshots @@ -161,11 +162,32 @@ impl CreateVolumeExe for SnapshotCloneOp<'_> { } } } - replicas + // we can't fulfil the required replication factor, so let the caller + // decide what to do next + if replicas.len() < context.volume.as_ref().num_replicas as usize { + match self.undo(context, &replicas).await { + Ok(_) => {} + Err(_error) => { + return Err(SvcError::ReplicaCreateNumber { + id: context.volume.uid_str(), + }); + } + } + Err(SvcError::ReplicaCreateNumber { + id: context.volume.uid_str(), + }) + } else { + Ok(replicas) + } } - async fn undo<'a>(&'a self, _context: &mut Context<'a>, _replicas: Vec) { + async fn undo<'a>( + &'a self, + _context: &mut Context<'a>, + _replicas: &Vec, + ) -> Result<(), SvcError> { // nothing to undo since we only support 1-replica snapshot + Ok(()) } } diff --git a/control-plane/agents/src/bin/core/volume/operations.rs b/control-plane/agents/src/bin/core/volume/operations.rs index e430f5b12..aaf9c89b6 100644 --- a/control-plane/agents/src/bin/core/volume/operations.rs +++ b/control-plane/agents/src/bin/core/volume/operations.rs @@ -44,8 +44,7 @@ use stor_port::{ }, }; -use std::{fmt::Debug, ops::Deref}; - +use std::{collections::HashMap, fmt::Debug, ops::Deref}; #[async_trait::async_trait] impl ResourceLifecycle for OperationGuardArc { type Create = CreateVolume; @@ -762,97 +761,6 @@ impl ResourceShutdownOperations for OperationGuardArc { } } -#[async_trait::async_trait] -impl ResourceLifecycleExt for OperationGuardArc { - type CreateOutput = Self; - - async fn create_ext( - registry: &Registry, - request: &CreateVolume, - ) -> Result { - let specs = registry.specs(); - let mut volume = specs - .get_or_create_volume(&CreateVolumeSource::None(request))? - .operation_guard_wait() - .await?; - let volume_clone = volume.start_create(registry, request).await?; - - // If the volume is a part of the ag, create or update accordingly. - registry.specs().get_or_create_affinity_group(&volume_clone); - - // todo: pick nodes and pools using the Node&Pool Topology - // todo: virtually increase the pool usage to avoid a race for space with concurrent calls - let result = create_volume_replicas(registry, request, &volume_clone).await; - let create_replica_candidate = volume - .validate_create_step_ext(registry, result, OnCreateFail::Delete) - .await?; - - let mut replicas = Vec::::new(); - for replica in create_replica_candidate.candidates() { - if replicas.len() >= request.replicas as usize { - break; - } else if replicas.iter().any(|r| r.node == replica.node) { - // don't reuse the same node - continue; - } - let replica = if replicas.is_empty() { - let mut replica = replica.clone(); - // the local replica needs to be connected via "bdev:///" - replica.share = Protocol::None; - replica - } else { - replica.clone() - }; - match OperationGuardArc::::create(registry, &replica).await { - Ok(replica) => { - replicas.push(replica); - } - Err(error) => { - volume_clone.error(&format!( - "Failed to create replica {:?} for volume, error: {}", - replica, - error.full_string() - )); - // continue trying... - } - }; - } - - // we can't fulfil the required replication factor, so let the caller - // decide what to do next - let result = if replicas.len() < request.replicas as usize { - for replica_state in replicas { - let result = match specs.replica(&replica_state.uuid).await { - Ok(mut replica) => { - let request = DestroyReplica::from(replica_state.clone()); - replica.destroy(registry, &request.with_disown_all()).await - } - Err(error) => Err(error), - }; - if let Err(error) = result { - volume_clone.error(&format!( - "Failed to delete replica {:?} from volume, error: {}", - replica_state, - error.full_string() - )); - } - } - Err(SvcError::ReplicaCreateNumber { - id: request.uuid.to_string(), - }) - } else { - Ok(()) - }; - - // we can destroy volume on error because there's no volume resource created on the nodes, - // only sub-resources (such as nexuses/replicas which will be garbage-collected later). - volume - .complete_create(result, registry, OnCreateFail::Delete) - .await?; - Ok(volume) - } -} - #[async_trait::async_trait] impl ResourceLifecycleExt> for OperationGuardArc { type CreateOutput = Self; @@ -880,7 +788,10 @@ impl ResourceLifecycleExt> for OperationGuardArc params.run(context).await, - CreateVolumeSource::Snapshot(params) => params.run(context).await, + CreateVolumeSource::Snapshot(params) => { + let replicas = params.run(context).await?; + Ok(ReplicaWithLabels::new(replicas, None)) + } }; // we can destroy volume on error because there's no volume resource created on the nodes, @@ -920,38 +831,32 @@ pub(super) struct Context<'a> { pub(super) trait CreateVolumeExeVal: Sync + Send { fn pre_flight_check(&self) -> Result<(), SvcError>; } - /// Trait that abstracts away the process of creating volume replicas. #[async_trait::async_trait] pub(super) trait CreateVolumeExe: CreateVolumeExeVal { type Candidates: Send + Sync; + type Replicas: Debug + Send + Sync; - async fn run<'a>(&'a self, mut context: Context<'a>) -> Result, SvcError> { + async fn run<'a>(&'a self, mut context: Context<'a>) -> Result { let result = self.setup(&mut context).await; let candidates = context .volume .validate_create_step_ext(context.registry, result, OnCreateFail::Delete) .await?; - let replicas = self.create(&mut context, candidates).await; - - // we can't fulfil the required replication factor, so let the caller - // decide what to do next - if replicas.len() < context.volume.as_ref().num_replicas as usize { - self.undo(&mut context, replicas).await; - Err(SvcError::ReplicaCreateNumber { - id: context.volume.uid_str(), - }) - } else { - Ok(replicas) - } + let replicas = self.create(&mut context, candidates).await?; + Ok(replicas) } async fn setup<'a>(&'a self, context: &mut Context<'a>) -> Result; async fn create<'a>( &'a self, context: &mut Context<'a>, candidates: Self::Candidates, - ) -> Vec; - async fn undo<'a>(&'a self, context: &mut Context<'a>, replicas: Vec); + ) -> Result; + async fn undo<'a>( + &'a self, + context: &mut Context<'a>, + replicas: &Vec, + ) -> Result<(), SvcError>; } impl CreateVolumeExeVal for CreateVolume { @@ -964,15 +869,34 @@ impl CreateVolumeExeVal for CreateVolume { } } +/// ReplicaWithLabels is a tuple of replicas and labels. +#[derive(Debug, Clone)] +pub(super) struct ReplicaWithLabels { + replicas: Vec, + #[allow(dead_code)] + labels: Option>, +} + +/// ReplicaWithLabels implementation. +impl ReplicaWithLabels { + /// Create a new ReplicaWithLabels. + pub fn new(replicas: Vec, labels: Option>) -> Self { + Self { replicas, labels } + } + + /// Get the replicas. + #[allow(dead_code)] + pub fn replicas(&self) -> Vec { + self.replicas.clone() + } +} + #[async_trait::async_trait] impl CreateVolumeExe for CreateVolume { - type Candidates = CreateReplicaCandidate; + type Candidates = Vec; + type Replicas = ReplicaWithLabels; - async fn setup<'a>( - &'a self, - context: &mut Context<'a>, - ) -> Result { - // todo: pick nodes and pools using the Node&Pool Topology + async fn setup<'a>(&'a self, context: &mut Context<'a>) -> Result { // todo: virtually increase the pool usage to avoid a race for space with concurrent calls create_volume_replicas(context.registry, self, context.volume.as_ref()).await } @@ -980,70 +904,100 @@ impl CreateVolumeExe for CreateVolume { async fn create<'a>( &'a self, context: &mut Context<'a>, - candidates: CreateReplicaCandidate, - ) -> Vec { - let mut replicas = Vec::::with_capacity(candidates.candidates().len()); - for replica in candidates.candidates() { - if replicas.len() >= self.replicas as usize { - break; - } else if replicas.iter().any(|r| { - r.node == replica.node - || spread_label_is_same(r, replica, context).unwrap_or_else(|error| { + candidates: Self::Candidates, + ) -> Result { + // TODO : remove any existing replicas of volume in case of affinity + // TODO : update volume spec with affinity key + + for create_replica_candidate in candidates.iter() { + let mut replicas = + Vec::::with_capacity(create_replica_candidate.candidates().len()); + + for replica in create_replica_candidate.candidates() { + if replicas.len() >= self.replicas as usize { + break; + } else if replicas.iter().any(|r| { + r.node == replica.node + || spread_label_is_same(r, replica, context).unwrap_or_else(|error| { + context.volume.error(&format!( + "Failed to create replica {:?} for volume, error: {}", + replica, + error.full_string() + )); + false + }) + }) { + // don't re-use the same node or same exclusion labels + continue; + } + let replica = if replicas.is_empty() { + let mut replica = replica.clone(); + // the local replica needs to be connected via "bdev:///" + replica.share = Protocol::None; + replica + } else { + replica.clone() + }; + match OperationGuardArc::::create(context.registry, &replica).await { + Ok(replica) => { + replicas.push(replica); + } + Err(error) => { context.volume.error(&format!( "Failed to create replica {:?} for volume, error: {}", replica, error.full_string() )); - false - }) - }) { - // don't re-use the same node or same exclusion labels - continue; + // continue trying... + } + }; } - let replica = if replicas.is_empty() { - let mut replica = replica.clone(); - // the local replica needs to be connected via "bdev:///" - replica.share = Protocol::None; - replica - } else { - replica.clone() - }; - match OperationGuardArc::::create(context.registry, &replica).await { - Ok(replica) => { - replicas.push(replica); - } - Err(error) => { - context.volume.error(&format!( - "Failed to create replica {:?} for volume, error: {}", - replica, - error.full_string() - )); - // continue trying... + + if replicas.len() < context.volume.as_ref().num_replicas as usize { + match self.undo(context, &replicas).await { + Ok(_) => {} + Err(_error) => { + return Err(SvcError::ReplicaCreateNumber { + id: context.volume.uid_str(), + }); + } } - }; + continue; + } else { + return Ok(ReplicaWithLabels::new( + replicas, + create_replica_candidate.labels().clone(), + )); + } } - replicas + return Err(SvcError::ReplicaCreateNumber { + id: context.volume.uid_str(), + }); } - async fn undo<'a>(&'a self, context: &mut Context<'a>, replicas: Vec) { + async fn undo<'a>( + &'a self, + context: &mut Context<'a>, + replicas: &Vec, + ) -> Result<(), SvcError> { + let mut result = Ok(()); for replica_state in replicas { - let result = match context.registry.specs().replica(&replica_state.uuid).await { + match context.registry.specs().replica(&replica_state.uuid).await { Ok(mut replica) => { let request = DestroyReplica::from(replica_state.clone()); - replica + if let Err(error) = replica .destroy(context.registry, &request.with_disown_all()) .await + { + result = Err(error); + } + } + Err(error) => { + result = Err(error); } - Err(error) => Err(error), - }; - if let Err(error) = result { - context.volume.error(&format!( - "Failed to delete replica {:?} from volume, error: {}", - replica_state, - error.full_string() - )); } } + result } } diff --git a/control-plane/agents/src/bin/core/volume/scheduling.rs b/control-plane/agents/src/bin/core/volume/scheduling.rs index ea904ffb0..8cafa75d5 100644 --- a/control-plane/agents/src/bin/core/volume/scheduling.rs +++ b/control-plane/agents/src/bin/core/volume/scheduling.rs @@ -14,12 +14,40 @@ use crate::{ nexus::scheduling::target_node_candidates, node::wrapper::NodeWrapper, }; + use agents::errors::{NotEnough, SvcError}; use stor_port::types::v0::{ store::{nexus::NexusSpec, volume::VolumeSpec}, - transport::{NodeId, Replica, VolumeState}, + transport::{NodeId, NodeTopology, PoolTopology, Replica, VolumeState}, }; +use std::collections::HashMap; + +/// Return a list of pre sorted pools to be used by a volume at time of creation. +pub(crate) async fn volume_pool_candidates_new( + request: impl Into, + registry: &Registry, +) -> Vec> { + let mut result: Vec> = Vec::new(); + let request = request.into(); + let is_affinity = is_affinity_requested(request.clone()); + + let vec_of_pool_wrappers: Vec = + volume::AddVolumeReplica::builder_with_defaults(request.clone(), registry) + .await + .collect() + .into_iter() + .map(|e| e.collect()) + .collect(); + + if is_affinity { + result = group_pools_by_affinity(vec_of_pool_wrappers, request, registry); + } else { + result.push(vec_of_pool_wrappers); + } + result +} + /// Return a list of pre sorted pools to be used by a volume. pub(crate) async fn volume_pool_candidates( request: impl Into, @@ -143,3 +171,95 @@ pub(crate) async fn resizeable_replicas( .map(|item| item.state().clone()) .collect() } + +/// Return true if affinity is requested in the volume spec. +pub(crate) fn is_affinity_requested(suitable_pools: GetSuitablePools) -> bool { + match suitable_pools.spec().topology.as_ref() { + Some(topology) => { + let node_affinity = match &topology.node { + Some(node_topology) => match node_topology { + NodeTopology::Labelled(label) => { + let mut matching_key = false; + for key in label.affinity.keys() { + if label.inclusion.contains_key(key) { + matching_key = true; + } + } + !matching_key && !label.affinity.is_empty() + } + + NodeTopology::Explicit(_) => false, + }, + None => false, + }; + let pool_affinity = match &topology.pool { + Some(pool_topology) => match pool_topology { + PoolTopology::Labelled(label) => { + let mut matching_key = false; + for key in label.affinity.keys() { + if label.inclusion.contains_key(key) { + matching_key = true; + } + } + !matching_key && !label.affinity.is_empty() + } + }, + None => false, + }; + node_affinity || pool_affinity + } + None => false, + } +} + +/// Return a list of pre sorted pools in case of affinity. +pub(crate) fn group_pools_by_affinity( + mut pools: Vec, + suitable_pools: GetSuitablePools, + registry: &Registry, +) -> Vec> { + let mut map: HashMap> = HashMap::new(); + for pool in pools.iter_mut() { + let id = pool.state().id.clone(); + let spec = registry.specs().pool(&id).unwrap_or_default(); + let pool_labels = spec.labels.clone().unwrap_or_default(); + + // Get the affinity labels from the pool topology + let pool_affinity = match suitable_pools + .spec() + .topology + .as_ref() + .and_then(|topology| topology.pool.as_ref()) + { + Some(PoolTopology::Labelled(label)) => label.affinity.clone(), + _ => HashMap::new(), + }; + + // performance: gold and zone: us-west, then the map key is "performance:gold;zone:us-west;" + // and the values are the vec of pool wrappers that have these labels + for (pool_affinity_key, _) in pool_affinity.iter() { + let mut result = String::new(); + let mut labels = HashMap::new(); + if let Some(value) = pool_labels.get(pool_affinity_key) { + result.push_str(pool_affinity_key); + result.push_str(":"); + result.push_str(value); + result.push_str(";"); + labels.insert(pool_affinity_key.clone(), value.clone()); + } + pool.set_labels(Some(labels.clone())); + if let Some(pools) = map.get_mut(&result) { + pools.push(pool.clone()); + } else { + map.insert(result, vec![pool.clone()]); + } + } + } + + let mut final_list_of_pools: Vec> = map.into_values().collect(); + + // Sort vectors by their lengths in descending order + final_list_of_pools.sort_by_key(|b| std::cmp::Reverse(b.len())); + + final_list_of_pools +} diff --git a/control-plane/agents/src/bin/core/volume/specs.rs b/control-plane/agents/src/bin/core/volume/specs.rs index 3f4ad14ba..81cdbea01 100644 --- a/control-plane/agents/src/bin/core/volume/specs.rs +++ b/control-plane/agents/src/bin/core/volume/specs.rs @@ -49,12 +49,13 @@ use stor_port::{ }; use snafu::OptionExt; -use std::convert::From; +use std::{collections::HashMap, convert::From}; /// CreateReplicaCandidate for volume and Affinity Group. pub(crate) struct CreateReplicaCandidate { candidates: Vec, _affinity_group_guard: Option>, + labels: Option>, } impl CreateReplicaCandidate { @@ -62,16 +63,23 @@ impl CreateReplicaCandidate { pub(crate) fn new( candidates: Vec, affinity_group_guard: Option>, + labels: Option>, ) -> CreateReplicaCandidate { Self { candidates, _affinity_group_guard: affinity_group_guard, + labels, } } /// Get the candidates. pub(crate) fn candidates(&self) -> &Vec { &self.candidates } + + /// Get the labels. + pub(crate) fn labels(&self) -> Option> { + self.labels.clone() + } } /// NexusNodeCandidate for nexus node selection. @@ -173,6 +181,84 @@ pub(crate) async fn nexus_attach_candidates( Ok(candidates) } +pub(crate) struct CreateReplicas { + replicas: Vec, + affinity_labels: Option>, +} + +impl CreateReplicas { + /// Create a new `CreateReplicas` with replicas and optional labels. + pub fn new( + replicas: Vec, + affinity_labels: Option>, + ) -> Self { + Self { + replicas, + affinity_labels, + } + } + /// Get the replicas. + pub fn replicas(&self) -> &Vec { + &self.replicas + } + + /// Get the labels. + pub fn labels(&self) -> Option> { + self.affinity_labels.clone() + } +} + +/// Return a list of appropriate requests which can be used to create a replica on a pool. +/// This can be used when the volume's current replica count is smaller than the desired volume's +/// replica count. +pub(crate) async fn volume_replica_candidates_new( + registry: &Registry, + volume_spec: &VolumeSpec, +) -> Result, SvcError> { + let request = GetSuitablePools::new(volume_spec, None); + let mut vec_create_replicas: Vec = Vec::new(); + let set_of_pools: Vec> = + scheduling::volume_pool_candidates_new(request.clone(), registry).await; + + for pools in set_of_pools.iter() { + if pools.is_empty() { + return Err(SvcError::NotEnoughResources { + source: NotEnough::OfPools { have: 0, need: 1 }, + }); + } + volume_spec.trace(&format!( + "Creation pool candidates for volume: {:?}", + pools.iter().map(|p| p.state()).collect::>() + )); + let mut create_replicas: Vec = Vec::new(); + let mut labels = HashMap::new(); + for pool_wrapper in pools.iter() { + let replica_uuid = ReplicaId::new(); + let create_replica = CreateReplica { + node: pool_wrapper.node.clone(), + name: Some(ReplicaName::new(&replica_uuid, Some(&request.uuid))), + uuid: replica_uuid, + entity_id: Some(volume_spec.uuid.clone()), + pool_id: pool_wrapper.id.clone(), + pool_uuid: None, + size: request.size, + thin: request.as_thin(), + share: Protocol::None, + managed: true, + owners: ReplicaOwners::from_volume(&request.uuid), + allowed_hosts: vec![], + kind: None, + }; + create_replicas.push(create_replica); + if let Some(pool_labels) = pool_wrapper.labels() { + labels.extend(pool_labels); + } + } + vec_create_replicas.push(CreateReplicas::new(create_replicas, Some(labels))); + } + Ok(vec_create_replicas) +} + /// Return a list of appropriate requests which can be used to create a replica on a pool. /// This can be used when the volume's current replica count is smaller than the desired volume's /// replica count. @@ -271,13 +357,7 @@ pub(crate) async fn create_volume_replicas( registry: &Registry, request: &CreateVolume, volume: &VolumeSpec, -) -> Result { - // Create a ag guard to prevent candidate collision. - let ag_guard = match registry.specs().get_or_create_affinity_group(volume) { - Some(ag) => Some(ag.operation_guard_wait().await?), - _ => None, - }; - +) -> Result, SvcError> { if !request.allowed_nodes().is_empty() && request.replicas > request.allowed_nodes().len() as u64 { @@ -285,16 +365,40 @@ pub(crate) async fn create_volume_replicas( return Err(SvcError::InvalidArguments {}); } - let node_replicas = volume_replica_candidates(registry, volume).await?; + let mut vec_create_replica_candidate: Vec = Vec::new(); + + let node_replicas_vec: Vec = + volume_replica_candidates_new(registry, volume).await?; - if request.replicas > node_replicas.len() as u64 { - Err(SvcError::from(NotEnough::OfPools { - have: node_replicas.len() as u64, + // Error out if their are no suitable pools for the volume + if node_replicas_vec.is_empty() { + return Err(SvcError::from(NotEnough::OfPools { + have: 0, need: request.replicas, - })) - } else { - Ok(CreateReplicaCandidate::new(node_replicas, ag_guard)) + })); + } + // The first element of the node_replicas_vec contains the most suitable replicas + // Error out if the first element node_replicas_vec doesn't have enough replicas + if request.replicas > node_replicas_vec[0].replicas().len() as u64 { + return Err(SvcError::from(NotEnough::OfPools { + have: node_replicas_vec[0].replicas().len() as u64, + need: request.replicas, + })); + } + + for node_replicas in node_replicas_vec.iter() { + // Create a ag guard to prevent candidate collision. + let ag_guard = match registry.specs().get_or_create_affinity_group(volume) { + Some(ag) => Some(ag.operation_guard_wait().await?), + _ => None, + }; + vec_create_replica_candidate.push(CreateReplicaCandidate::new( + node_replicas.replicas().clone(), + ag_guard, + node_replicas.labels(), + )); } + Ok(vec_create_replica_candidate) } /// Get all usable healthy replicas for volume nexus creation. diff --git a/control-plane/csi-driver/src/bin/controller/controller.rs b/control-plane/csi-driver/src/bin/controller/controller.rs index b17e83e78..3587392f6 100644 --- a/control-plane/csi-driver/src/bin/controller/controller.rs +++ b/control-plane/csi-driver/src/bin/controller/controller.rs @@ -1075,6 +1075,7 @@ fn context_into_topology(context: &CreateParams) -> CreateVolumeTopology { let mut pool_inclusive_label_topology: HashMap = HashMap::new(); let mut node_inclusive_label_topology: HashMap = HashMap::new(); let mut node_exclusive_label_topology: HashMap = HashMap::new(); + let mut pool_affinity_label_topology: Vec = Vec::::new(); pool_inclusive_label_topology.insert(dsp_created_by_key(), String::from(DSP_OPERATOR)); pool_inclusive_label_topology.extend( context @@ -1090,6 +1091,13 @@ fn context_into_topology(context: &CreateParams) -> CreateVolumeTopology { .clone() .unwrap_or_default(), ); + pool_affinity_label_topology.extend( + context + .publish_params() + .pool_affinity_topology_key() + .clone() + .unwrap_or_default(), + ); node_inclusive_label_topology.extend( context .publish_params() @@ -1111,14 +1119,17 @@ fn context_into_topology(context: &CreateParams) -> CreateVolumeTopology { .clone() .unwrap_or_default(), ); + CreateVolumeTopology::new( Some(models::NodeTopology::labelled(LabelledTopology { exclusion: node_exclusive_label_topology, inclusion: node_inclusive_label_topology, + affinitykey: Default::default(), })), Some(PoolTopology::labelled(LabelledTopology { exclusion: Default::default(), inclusion: pool_inclusive_label_topology, + affinitykey: pool_affinity_label_topology, })), ) } diff --git a/control-plane/csi-driver/src/context.rs b/control-plane/csi-driver/src/context.rs index c023bc04b..e17edc3ee 100644 --- a/control-plane/csi-driver/src/context.rs +++ b/control-plane/csi-driver/src/context.rs @@ -58,6 +58,8 @@ pub enum Parameters { QuiesceFs, #[strum(serialize = "poolAffinityTopologyLabel")] PoolAffinityTopologyLabel, + #[strum(serialize = "poolAffinityTopologyKey")] + PoolAffinityTopologyKey, #[strum(serialize = "poolHasTopologyKey")] PoolHasTopologyKey, #[strum(serialize = "nodeAffinityTopologyLabel")] @@ -116,6 +118,27 @@ impl Parameters { }) } + fn parse_topology_param_vec( + value: Option<&String>, + ) -> Result>, tonic::Status> { + Ok(match value { + Some(labels) => { + let mut result_vec = Vec::new(); + for label in labels.split('\n') { + if !label.is_empty() { + result_vec.push(label.to_string()) + } else { + return Err(tonic::Status::invalid_argument(format!( + "Invalid label : {value:?}" + ))); + } + } + Some(result_vec) + } + None => None, + }) + } + fn parse_u32(value: Option<&String>) -> Result, ParseIntError> { Ok(match value { Some(value) => value.parse::().map(Some)?, @@ -176,6 +199,12 @@ impl Parameters { ) -> Result>, tonic::Status> { Self::parse_topology_param(value) } + /// Parse the value for `Self::PoolAffinityTopologyKey`. + pub fn pool_affinity_topology_key( + value: Option<&String>, + ) -> Result>, tonic::Status> { + Self::parse_topology_param_vec(value) + } /// Parse the value for `Self::PoolHasTopologyKey`. pub fn pool_has_topology_key( value: Option<&String>, @@ -217,6 +246,7 @@ pub struct PublishParams { fs_type: Option, fs_id: Option, pool_affinity_topology_label: Option>, + pool_affinity_topology_key: Option>, pool_has_topology_key: Option>, node_affinity_topology_label: Option>, node_has_topology_key: Option>, @@ -247,6 +277,10 @@ impl PublishParams { pub fn pool_affinity_topology_label(&self) -> &Option> { &self.pool_affinity_topology_label } + /// Get the `Parameters::PoolAffinityTopologyKey` value. + pub fn pool_affinity_topology_key(&self) -> &Option> { + &self.pool_affinity_topology_key + } /// Get the `Parameters::PoolHasTopologyKey` value. pub fn pool_has_topology_key(&self) -> &Option> { &self.pool_has_topology_key @@ -319,6 +353,11 @@ impl TryFrom<&HashMap> for PublishParams { ) .map_err(|_| tonic::Status::invalid_argument("Invalid pool_affinity_topology_label"))?; + let pool_affinity_topology_key = Parameters::pool_affinity_topology_key( + args.get(Parameters::PoolAffinityTopologyKey.as_ref()), + ) + .map_err(|_| tonic::Status::invalid_argument("Invalid pool_affinity_topology_key"))?; + let pool_has_topology_key = Parameters::pool_has_topology_key(args.get(Parameters::PoolHasTopologyKey.as_ref())) .map_err(|_| tonic::Status::invalid_argument("Invalid pool_has_topology_key"))?; @@ -356,6 +395,7 @@ impl TryFrom<&HashMap> for PublishParams { fs_type, fs_id, pool_affinity_topology_label, + pool_affinity_topology_key, pool_has_topology_key, node_affinity_topology_label, node_has_topology_key, diff --git a/control-plane/grpc/proto/v1/volume/volume.proto b/control-plane/grpc/proto/v1/volume/volume.proto index b7f2cb9f7..513c4d7cb 100644 --- a/control-plane/grpc/proto/v1/volume/volume.proto +++ b/control-plane/grpc/proto/v1/volume/volume.proto @@ -157,6 +157,8 @@ message LabelledTopology { common.StringMapValue exclusion = 1; // inclusive labels common.StringMapValue inclusion = 2; + // affinity labels + common.StringMapValue affinity = 3; } message ExplicitNodeTopology { diff --git a/control-plane/grpc/src/operations/volume/traits.rs b/control-plane/grpc/src/operations/volume/traits.rs index cba06fef1..aceb74329 100644 --- a/control-plane/grpc/src/operations/volume/traits.rs +++ b/control-plane/grpc/src/operations/volume/traits.rs @@ -642,6 +642,10 @@ impl From for LabelledTopology { Some(labels) => labels.value, None => HashMap::new(), }, + affinity: match labelled_topology_grpc_type.affinity { + Some(labels) => labels.value, + None => HashMap::new(), + }, } } } @@ -655,6 +659,9 @@ impl From for volume::LabelledTopology { inclusion: Some(crate::common::StringMapValue { value: topo.inclusion, }), + affinity: Some(crate::common::StringMapValue { + value: topo.affinity, + }), } } } diff --git a/control-plane/rest/openapi-specs/v0_api_spec.yaml b/control-plane/rest/openapi-specs/v0_api_spec.yaml index 663df79cc..33c49bdd2 100644 --- a/control-plane/rest/openapi-specs/v0_api_spec.yaml +++ b/control-plane/rest/openapi-specs/v0_api_spec.yaml @@ -2481,6 +2481,8 @@ components: - '' inclusion: - '' + affinitykey: + - '' description: labelled topology type: object properties: @@ -2510,9 +2512,22 @@ components: type: object additionalProperties: type: string + affinitykey: + example: '' + description: |- + This feature includes resources with identical $label keys. For example, + if the affinity key is set to "Zone": + Initially, a resource that matches the label is selected, example "Zone: A". + Subsequently, all other resources must match the given label "Zone: A", + effectively adding this requirement as an inclusion label. + type: array + items: + example: 'Zone' + type: string required: - exclusion - inclusion + - affinitykey Topology: description: node and pool topology for volumes type: object diff --git a/control-plane/stor-port/src/types/v0/transport/volume.rs b/control-plane/stor-port/src/types/v0/transport/volume.rs index a9fff9083..3a662f743 100644 --- a/control-plane/stor-port/src/types/v0/transport/volume.rs +++ b/control-plane/stor-port/src/types/v0/transport/volume.rs @@ -255,19 +255,31 @@ pub struct LabelledTopology { /// Inclusive labels. #[serde(default)] pub inclusion: HashMap, + /// Affinity labels. + #[serde(default)] + pub affinity: HashMap, } impl From for LabelledTopology { fn from(src: models::LabelledTopology) -> Self { + let mut affinity = HashMap::new(); + for affintity in src.affinitykey.iter() { + affinity.insert(affintity.to_string(), "".to_string()); + } Self { exclusion: src.exclusion, inclusion: src.inclusion, + affinity, } } } impl From for models::LabelledTopology { fn from(src: LabelledTopology) -> Self { - Self::new(src.exclusion, src.inclusion) + let mut affinity_key = vec![]; + for (key, _) in src.affinity.iter() { + affinity_key.push(key.clone()); + } + Self::new(src.exclusion, src.inclusion, affinity_key) } } diff --git a/tests/bdd/features/volume/topology/pool-topology.feature b/tests/bdd/features/volume/topology/pool-topology.feature index eaa0b92e8..f3d9754b6 100644 --- a/tests/bdd/features/volume/topology/pool-topology.feature +++ b/tests/bdd/features/volume/topology/pool-topology.feature @@ -14,25 +14,55 @@ Feature: Volume Pool Topology which signifies the volume pool topology inclusion labels as {"rack": "", "zone" : ""} Background: - Given a control plane, three Io-Engine instances, nine pools + Given a control plane, three Io-Engine instances, fourteen pools # The labels to be applied to the pools. ############################################################################################### -# Description || Pool Name || Label || Node || +# Description || Pool Name || Label || Node || #============================================================================================== -# "pool1" has || node1pool1 || zone-us=us-west-1 || io-engine-1 || -# the label || node2pool1 || zone-us=us-west-1 || io-engine-2 || -# "zone-us=us-west-1" || node3pool1 || zone-us=us-west-1 || io-engine-3 || +# "pool1" has || node1pool1 || zone-us=us-west-1 || io-engine-1 || +# the label || node2pool1 || zone-us=us-west-1 || io-engine-2 || +# "zone-us=us-west-1" || node3pool1 || zone-us=us-west-1 || io-engine-3 || #============================================================================================== -# "pool2" has || node1pool2 || zone-ap=ap-south-1 || io-engine-1 || -# the label || node2pool2 || zone-ap=ap-south-1 || io-engine-2 || -# "zone-ap=ap-south-1" || node3pool2 || zone-ap=ap-south-1 || io-engine-3 || +# "pool2" has || node1pool2 || zone-ap=ap-south-1 || io-engine-1 || +# the label || node2pool2 || zone-ap=ap-south-1 || io-engine-2 || +# "zone-ap=ap-south-1" || node3pool2 || zone-ap=ap-south-1 || io-engine-3 || #============================================================================================== -# "pool3" has || node1pool3 || zone-eu=eu-west-3 || io-engine-1 || -# the label || node2pool3 || zone-eu=eu-west-3 || io-engine-2 || -# "zone-eu=eu-west-3" || node3pool3 || zone-eu=eu-west-3 || io-engine-3 || +# "pool3" has || node1pool3 || zone-eu=eu-west-3 || io-engine-1 || +# the label || node2pool3 || zone-eu=eu-west-3 || io-engine-2 || +# "zone-eu=eu-west-3" || node3pool3 || zone-eu=eu-west-3 || io-engine-3 || #============================================================================================== +# "pool4" has || node1pool4 || zone-ca=ca-central-1 || io-engine-1 || +# the label || node2pool4 || zone-ca=ca-central-1 || io-engine-2 || +# "zone-ca=ca-central-1" || node3pool4 || zone-ca=ca-central-1 || io-engine-3 || +#============================================================================================== +# "pool5" has || node1pool5 || zone-ca=ca-west-1 || io-engine-1 || +# the label || node2pool5 || zone-ca=ca-west-1 || io-engine-2 || +# "zone-ca=ca-west-1" || || || || +#============================================================================================== + + Scenario Outline: Suitable pools which contain volume topology affinity key + Given a request for a replica volume with poolAffinityTopologyKey as and pool topology affinity as + When the desired number of replica of volume i.e. here; is number of the pools containing the label + Then the replica volume creation should and provisioned on pools with labels + Examples: + | pool_affinity_topology_key | volume_pool_topology_affinty | replica | expression | result | provisioned | pool_label | + | True | zone-us | 1 | <= | succeed | must be | zone-us=us-west-1 | + | True | zone-us | 2 | <= | succeed | must be | zone-us=us-west-1 | + | True | zone-us | 3 | <= | succeed | must be | zone-us=us-west-1 | + | True | zone-us | 4 | > | fail | not | zone-us=us-west-1 | + | True | zone-ap | 1 | <= | succeed | must be | zone-ap=ap-south-1 | + | True | zone-ap | 2 | <= | succeed | must be | zone-ap=ap-south-1 | + | True | zone-ap | 3 | <= | succeed | must be | zone-ap=ap-south-1 | + | True | zone-ap | 4 | > | fail | not | zone-ap=ap-south-1 | + | True | zone-eu | 1 | <= | succeed | must be | zone-eu=eu-west-3 | + | True | zone-eu | 2 | <= | succeed | must be | zone-eu=eu-west-3 | + | True | zone-eu | 3 | <= | succeed | must be | zone-eu=eu-west-3 | + | True | zone-eu | 4 | > | fail | not | zone-eu=eu-west-3 | + | True | zone-ca | 1 | <= | succeed | must be | zone-ca=ca-central-1 | + | True | zone-ca | 2 | <= | succeed | must be | zone-ca=ca-central-1 | + | True | zone-ca | 3 | <= | succeed | must be | zone-ca=ca-central-1 | Scenario Outline: Suitable pools which contain volume topology labels Given a request for a replica volume with poolAffinityTopologyLabel as and pool topology inclusion as diff --git a/tests/bdd/features/volume/topology/test_node_topology.py b/tests/bdd/features/volume/topology/test_node_topology.py index 345931576..41c904301 100644 --- a/tests/bdd/features/volume/topology/test_node_topology.py +++ b/tests/bdd/features/volume/topology/test_node_topology.py @@ -566,6 +566,7 @@ def create_volume_body( inclusion={ **inclusion_labels, }, + affinitykey=[], ) ) ) diff --git a/tests/bdd/features/volume/topology/test_pool_topology.py b/tests/bdd/features/volume/topology/test_pool_topology.py index 6c075000e..b3d4c4fad 100644 --- a/tests/bdd/features/volume/topology/test_pool_topology.py +++ b/tests/bdd/features/volume/topology/test_pool_topology.py @@ -5,13 +5,14 @@ import pytest import docker import requests -import time +import time from common import prod_domain_name, disk_pool_label_key, disk_pool_label_val from common.deployer import Deployer from common.apiclient import ApiClient from common.docker import Docker from common.nvme import nvme_connect, nvme_disconnect +from time import sleep from common.fio import Fio from common.operations import Cluster @@ -28,15 +29,28 @@ NODE_1_NAME = "io-engine-1" NODE_2_NAME = "io-engine-2" NODE_3_NAME = "io-engine-3" + +# The UUIDs of the pools on node1 NODE_1_POOL_1_UUID = "node1pool1" NODE_1_POOL_2_UUID = "node1pool2" NODE_1_POOL_3_UUID = "node1pool3" +NODE_1_POOL_4_UUID = "node1pool4" +NODE_1_POOL_5_UUID = "node1pool5" + +# The UUIDs of the pools on node2 NODE_2_POOL_1_UUID = "node2pool1" NODE_2_POOL_2_UUID = "node2pool2" NODE_2_POOL_3_UUID = "node2pool3" +NODE_2_POOL_4_UUID = "node2pool4" +NODE_2_POOL_5_UUID = "node2pool5" + +# The UUIDs of the pools on node3 NODE_3_POOL_1_UUID = "node3pool1" NODE_3_POOL_2_UUID = "node3pool2" NODE_3_POOL_3_UUID = "node3pool3" +NODE_3_POOL_4_UUID = "node3pool4" + +# The key used to pass the volume create request between test steps. CREATE_REQUEST_KEY = "create_request" VOLUME_SIZE = 10485761 VOLUME_UUID = "5cd5378e-3f05-47f1-a830-a0f5873a1441" @@ -46,21 +60,28 @@ # The labels to be applied to the pools. ############################################################################################### -# Description || Pool Name || Label || Node || -# ============================================================================================== -# "pool1" has || node1pool1 || zone-us=us-west-1 || io-engine-1 || -# the label || node2pool1 || zone-us=us-west-1 || io-engine-2 || -# "zone-us=us-west-1" || node3pool1 || zone-us=us-west-1 || io-engine-3 || -# ============================================================================================== -# "pool2" has || node1pool2 || zone-ap=ap-south-1 || io-engine-1 || -# the label || node2pool2 || zone-ap=ap-south-1 || io-engine-2 || -# "zone-ap=ap-south-1" || node3pool2 || zone-ap=ap-south-1 || io-engine-3 || -# ============================================================================================== -# "pool3" has || node1pool3 || zone-eu=eu-west-3 || io-engine-1 || -# the label || node2pool3 || zone-eu=eu-west-3 || io-engine-2 || -# "zone-eu=eu-west-3" || node3pool3 || zone-eu=eu-west-3 || io-engine-3 || -# ============================================================================================== -# ========================================================================================= +# Description || Pool Name || Label || Node || +#============================================================================================== +# "pool1" has || node1pool1 || zone-us=us-west-1 || io-engine-1 || +# the label || node2pool1 || zone-us=us-west-1 || io-engine-2 || +# "zone-us=us-west-1" || node3pool1 || zone-us=us-west-1 || io-engine-3 || +#============================================================================================== +# "pool2" has || node1pool2 || zone-ap=ap-south-1 || io-engine-1 || +# the label || node2pool2 || zone-ap=ap-south-1 || io-engine-2 || +# "zone-ap=ap-south-1" || node3pool2 || zone-ap=ap-south-1 || io-engine-3 || +#============================================================================================== +# "pool3" has || node1pool3 || zone-eu=eu-west-3 || io-engine-1 || +# the label || node2pool3 || zone-eu=eu-west-3 || io-engine-2 || +# "zone-eu=eu-west-3" || node3pool3 || zone-eu=eu-west-3 || io-engine-3 || +#============================================================================================== +# "pool4" has || node1pool4 || zone-ca=ca-central-1 || io-engine-1 || +# the label || node2pool4 || zone-ca=ca-central-1 || io-engine-2 || +# "zone-ca=ca-central-1" || node3pool4 || zone-ca=ca-central-1 || io-engine-3 || +#============================================================================================== +# "pool5" has || node1pool5 || zone-ca=ca-west-1 || io-engine-1 || +# the label || node2pool5 || zone-ca=ca-west-1 || io-engine-2 || +# "zone-ca=ca-west-1" || || || || +#============================================================================================== POOL_CONFIGURATIONS = [ # Pool node1pool1 has the label "zone-us=us-west-1" and is on node "io-engine-1" @@ -180,12 +201,79 @@ }, ), }, + # Pool node1pool4 has the label "zone-ca=ca-central-1" and is on node "io-engine-1" + { + "node_name": NODE_1_NAME, + "pool_uuid": NODE_1_POOL_4_UUID, + "pool_body": CreatePoolBody( + ["malloc:///disk4?size_mb=50"], + labels={ + DISKPOOL_LABEL_KEY: DISKPOOL_LABEL_VAL, + "node": "io-engine-1", + "zone-ca": "ca-central-1", + }, + ), + }, + # Pool node2pool4 has the label "zone-ca=ca-central-1" and is on node "io-engine-2" + { + "node_name": NODE_2_NAME, + "pool_uuid": NODE_2_POOL_4_UUID, + "pool_body": CreatePoolBody( + ["malloc:///disk4?size_mb=50"], + labels={ + DISKPOOL_LABEL_KEY: DISKPOOL_LABEL_VAL, + "node": "io-engine-2", + "zone-ca": "ca-central-1", + }, + ), + }, + # Pool node3pool4 has the label "zone-ca=ca-central-1" and is on node "io-engine-3" + { + "node_name": NODE_3_NAME, + "pool_uuid": NODE_3_POOL_4_UUID, + "pool_body": CreatePoolBody( + ["malloc:///disk4?size_mb=50"], + labels={ + DISKPOOL_LABEL_KEY: DISKPOOL_LABEL_VAL, + "node": "io-engine-3", + "zone-ca": "ca-central-1", + }, + ), + }, + # Pool node1pool5 has the label "zone-ca=ca-west-1" and is on node "io-engine-1" + { + "node_name": NODE_1_NAME, + "pool_uuid": NODE_1_POOL_5_UUID, + "pool_body": CreatePoolBody( + ["malloc:///disk5?size_mb=50"], + labels={ + DISKPOOL_LABEL_KEY: DISKPOOL_LABEL_VAL, + "node": "io-engine-1", + "zone-ca": "ca-west-1", + }, + ), + }, + # Pool node2pool5 has the label "zone-ca=ca-west-1" and is on node "io-engine-2" + { + "node_name": NODE_2_NAME, + "pool_uuid": NODE_2_POOL_5_UUID, + "pool_body": CreatePoolBody( + ["malloc:///disk5?size_mb=50"], + labels={ + DISKPOOL_LABEL_KEY: DISKPOOL_LABEL_VAL, + "node": "io-engine-2", + "zone-ca": "ca-west-1", + }, + ), + }, + ] @pytest.fixture(scope="module") def init(): Deployer.start(NUM_IO_ENGINES, io_engine_coreisol=True) + # Create the pools. for config in POOL_CONFIGURATIONS: ApiClient.pools_api().put_node_pool( @@ -216,17 +304,20 @@ def replica_ctx(): def test_suitable_pools_which_contain_volume_topology_labels(): """Suitable pools which contain volume topology labels.""" - @scenario( "pool-topology.feature", "Suitable pools which contain volume topology keys only" ) def test_suitable_pools_which_contain_volume_topology_keys_only(): """Suitable pools which contain volume topology keys only.""" +@scenario('pool-topology.feature', 'Suitable pools which contain volume topology affinity key') +def test_suitable_pools_which_contain_volume_topology_affinity_key(): + """Suitable pools which contain volume topology affinity key.""" -@given("a control plane, three Io-Engine instances, nine pools") -def a_control_plane_three_ioengine_instances_nine_pools(init): - """a control plane, three Io-Engine instances, nine pools.""" + +@given("a control plane, three Io-Engine instances, fourteen pools") +def a_control_plane_three_ioengine_instances_fourteen_pools(init): + """a control plane, three Io-Engine instances, fourteen pools.""" docker_client = docker.from_env() # The control plane comprises the core agents, rest server and etcd instance. @@ -247,10 +338,23 @@ def a_control_plane_three_ioengine_instances_nine_pools(init): # Check for a pools pools = ApiClient.pools_api().get_pools() - assert len(pools) == 9 + assert len(pools) == 14 yield Cluster.cleanup(pools=False) +@given( + parsers.parse( + "a request for a {replica} replica volume with poolAffinityTopologyKey as {pool_affinity_topology_key} and pool topology affinity as {volume_pool_topology_affinty}" + ) +) +def a_request_for_a_replica_replica_volume_with_poolaffinitytopologykey_as_pool_affinity_topology_key_and_pool_topology_affinity_as_volume_pool_topology_affinty( + create_request, replica, pool_affinity_topology_key, volume_pool_topology_affinty +): + """a request for a replica volume with poolAffinityTopologyKey as and pool topology affinity as .""" + if pool_affinity_topology_key == "True": + request = create_volume_body(replica, volume_pool_topology_affinty, pool_affinity_topology_label="False", has_topology_key="False", pool_affinity_topology_key="True") + create_request[CREATE_REQUEST_KEY] = request + @given( parsers.parse( @@ -265,7 +369,7 @@ def a_request_for_a_replica_replica_volume_with_poolaffinitytopologylabel_as_poo ): """a request for a replica volume with poolAffinityTopologyLabel as and pool topology inclusion as .""" if pool_affinity_topology_label == "True": - request = create_volume_body(replica, volume_pool_topology_inclusion_label) + request = create_volume_body(replica, volume_pool_topology_inclusion_label, pool_affinity_topology_label="True", has_topology_key="False", pool_affinity_topology_key="False") create_request[CREATE_REQUEST_KEY] = request @@ -279,7 +383,7 @@ def a_request_for_a_replica_replica_volume_with_poolhastopologykey_as_has_topolo ): """a request for a replica volume with poolHasTopologyKey as and pool topology inclusion as .""" if has_topology_key == "True": - request = create_volume_body(replica, volume_pool_topology_inclusion_label) + request = create_volume_body(replica, volume_pool_topology_inclusion_label, pool_affinity_topology_label="False", has_topology_key="True", pool_affinity_topology_key="False") create_request[CREATE_REQUEST_KEY] = request @@ -296,6 +400,9 @@ def the_desired_number_of_replica_of_volume_ie_replica_here_is_expression_number create_request[CREATE_REQUEST_KEY]["topology"]["pool_topology"]["labelled"][ "inclusion" ], + create_request[CREATE_REQUEST_KEY]["topology"]["pool_topology"]["labelled"][ + "affinitykey" + ], ) if expression == "<=": assert int(replica) <= no_of_eligible_pools @@ -320,6 +427,7 @@ def the_replica_replica_volume_creation_should_result_and_provisioned_provisione replica, create_request[CREATE_REQUEST_KEY]["topology"] ) assert str(volume.spec) == str(expected_spec) + pools_names_which_has_given_labels = get_pool_names_with_given_labels( pool_label ) @@ -366,20 +474,32 @@ def get_pool_names_with_given_labels(pool_label): # Return the create volume request body based on the input parameters. -def create_volume_body(replica, volume_pool_topology_inclusion_label): +def create_volume_body(replica, volume_pool_topology_inclusion_label, pool_affinity_topology_label, has_topology_key, pool_affinity_topology_key ): """Create a volume body.""" - key, _, value = volume_pool_topology_inclusion_label.partition("=") - topology = Topology( - pool_topology=PoolTopology( - labelled=LabelledTopology( - exclusion={}, - inclusion={ - key.strip(): value.strip(), - DISKPOOL_LABEL_KEY: DISKPOOL_LABEL_VAL, - }, + if pool_affinity_topology_label == "True" or has_topology_key == "True" : + key, _, value = volume_pool_topology_inclusion_label.partition("=") + topology = Topology( + pool_topology=PoolTopology( + labelled=LabelledTopology( + exclusion={}, + inclusion={ + key.strip(): value.strip(), + DISKPOOL_LABEL_KEY: DISKPOOL_LABEL_VAL, + }, + affinitykey=[], + ) + ) + ) + elif pool_affinity_topology_key == "True": + topology = Topology( + pool_topology=PoolTopology( + labelled=LabelledTopology( + exclusion={}, + inclusion={}, + affinitykey=[volume_pool_topology_inclusion_label], + ) ) ) - ) return CreateVolumeBody( VolumePolicy(False), int(replica), @@ -405,15 +525,22 @@ def expected_volume_spec(replica, toplogy): # Return the number of pools that qualify based on the volume topology inclusion labels. -def no_of_suitable_pools(volume_pool_topology_inclusion_labels): +def no_of_suitable_pools(volume_pool_topology_inclusion_labels, volume_pool_topology_affinity): """Return the number of pools that qualify based on the volume topology inclusion labels.""" pool_with_labels = get_pool_names_with_its_corresponding_labels() - qualified_pools = list() + qualified_pools = set() # Using HashSet to avoid duplicate entries. for pool_id, pool_labels in pool_with_labels.items(): - if does_pool_qualify_inclusion_labels( - volume_pool_topology_inclusion_labels, pool_labels - ): - qualified_pools.append(pool_id) + if len(volume_pool_topology_inclusion_labels) !=0 : + if does_pool_qualify_inclusion_labels( + volume_pool_topology_inclusion_labels, pool_labels + ): + qualified_pools.add(pool_id) + + if len(volume_pool_topology_affinity) != 0 : + if does_pool_qualify_affinity_keys( + volume_pool_topology_affinity, pool_labels + ): + qualified_pools.add(pool_id) return len(qualified_pools) @@ -443,3 +570,16 @@ def does_pool_qualify_inclusion_labels( else: inc_match = False return inc_match + +# Return whether the pool qualifies based on the volume topology affinity keys. +def does_pool_qualify_affinity_keys( + volume_pool_topology_affinit_key, pool_labels +): + inc_key_match = True + for key in volume_pool_topology_affinit_key: + if key in pool_labels: + inc_key_match = True + break + else: + inc_key_match = False + return inc_key_match \ No newline at end of file