Skip to content

Commit

Permalink
node expanding numeration fix (#568)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tagir Asadullin committed Jan 31, 2024
1 parent 15df29f commit fc19bd2
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 48 deletions.
11 changes: 6 additions & 5 deletions bob-apps/bin/ccg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use bob_common::configs::{cluster::DistributionFunc};
use clap::{App, Arg, ArgMatches, SubCommand};
use config_cluster_generator::{
center::{check_expand_configs, get_new_disks, get_new_racks, Center},
pattern::{pattern_to_nodes},
pattern::{pattern_extend_nodes},
utils::{ceil, init_logger, read_config_from_file, write_to_file},
};

Expand Down Expand Up @@ -208,19 +208,20 @@ fn simple_gen(
}

fn pattern_gen(pattern: String, node_pattern: String) -> AnyResult<ClusterConfig> {
let nodes = pattern_to_nodes(pattern, node_pattern)?;
let nodes = pattern_extend_nodes(vec![], pattern, node_pattern)?;
let config = ClusterConfig::new(nodes, vec![], vec![], DistributionFunc::default());
debug!("pattern gen: OK [\n{:#?}\n]", config);
Ok(config)
}

fn pattern_expand(
mut config: ClusterConfig,
config: ClusterConfig,
pattern: String,
node_pattern: String,
) -> AnyResult<ClusterConfig> {
let nodes = pattern_to_nodes(pattern, node_pattern)?;
config.disjoint_union_nodes(nodes);
let nodes = pattern_extend_nodes(config.nodes().to_owned(), pattern, node_pattern)?;
let config = ClusterConfig::new(nodes, config.vdisks().to_owned(), config.racks().to_owned(), config.distribution_func());
debug!("pattern extending: OK [\n{:#?}\n]", config);
Ok(config)
}

Expand Down
173 changes: 153 additions & 20 deletions bob-apps/bin/config_cluster_generator/pattern.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{anyhow, Result as AnyResult};
use itertools::Itertools;
use bob_common::configs::cluster::Node;
use bob_common::core_types::{DiskName, DiskPath};
use itertools::Itertools;

fn parse_address_pattern(pattern: &String) -> AnyResult<((String, u16), String)> {
debug!("Pattern to parse: {}", pattern);
Expand Down Expand Up @@ -51,39 +51,73 @@ fn generate_range_samples(pattern: &str) -> impl Iterator<Item = String> {
.into_iter()
}

pub fn pattern_to_nodes(pattern: String, node_pattern: String) -> AnyResult<Vec<Node>> {
pub fn pattern_extend_nodes(
mut old_nodes: Vec<Node>,
pattern: String,
node_pattern: String,
) -> AnyResult<Vec<Node>> {
let mut err = Ok(());
let nodes = generate_range_samples(&pattern)
let mut node_counter = old_nodes.len();
let new_nodes: Vec<Node> = generate_range_samples(&pattern)
.map_while(|key| {
let result = parse_address_pattern(&key);
match result {
Ok(((ip, port), path)) => Some(((ip, port), path)),
Err(e) => { err = Err(e); None },
Err(e) => {
err = Err(e);
None
}
}
})
.group_by(|key| key.0.clone())
.into_iter()
.enumerate()
.map(|(node_count, (ip_port, addresses))| {
let disks: Vec<DiskPath> = addresses
.enumerate()
.map(|(disk_count, disk)| {
DiskPath::new(
DiskName::new(format!("disk{}", disk_count + 1).as_str()),
disk.1.as_str(),
)
.filter_map(|(ip_port, addresses)| {
let address = format!("{}:{}", ip_port.0, ip_port.1);
let existed_node = old_nodes
.iter()
.find_position(|node| node.address() == address);

let mut old_disks = existed_node
.map(|(_, node)| node.disks().to_vec())
.unwrap_or_default();

let mut disk_counter = old_disks.len();
let new_disks: Vec<DiskPath> = addresses
.filter_map(|(_, path)| {
let disk_path = path.as_str();
if !old_nodes.is_empty() && old_disks.iter().any(|d| d.path() == disk_path) {
None
} else {
disk_counter += 1;
Some(DiskPath::new(
DiskName::new(&format!("disk{}", disk_counter)),
disk_path,
))
}
})
.collect();

Node::new(
substitute_node(node_pattern.as_str(), ip_port.0.as_str(), ip_port.1, node_count + 1),
format!("{}:{}", ip_port.0, ip_port.1),
disks,
)
old_disks.extend_from_slice(&new_disks);
if let Some((pos, node)) = existed_node {
old_nodes[pos] = Node::new(node.name().to_string(), address, old_disks);
None
} else {
node_counter += 1;
Some(Node::new(
substitute_node(
node_pattern.as_str(),
ip_port.0.as_str(),
ip_port.1,
node_counter,
),
address,
old_disks,
))
}
})
.collect();
err?;
Ok(nodes)
old_nodes.extend_from_slice(&new_nodes);
Ok(old_nodes)
}

#[cfg(test)]
Expand Down Expand Up @@ -131,4 +165,103 @@ mod tests {
let result = parse_address_pattern(&pattern);
assert!(result.is_err());
}
#[test]
fn test_pattern_extend_nodes() {
let old_nodes = vec![];
let pattern = "test[1-3]:10000/a[1-2]".to_string();
let node_pattern = "{ip}_{port}_{id}".to_string();

let result = pattern_extend_nodes(old_nodes, pattern, node_pattern).unwrap();

assert_eq!(
result,
vec![
Node::new(
"test1_10000_1".to_string(),
"test1:10000".to_string(),
vec![
DiskPath::new(DiskName::new("disk1"), "/a1"),
DiskPath::new(DiskName::new("disk2"), "/a2"),
],
),
Node::new(
"test2_10000_2".to_string(),
"test2:10000".to_string(),
vec![
DiskPath::new(DiskName::new("disk1"), "/a1"),
DiskPath::new(DiskName::new("disk2"), "/a2"),
],
),
Node::new(
"test3_10000_3".to_string(),
"test3:10000".to_string(),
vec![
DiskPath::new(DiskName::new("disk1"), "/a1"),
DiskPath::new(DiskName::new("disk2"), "/a2"),
],
),
]
);
// extending
let old_nodes = result;
let pattern = "test[2-4]:10000/a[2-5]".to_string();
let node_pattern = "{ip}_{port}_{id}".to_string();

let result = pattern_extend_nodes(old_nodes, pattern, node_pattern).unwrap();

assert_eq!(
result,
vec![
Node::new(
"test1_10000_1".to_string(),
"test1:10000".to_string(),
vec![
DiskPath::new(DiskName::new("disk1"), "/a1"),
DiskPath::new(DiskName::new("disk2"), "/a2"),
],
),
Node::new(
"test2_10000_2".to_string(),
"test2:10000".to_string(),
vec![
DiskPath::new(DiskName::new("disk1"), "/a1"),
DiskPath::new(DiskName::new("disk2"), "/a2"),
DiskPath::new(DiskName::new("disk3"), "/a3"),
DiskPath::new(DiskName::new("disk4"), "/a4"),
DiskPath::new(DiskName::new("disk5"), "/a5"),
],
),
Node::new(
"test3_10000_3".to_string(),
"test3:10000".to_string(),
vec![
DiskPath::new(DiskName::new("disk1"), "/a1"),
DiskPath::new(DiskName::new("disk2"), "/a2"),
DiskPath::new(DiskName::new("disk3"), "/a3"),
DiskPath::new(DiskName::new("disk4"), "/a4"),
DiskPath::new(DiskName::new("disk5"), "/a5"),
],
),
Node::new(
"test4_10000_4".to_string(),
"test4:10000".to_string(),
vec![
DiskPath::new(DiskName::new("disk1"), "/a2"),
DiskPath::new(DiskName::new("disk2"), "/a3"),
DiskPath::new(DiskName::new("disk3"), "/a4"),
DiskPath::new(DiskName::new("disk4"), "/a5"),
],
),
]
);
}

#[test]
fn test_pattern_extend_nodes_invalid() {
let old_nodes = vec![];
let pattern = "test[1-4]:[65535-65537]/a[2-5]".to_string(); // port type: u8
let node_pattern = "{ip}_{port}_{id}".to_string();
let result = pattern_extend_nodes(old_nodes, pattern, node_pattern);
assert!(result.is_err());
}
}
26 changes: 3 additions & 23 deletions bob-common/src/configs/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl Validatable for DiskPath {
}

/// Rack config struct, with name and [`Node`] names.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct Rack {
name: String,
nodes: Vec<String>,
Expand Down Expand Up @@ -75,7 +75,7 @@ impl Validatable for Rack {
}

/// Node config struct, with name, address and [`DiskPath`]s.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct Node {
name: String,
address: String,
Expand Down Expand Up @@ -176,7 +176,7 @@ impl Validatable for Replica {
}

/// Config for virtual disks, stores replicas locations.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct VDisk {
id: u32,
#[serde(default)]
Expand Down Expand Up @@ -458,26 +458,6 @@ impl Cluster {
Ok(config)
}
}

pub fn disjoint_union_nodes(&mut self, other_nodes: Vec<Node>) {
for node2 in other_nodes {
if let Some(node1) = self
.nodes
.iter_mut()
.find(|node1| node1.address() == node2.address()) {
node2.disks.into_iter().for_each(|disk2| {
if !node1.disks.iter().any(|disk1| disk1.path() == disk2.path()) {
node1.disks.push(DiskPath::new(
DiskName::new(&format!("disk{}", node1.disks.len() + 1)),
disk2.path(),
));
}
});
} else {
self.nodes.push(node2);
}
}
}
}

impl Validatable for Cluster {
Expand Down

0 comments on commit fc19bd2

Please sign in to comment.