Skip to content

Commit

Permalink
chore(bors): merge pull request #846
Browse files Browse the repository at this point in the history
846: feat(topology): add pool affinity key r=sinhaashish a=sinhaashish



Co-authored-by: sinhaashish <ashi.sinha.87@gmail.com>
  • Loading branch information
mayastor-bors and sinhaashish committed Sep 10, 2024
2 parents a2564c1 + 6acbc2a commit cd9bbe4
Show file tree
Hide file tree
Showing 23 changed files with 792 additions and 247 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ impl GetSuitablePools {
move_repl,
}
}

/// Get the volume spec.
pub(crate) fn spec(&self) -> &VolumeSpec {
&self.spec
}
}

/// The context to select suitable pools for volume replica creation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,26 +68,43 @@ impl DefaultBasePolicy {
}
}

/// Return true if all the keys present in volume's pool/node inclusion matches with the pool/node
/// labels otherwise returns false.
/// Return true if all the keys present in the volume's pool/node
/// inclusion/affinity match the pool/node labels; otherwise, return false.
pub(crate) fn qualifies_label_criteria(
vol_pool_inc_labels: HashMap<String, String>,
vol_pool_affinity: HashMap<String, String>,
pool_labels: &HashMap<String, String>,
) -> bool {
let mut satisfy_inclusion = true;
let mut satisfy_affinity = true;
for (vol_inc_key, vol_inc_value) in vol_pool_inc_labels.iter() {
match pool_labels.get(vol_inc_key) {
Some(pool_val) => {
if vol_inc_value.is_empty() {
continue;
}
if pool_val != vol_inc_value {
return false;
satisfy_inclusion = false;
break;
}
}
None => {
return false;
satisfy_inclusion = false;
break;
}
}
}
true

for (vol_affinity_key, _) in vol_pool_affinity.iter() {
match pool_labels.get(vol_affinity_key) {
Some(_) => {
continue;
}
None => {
satisfy_affinity = false;
break;
}
}
}
satisfy_inclusion && satisfy_affinity
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl NodeFilters {
pub(crate) fn topology(request: &GetSuitablePoolsContext, item: &PoolItem) -> bool {
let volume_node_topology_inclusion_labels: HashMap<String, String>;
let volume_node_topology_exclusion_labels: HashMap<String, String>;
let volume_pool_afffinty: HashMap<String, String>;
match &request.topology {
None => return true,
Some(topology) => match &topology.node {
Expand All @@ -84,11 +85,13 @@ impl NodeFilters {
// present, otherwise selection of any pool is allowed.
if !labelled_topology.inclusion.is_empty()
|| !labelled_topology.exclusion.is_empty()
|| !labelled_topology.affinity.is_empty()
{
volume_node_topology_inclusion_labels =
labelled_topology.inclusion.clone();
volume_node_topology_exclusion_labels =
labelled_topology.exclusion.clone();
volume_pool_afffinty = labelled_topology.affinity.clone();
} else {
return true;
}
Expand All @@ -101,11 +104,15 @@ impl NodeFilters {
// We will reach this part of code only if the volume has inclusion/exclusion labels.
match request.registry().specs().node(&item.pool.node) {
Ok(spec) => {
qualifies_label_criteria(volume_node_topology_inclusion_labels, spec.labels())
&& qualifies_label_criteria(
volume_node_topology_exclusion_labels,
spec.labels(),
)
qualifies_label_criteria(
volume_node_topology_inclusion_labels,
volume_pool_afffinty.clone(),
spec.labels(),
) && qualifies_label_criteria(
volume_node_topology_exclusion_labels,
volume_pool_afffinty,
spec.labels(),
)
}
Err(_) => false,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl PoolBaseFilters {
/// Should only attempt to use pools having specific creation label if topology has it.
pub(crate) fn topology(request: &GetSuitablePoolsContext, item: &PoolItem) -> bool {
let volume_pool_topology_inclusion_labels: HashMap<String, String>;
let volume_pool_afffinty: HashMap<String, String>;
match request.topology.clone() {
None => return true,
Some(topology) => match topology.pool {
Expand All @@ -86,8 +87,11 @@ impl PoolBaseFilters {
PoolTopology::Labelled(labelled_topology) => {
// The labels in Volume Pool Topology should match the pool labels if
// present, otherwise selection of any pool is allowed.
if !labelled_topology.inclusion.is_empty() {
volume_pool_topology_inclusion_labels = labelled_topology.inclusion
if !labelled_topology.inclusion.is_empty()
|| !labelled_topology.affinity.is_empty()
{
volume_pool_topology_inclusion_labels = labelled_topology.inclusion;
volume_pool_afffinty = labelled_topology.affinity;
} else {
return true;
}
Expand All @@ -100,9 +104,11 @@ impl PoolBaseFilters {
match request.registry().specs().pool(&item.pool.id) {
Ok(spec) => match spec.labels {
None => false,
Some(pool_labels) => {
qualifies_label_criteria(volume_pool_topology_inclusion_labels, &pool_labels)
}
Some(pool_labels) => qualifies_label_criteria(
volume_pool_topology_inclusion_labels,
volume_pool_afffinty,
&pool_labels,
),
},
Err(_) => false,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ mod tests {
let replica = Replica::default();
let pool = PoolWrapper::new(
state,
None,
vec![replica]
.into_iter()
.cycle()
Expand Down
4 changes: 2 additions & 2 deletions control-plane/agents/src/bin/core/node/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ impl NodeWrapper {
}
})
.collect::<Vec<Replica>>();
PoolWrapper::new(pool_state.pool, replicas)
PoolWrapper::new(pool_state.pool, None, replicas)
})
.collect()
}
Expand Down Expand Up @@ -554,7 +554,7 @@ impl NodeWrapper {
}
})
.collect();
Some(PoolWrapper::new(pool_state.pool, replicas))
Some(PoolWrapper::new(pool_state.pool, None, replicas))
}
None => None,
}
Expand Down
19 changes: 17 additions & 2 deletions control-plane/agents/src/bin/core/pool/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ use stor_port::types::v0::{
transport::{CtrlPoolState, PoolState, PoolStatus, Protocol, Replica, ReplicaId},
};

use std::{cmp::Ordering, ops::Deref};
use std::{cmp::Ordering, collections::HashMap, ops::Deref};

/// Wrapper over a `Pool` state and all the replicas
/// with Ord traits to aid pool selection for volume replicas (legacy).
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub(crate) struct PoolWrapper {
state: PoolState,
replicas: Vec<Replica>,
labels: Option<HashMap<String, String>>,
/// The accrued size/capacity of all replicas which means the pool usage could grow up to this
/// size if < pool capacity. If this size is > pool capacity, then we can say the pool is
/// overcommited by the size difference.
Expand All @@ -27,7 +28,11 @@ impl Deref for PoolWrapper {

impl PoolWrapper {
/// New Pool wrapper with the pool and replicas.
pub(crate) fn new(mut pool: PoolState, replicas: Vec<Replica>) -> Self {
pub(crate) fn new(
mut pool: PoolState,
labels: Option<HashMap<String, String>>,
replicas: Vec<Replica>,
) -> Self {
let free_space = if pool.capacity >= pool.used {
pool.capacity - pool.used
} else {
Expand All @@ -49,11 +54,16 @@ impl PoolWrapper {
Self {
state: pool,
replicas,
labels,
committed,
free_space,
}
}

/// Get the labels.
pub(crate) fn labels(&self) -> Option<HashMap<String, String>> {
self.labels.clone()
}
/// Get all the replicas.
pub(crate) fn replicas(&self) -> &Vec<Replica> {
&self.replicas
Expand Down Expand Up @@ -116,6 +126,11 @@ impl PoolWrapper {
self.state.status = PoolStatus::Unknown;
}

/// Set labels in PoolWrapper.
pub(crate) fn set_labels(&mut self, labels: Option<HashMap<String, String>>) {
self.labels = labels;
}

/// Add replica to list.
#[allow(dead_code)]
pub(crate) fn add_replica(&mut self, replica: &Replica) {
Expand Down
1 change: 1 addition & 0 deletions control-plane/agents/src/bin/core/tests/deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ fn test_deserialization_v1_to_v2() {
);
labels
},
affinity: HashMap::new(),
})),
}),
sequencer: Default::default(),
Expand Down
30 changes: 26 additions & 4 deletions control-plane/agents/src/bin/core/volume/clone_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
resources::{
operations::{ResourceCloning, ResourceLifecycle, ResourceLifecycleExt},
operations_helper::{GuardedOperationsHelper, SpecOperationsHelper},
OperationGuardArc, TraceStrLog,
OperationGuardArc, ResourceUid, TraceStrLog,
},
scheduling::{volume::CloneVolumeSnapshot, ResourceFilter},
},
Expand Down Expand Up @@ -119,6 +119,7 @@ impl CreateVolumeExeVal for SnapshotCloneOp<'_> {
#[async_trait::async_trait]
impl CreateVolumeExe for SnapshotCloneOp<'_> {
type Candidates = Vec<SnapshotCloneSpecParams>;
type Replicas = Vec<Replica>;

async fn setup<'a>(&'a self, context: &mut Context<'a>) -> Result<Self::Candidates, SvcError> {
let clonable_snapshots = self.cloneable_snapshot(context).await?;
Expand All @@ -137,7 +138,7 @@ impl CreateVolumeExe for SnapshotCloneOp<'_> {
&'a self,
context: &mut Context<'a>,
clone_replicas: Self::Candidates,
) -> Vec<Replica> {
) -> Result<Vec<Replica>, SvcError> {
let mut replicas = Vec::new();
let volume_replicas = context.volume.as_ref().num_replicas as usize;
// todo: need to add new replica and do full rebuild, if clonable snapshots
Expand All @@ -161,11 +162,32 @@ impl CreateVolumeExe for SnapshotCloneOp<'_> {
}
}
}
replicas
// we can't fulfil the required replication factor, so let the caller
// decide what to do next
if replicas.len() < context.volume.as_ref().num_replicas as usize {
match self.undo(context, &replicas).await {
Ok(_) => {}
Err(_error) => {
return Err(SvcError::ReplicaCreateNumber {
id: context.volume.uid_str(),
});
}
}
Err(SvcError::ReplicaCreateNumber {
id: context.volume.uid_str(),
})
} else {
Ok(replicas)
}
}

async fn undo<'a>(&'a self, _context: &mut Context<'a>, _replicas: Vec<Replica>) {
async fn undo<'a>(
&'a self,
_context: &mut Context<'a>,
_replicas: &[Replica],
) -> Result<(), SvcError> {
// nothing to undo since we only support 1-replica snapshot
Ok(())
}
}

Expand Down
Loading

0 comments on commit cd9bbe4

Please sign in to comment.