diff --git a/CHANGELOG.md b/CHANGELOG.md index 98b826591..4f6401e0f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ Bob versions changelog ## [Unreleased] #### Added - Blob performes fsync if buffered bytes are larger than max_dirty_bytes_before_sync config param (#748) +- Command to generate nodes section for cluster.yaml or add new nodes to an existing cluster.yaml using range syntax patterns (#568) #### Changed - Use cargo workspace to declare dependencies to avoid their duplication (#821) diff --git a/bob-apps/Cargo.toml b/bob-apps/Cargo.toml index e76cd2b63..bfc2cf22a 100644 --- a/bob-apps/Cargo.toml +++ b/bob-apps/Cargo.toml @@ -43,6 +43,7 @@ env_logger = { workspace = true } clap = { workspace = true } tokio = { workspace = true, features = ["signal"] } pearl = { workspace = true } +itertools = "0.10.5" [target.'cfg(all(target_env = "musl", target_arch = "x86_64", target_pointer_width = "64"))'.dependencies] diff --git a/bob-apps/bin/ccg.rs b/bob-apps/bin/ccg.rs index b03fa1398..f7098d8c1 100644 --- a/bob-apps/bin/ccg.rs +++ b/bob-apps/bin/ccg.rs @@ -4,11 +4,12 @@ mod config_cluster_generator; extern crate log; use anyhow::{anyhow, Result as AnyResult}; -use bob::{ClusterConfig}; +use bob::ClusterConfig; use clap::{App, Arg, ArgMatches, SubCommand}; use config_cluster_generator::{ center::{check_expand_configs, get_new_disks, get_new_racks, Center}, - utils::{init_logger, ceil, read_config_from_file, write_to_file}, + pattern::{pattern_extend_nodes}, + utils::{ceil, init_logger, read_config_from_file, write_to_file}, }; #[tokio::main] @@ -28,6 +29,8 @@ fn try_main() -> AnyResult<()> { match get_matches().subcommand() { ("new", Some(matches)) => subcommand_new(matches), ("expand", Some(matches)) => subcommand_expand(matches), + ("new-hw", Some(matches)) => subcommand_new_hw(matches), + ("expand-hw", Some(matches)) => subcommand_expand_hw(matches), _ => Err(anyhow!("incorrect arguments: ERR")), } } @@ -67,6 +70,39 @@ fn subcommand_expand(matches: &ArgMatches) -> AnyResult<()> { Ok(()) } +fn subcommand_new_hw(matches: &ArgMatches) -> AnyResult<()> { + debug!("start new config by pattern generation"); + debug!("arguments: {:?}", matches); + let output = generate_hw_config(matches)?; + let output = serde_yaml::to_string(&output).expect("config serialization error"); + debug!("config cluster generation: OK"); + if let Some(name) = matches.value_of("output") { + write_to_file(output, name.to_owned()); + debug!("output to file: OK"); + } else { + println!("{}", output); + debug!("no file provided, stdout print: OK"); + } + Ok(()) +} + +fn subcommand_expand_hw(matches: &ArgMatches) -> AnyResult<()> { + debug!("start config extending with new nodes by range pattern"); + debug!("arguments: {:?}", matches); + let config = read_config_from_file(&get_input_config_name(matches))?; + let output = expand_hw_config(matches, config)?; + let output = serde_yaml::to_string(&output).expect("config serialization error"); + debug!("config cluster extending: OK"); + if let Some(name) = matches.value_of("output") { + write_to_file(output, name.to_owned()); + debug!("output to file: OK"); + } else { + println!("{}", output); + debug!("no file provided, stdout print: OK"); + } + Ok(()) +} + fn generate_config(matches: &ArgMatches, input: ClusterConfig) -> AnyResult { let replicas_count = get_replicas_count(matches)?; let (total_vdisks, vdisks_per_disk) = get_vdisks_total_and_per_disk(matches)?; @@ -93,6 +129,23 @@ fn expand_config( Ok(res) } +fn generate_hw_config(matches: &ArgMatches) -> AnyResult { + debug!("arguments: {:?}", matches); + let pattern = get_pattern(matches)?; + let node_pattern = get_nodename(matches); + let res = pattern_gen(pattern, node_pattern)?; + debug!("generate hw config: OK"); + Ok(res) +} + +fn expand_hw_config(matches: &ArgMatches, config: ClusterConfig) -> AnyResult { + let pattern = get_pattern(matches)?; + let node_pattern = get_nodename(matches); + let res = pattern_expand(config, pattern, node_pattern)?; + debug!("expand hw config: OK"); + Ok(res) +} + fn simple_expand( config: ClusterConfig, mut hardware_config: ClusterConfig, @@ -153,6 +206,24 @@ fn simple_gen( Ok(config) } +fn pattern_gen(pattern: String, node_pattern: String) -> AnyResult { + let nodes = pattern_extend_nodes(vec![], pattern, node_pattern)?; + let config = ClusterConfig::new(nodes); + debug!("pattern gen: OK [\n{:#?}\n]", config); + Ok(config) +} + +fn pattern_expand( + config: ClusterConfig, + pattern: String, + node_pattern: String, +) -> AnyResult { + let nodes = pattern_extend_nodes(config.nodes().to_owned(), pattern, node_pattern)?; + let config = ClusterConfig::new(nodes); + debug!("pattern extending: OK [\n{:#?}\n]", config); + Ok(config) +} + fn get_input_config_name(matches: &ArgMatches) -> String { let name = matches .value_of("input") @@ -197,6 +268,24 @@ fn get_vdisks_total_and_per_disk(matches: &ArgMatches) -> AnyResult<(Option AnyResult { + if let Some(name) = matches.value_of("pattern") { + debug!("get_pattern: OK [{}]", name); + Ok(name.to_owned()) + } else { + debug!("get_pattern: No value"); + Err(anyhow!("Failed: no pattern present")) + } +} + +fn get_nodename(matches: &ArgMatches) -> String { + let name = matches + .value_of("nodename") + .expect("is some, because of default arg value"); + debug!("get_nodename: OK [{}]", name); + name.to_owned() +} + fn get_matches() -> ArgMatches<'static> { let input = Arg::with_name("input") .short("i") @@ -231,6 +320,15 @@ fn get_matches() -> ArgMatches<'static> { .long("use-racks") .help("Use racks field in config") .takes_value(false); + let pattern_config = Arg::with_name("pattern") + .short("p") + .help("Pattern for pattern generation") + .takes_value(true); + let nodename_config = Arg::with_name("nodename") + .short("n") + .default_value("Node_{ip}_{port}_{id}") + .help("Node name pattern for pattern generation") + .takes_value(true); debug!("input arg: OK"); let subcommand_expand = SubCommand::with_name("expand") .arg(input.clone()) @@ -239,15 +337,28 @@ fn get_matches() -> ArgMatches<'static> { .arg(hardware_config); let subcommand_new = SubCommand::with_name("new") - .arg(input) - .arg(output) + .arg(input.clone()) + .arg(output.clone()) .arg(vdisks_per_disk) .arg(vdisks_count) .arg(use_racks) .arg(replicas); + let subcommand_new_hw = SubCommand::with_name("new-hw") + .arg(output.clone()) + .arg(pattern_config.clone()) + .arg(nodename_config.clone()); + + let subcommand_expand_hw = SubCommand::with_name("expand-hw") + .arg(input.clone()) + .arg(output.clone()) + .arg(pattern_config) + .arg(nodename_config); + App::new("Config Cluster Generator") .subcommand(subcommand_expand) .subcommand(subcommand_new) + .subcommand(subcommand_new_hw) + .subcommand(subcommand_expand_hw) .get_matches() } diff --git a/bob-apps/bin/config_cluster_generator/mod.rs b/bob-apps/bin/config_cluster_generator/mod.rs index 0642c25b8..a5b08aee6 100644 --- a/bob-apps/bin/config_cluster_generator/mod.rs +++ b/bob-apps/bin/config_cluster_generator/mod.rs @@ -1,2 +1,3 @@ pub mod center; +pub mod pattern; pub mod utils; diff --git a/bob-apps/bin/config_cluster_generator/pattern.rs b/bob-apps/bin/config_cluster_generator/pattern.rs new file mode 100644 index 000000000..f256dbb0a --- /dev/null +++ b/bob-apps/bin/config_cluster_generator/pattern.rs @@ -0,0 +1,268 @@ +use anyhow::{anyhow, Result as AnyResult}; +use bob_common::configs::cluster::Node; +use bob_common::core_types::{DiskName, DiskPath}; +use itertools::Itertools; +use regex::{Captures, Regex}; + +type Sample = ((String, u16), String); + +fn parse_address_sample(sample: &str) -> AnyResult { + debug!("Sample to parse: {}", sample); + let re = Regex::new(r"^(?[\w.]+):(?\d+)(?/[\w/]+)$").unwrap(); + if let Some(captures) = re.captures(sample) { + let ip = captures["addr"].to_owned(); + let port = captures["port"] + .parse::() + .map_err(|_| anyhow!("Failed to parse port {}", &captures["port"]))?; + let path = captures["path"].to_owned(); + Ok(((ip, port), path)) + } else { + Err(anyhow!("Failed to parse the sample {}", sample)) + } +} + +fn substitute_node_pattern(node_pattern: &str, ip: &str, port: u16, id: usize) -> String { + let re = Regex::new(r"\{(\w+)}").unwrap(); + re.replace_all(node_pattern, |caps: &Captures| match &caps[1] { + "ip" => ip.to_string(), + "port" => port.to_string(), + "id" => id.to_string(), + _ => caps[0].to_string(), + }) + .to_string() +} + +fn generate_range_samples(pattern: &str) -> impl Iterator { + let re = Regex::new(r"\[(\d+)-(\d+)]").unwrap(); + + let ranges = re.captures_iter(pattern).map(|captures| { + let start: usize = captures[1].parse().unwrap(); + let end: usize = captures[2].parse().unwrap(); + start..=end + }); + + re.split(pattern) + .zip_longest(ranges) + .map(|x| { + if let itertools::EitherOrBoth::Both(part, range) = x { + range.map(|i| part.to_string() + &i.to_string()).collect() + } else { + vec![x.left().expect("is some because split > range").to_string()] + } + }) + .multi_cartesian_product() + .map(|x| x.concat()) + .into_iter() +} + +fn pattern_extend_disks(node: &mut Node, disk_paths: impl Iterator) { + let old_disks: std::collections::HashSet<_> = node.disks().iter().map(|d| d.path()).collect(); + let new_disks: Vec = disk_paths + .filter(|disk_path| !old_disks.contains(disk_path.as_str())) + .enumerate() + .map(|(idx, disk_path)| { + DiskPath::new( + DiskName::new(&format!("disk{}", idx + old_disks.len() + 1)), + disk_path.as_str(), + ) + }) + .collect(); + node.disks_extend(new_disks); +} + +fn extend_nodes_by_samples( + nodes: &mut Vec, + parsed_samples: &Vec, + node_pattern: String, +) { + let existing_addresses: std::collections::HashSet<_> = + nodes.iter().map(|node| node.address()).collect(); + let mut new_nodes = Vec::new(); + for (ip_port, _) in &parsed_samples.iter().group_by(|(ip_port, _)| ip_port) { + let address = format!("{}:{}", ip_port.0, ip_port.1); + if !existing_addresses.contains(address.as_str()) { + let new_node = Node::new( + substitute_node_pattern( + &node_pattern, + &ip_port.0, + ip_port.1, + nodes.len() + new_nodes.len() + 1, + ), + address.to_owned(), + vec![], + ); + new_nodes.push(new_node); + } + } + nodes.extend(new_nodes); +} + +pub fn pattern_extend_nodes( + mut nodes: Vec, + pattern: String, + node_pattern: String, +) -> AnyResult> { + let parsed_samples = generate_range_samples(&pattern) + .map(|key| parse_address_sample(&key)) + .collect::>>()?; + + extend_nodes_by_samples(&mut nodes, &parsed_samples, node_pattern); + for (ip_port, paths) in &parsed_samples.iter().group_by(|(ip_port, _)| ip_port) { + if let Some(node) = nodes + .iter_mut() + .find(|node| node.address() == format!("{}:{}", ip_port.0, ip_port.1)) + { + let disks_to_extend = paths.map(|(_, path)| path.to_owned()); + pattern_extend_disks(node, disks_to_extend); + } + } + + Ok(nodes) +} + +#[cfg(test)] +mod tests { + use super::Node; + use super::*; + + #[test] + fn test_generate_range_samples() { + let pattern = "abc[1-3]def"; + let samples: Vec = generate_range_samples(pattern).collect(); + assert_eq!(samples, vec!["abc1def", "abc2def", "abc3def"]); + + let pattern = "[0-1]a[1-2]b[2-3]"; + let samples: Vec = generate_range_samples(pattern).collect(); + assert_eq!( + samples, + vec!["0a1b2", "0a1b3", "0a2b2", "0a2b3", "1a1b2", "1a1b3", "1a2b2", "1a2b3"] + ); + + let pattern = "a[5-6]b[2-3]c"; + let samples: Vec = generate_range_samples(pattern).collect(); + assert_eq!(samples, vec!["a5b2c", "a5b3c", "a6b2c", "a6b3c"]); + + let pattern = "[5-5]a[0-0]"; + let samples: Vec = generate_range_samples(pattern).collect(); + assert_eq!(samples, vec!["5a0"]); + } + + #[test] + fn test_parse_address_pattern() { + let pattern = String::from("127.0.0.1:8080/disk/path"); + let result = parse_address_sample(&pattern); + assert!(result.is_ok()); + + let ((ip, port), path) = result.unwrap(); + assert_eq!(ip, "127.0.0.1"); + assert_eq!(port, 8080); + assert_eq!(path, "/disk/path"); + + let pattern = String::from("127.0.0.1:65536/disk/path"); + let result = parse_address_sample(&pattern); + assert!(result.is_err()); + + let pattern = String::from("a,a:8080/disk/path"); + let result = parse_address_sample(&pattern); + assert!(result.is_err()); + } + #[test] + fn test_pattern_extend_nodes() { + let nodes = vec![]; + let pattern = "test[1-3]:10000/a[1-2]".to_string(); + let node_pattern = "{ip}_{port}_{id}".to_string(); + + let result = pattern_extend_nodes(nodes, pattern, node_pattern).unwrap(); + + assert_eq!( + result, + vec![ + Node::new( + "test1_10000_1".to_string(), + "test1:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + ], + ), + Node::new( + "test2_10000_2".to_string(), + "test2:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + ], + ), + Node::new( + "test3_10000_3".to_string(), + "test3:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + ], + ), + ] + ); + // extending + let old_nodes = result; + let pattern = "test[2-4]:10000/a[2-5]".to_string(); + let node_pattern = "{ip}_{port}_{id}".to_string(); + + let result = pattern_extend_nodes(old_nodes, pattern, node_pattern).unwrap(); + + assert_eq!( + result, + vec![ + Node::new( + "test1_10000_1".to_string(), + "test1:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + ], + ), + Node::new( + "test2_10000_2".to_string(), + "test2:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + DiskPath::new(DiskName::new("disk3"), "/a3"), + DiskPath::new(DiskName::new("disk4"), "/a4"), + DiskPath::new(DiskName::new("disk5"), "/a5"), + ], + ), + Node::new( + "test3_10000_3".to_string(), + "test3:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + DiskPath::new(DiskName::new("disk3"), "/a3"), + DiskPath::new(DiskName::new("disk4"), "/a4"), + DiskPath::new(DiskName::new("disk5"), "/a5"), + ], + ), + Node::new( + "test4_10000_4".to_string(), + "test4:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a2"), + DiskPath::new(DiskName::new("disk2"), "/a3"), + DiskPath::new(DiskName::new("disk3"), "/a4"), + DiskPath::new(DiskName::new("disk4"), "/a5"), + ], + ), + ] + ); + } + + #[test] + fn test_pattern_extend_nodes_invalid() { + let old_nodes = vec![]; + let pattern = "test[1-4]:[65535-65537]/a[2-5]".to_string(); // port type: u16 + let node_pattern = "{ip}_{port}_{id}".to_string(); + let result = pattern_extend_nodes(old_nodes, pattern, node_pattern); + assert!(result.is_err()); + } +} diff --git a/bob-common/src/configs/cluster.rs b/bob-common/src/configs/cluster.rs index 206cb375b..bf6c56781 100755 --- a/bob-common/src/configs/cluster.rs +++ b/bob-common/src/configs/cluster.rs @@ -75,7 +75,7 @@ impl Validatable for Rack { } /// Node config struct, with name, address and [`DiskPath`]s. -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] pub struct Node { name: String, address: String, @@ -83,6 +83,14 @@ pub struct Node { } impl Node { + #[must_use] + pub fn new(name: String, address: String, disks: Vec) -> Node { + Node { + name, + address, + disks, + } + } /// Returns node name, empty if name wasn't set in config. #[inline] #[must_use] @@ -97,6 +105,11 @@ impl Node { &self.disks } + /// Extends the disks collection with contents of the iterator. + pub fn disks_extend(&mut self, iter: impl IntoIterator) { + self.disks.extend(iter) + } + /// Returns node address, empty if address wasn't set in config. #[inline] #[must_use] @@ -247,6 +260,16 @@ pub struct Cluster { } impl Cluster { + pub fn new( + nodes: Vec, + ) -> Cluster { + Cluster { + nodes, + vdisks: Vec::default(), + racks: Vec::default(), + distribution_func: DistributionFunc::default(), + } + } /// Returns slice with [`Node`]s. #[must_use] pub fn nodes(&self) -> &[Node] {