From d4678fc3758c9cbb6908ea068632601b5f507c63 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 8 Mar 2024 15:34:21 +0000 Subject: [PATCH] v1.18: adds api to obtain the parent node in the turbine retransmit tree (backport of #115) (#135) adds api to obtain the parent node in the turbine retransmit tree (#115) Following commits will use this api to check retransmitter's signature on incoming shreds. (cherry picked from commit 42e8309c347be140ef893a35adc5eb782749e762) Co-authored-by: behzad nouri --- Cargo.lock | 1 + turbine/Cargo.toml | 1 + turbine/src/cluster_nodes.rs | 199 +++++++++++++++++++++++++++++++---- 3 files changed, 179 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2b858a92bbd7db..7a555efd28997b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7596,6 +7596,7 @@ dependencies = [ "solana-runtime", "solana-sdk", "solana-streamer", + "test-case", "thiserror", "tokio", ] diff --git a/turbine/Cargo.toml b/turbine/Cargo.toml index e205c10bf6608f..5bd58c790d1ff5 100644 --- a/turbine/Cargo.toml +++ b/turbine/Cargo.toml @@ -44,6 +44,7 @@ tokio = { workspace = true } assert_matches = { workspace = true } solana-logger = { workspace = true } solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } +test-case = { workspace = true } [[bench]] name = "cluster_info" diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index 8079178cf415b9..9dd122d85c3df7 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -152,8 +152,7 @@ impl ClusterNodes { } pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> { - let shred_seed = shred.seed(&self.pubkey); - let mut rng = ChaChaRng::from_seed(shred_seed); + let mut rng = get_seeded_rng(/*leader:*/ &self.pubkey, shred); let index = self.weighted_shuffle.first(&mut rng)?; self.nodes[index].contact_info() } @@ -187,7 +186,6 @@ impl ClusterNodes { shred: &ShredId, fanout: usize, ) -> Result { - let shred_seed = shred.seed(slot_leader); let mut weighted_shuffle = self.weighted_shuffle.clone(); // Exclude slot leader from list of nodes. if slot_leader == &self.pubkey { @@ -200,7 +198,7 @@ impl ClusterNodes { weighted_shuffle.remove_index(*index); } let mut addrs = HashMap::::with_capacity(self.nodes.len()); - let mut rng = ChaChaRng::from_seed(shred_seed); + let mut rng = get_seeded_rng(slot_leader, shred); let protocol = get_broadcast_protocol(shred); let nodes: Vec<_> = weighted_shuffle .shuffle(&mut rng) @@ -233,6 +231,43 @@ impl ClusterNodes { addrs, }) } + + // Returns the parent node in the turbine broadcast tree. + // Returns None if the node is the root of the tree or if it is not staked. + #[allow(unused)] + fn get_retransmit_parent( + &self, + leader: &Pubkey, + shred: &ShredId, + fanout: usize, + ) -> Result, Error> { + // Exclude slot leader from list of nodes. + if leader == &self.pubkey { + return Err(Error::Loopback { + leader: *leader, + shred: *shred, + }); + } + // Unstaked nodes' position in the turbine tree is not deterministic + // and depends on gossip propagation of contact-infos. Therefore, if + // this node is not staked return None. + if self.nodes[self.index[&self.pubkey]].stake == 0 { + return Ok(None); + } + let mut weighted_shuffle = self.weighted_shuffle.clone(); + if let Some(index) = self.index.get(leader).copied() { + weighted_shuffle.remove_index(index); + } + let mut rng = get_seeded_rng(leader, shred); + // Only need shuffled nodes until this node itself. + let nodes: Vec<_> = weighted_shuffle + .shuffle(&mut rng) + .map(|index| &self.nodes[index]) + .take_while(|node| node.pubkey() != self.pubkey) + .collect(); + let parent = get_retransmit_parent(fanout, nodes.len(), &nodes); + Ok(parent.map(Node::pubkey)) + } } pub fn new_cluster_nodes( @@ -296,6 +331,11 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap) -> Vec ChaChaRng { + let seed = shred.seed(leader); + ChaChaRng::from_seed(seed) +} + // root : [0] // 1st layer: [1, 2, ..., fanout] // 2nd layer: [[fanout + 1, ..., fanout * 2], @@ -327,6 +367,21 @@ fn get_retransmit_peers( .copied() } +// Returns the parent node in the turbine broadcast tree. +// Returns None if the node is the root of the tree. +fn get_retransmit_parent( + fanout: usize, + index: usize, // Local node's index within the nodes slice. + nodes: &[T], +) -> Option { + // Node's index within its neighborhood. + let offset = index.saturating_sub(1) % fanout; + let index = index.checked_sub(1)? / fanout; + let index = index - index.saturating_sub(1) % fanout; + let index = if index == 0 { index } else { index + offset }; + nodes.get(index).copied() +} + impl ClusterNodesCache { pub fn new( // Capacity of underlying LRU-cache in terms of number of epochs. @@ -527,7 +582,11 @@ pub fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: & #[cfg(test)] mod tests { - use super::*; + use { + super::*, + std::{fmt::Debug, hash::Hash}, + test_case::test_case, + }; #[test] fn test_cluster_nodes_retransmit() { @@ -600,10 +659,42 @@ mod tests { } } + // Checks (1) computed retransmit children against expected children and + // (2) computed parent of each child against the expected parent. + fn check_retransmit_nodes(fanout: usize, nodes: &[T], peers: Vec>) + where + T: Copy + Eq + PartialEq + Debug + Hash, + { + // Map node identities to their index within the shuffled tree. + let index: HashMap<_, _> = nodes + .iter() + .copied() + .enumerate() + .map(|(k, node)| (node, k)) + .collect(); + let offset = peers.len(); + // Root node's parent is None. + assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, nodes), None); + for (k, peers) in peers.into_iter().enumerate() { + assert_eq!( + get_retransmit_peers(fanout, k, nodes).collect::>(), + peers + ); + let parent = Some(nodes[k]); + for peer in peers { + assert_eq!(get_retransmit_parent(fanout, index[&peer], nodes), parent); + } + } + // Remaining nodes have no children. + for k in offset..=nodes.len() { + assert_eq!(get_retransmit_peers(fanout, k, nodes).next(), None); + } + } + #[test] - fn test_get_retransmit_peers() { + fn test_get_retransmit_nodes() { // fanout 2 - let index = vec![ + let nodes = [ 7, // root 6, 10, // 1st layer // 2nd layer @@ -631,16 +722,9 @@ mod tests { vec![16, 9], vec![8], ]; - for (k, peers) in peers.into_iter().enumerate() { - let retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index); - assert_eq!(retransmit_peers.collect::>(), peers); - } - for k in 10..=index.len() { - let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index); - assert_eq!(retransmit_peers.next(), None); - } + check_retransmit_nodes(/*fanout:*/ 2, &nodes, peers); // fanout 3 - let index = vec![ + let nodes = [ 19, // root 14, 15, 28, // 1st layer // 2nd layer @@ -672,13 +756,84 @@ mod tests { vec![24, 32], vec![34], ]; - for (k, peers) in peers.into_iter().enumerate() { - let retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index); - assert_eq!(retransmit_peers.collect::>(), peers); + check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers); + let nodes = [ + 5, // root + 34, 52, 8, // 1st layer + // 2nd layar + 44, 18, 2, // 1st neigborhood + 42, 47, 46, // 2nd + 11, 26, 28, // 3rd + // 3rd layer + 53, 23, 37, // 1st neighborhood + 40, 13, 7, // 2nd + 50, 35, 22, // 3rd + 3, 27, 31, // 4th + 10, 48, 15, // 5th + 19, 6, 30, // 6th + 36, 45, 1, // 7th + 38, 12, 17, // 8th + 4, 32, 16, // 9th + // 4th layer + 41, 49, 24, // 1st neighborhood + 14, 9, 0, // 2nd + 29, 21, 39, // 3rd + 43, 51, 33, // 4th + 25, 20, // 5th + ]; + let peers = vec![ + vec![34, 52, 8], + vec![44, 42, 11], + vec![18, 47, 26], + vec![2, 46, 28], + vec![53, 40, 50], + vec![23, 13, 35], + vec![37, 7, 22], + vec![3, 10, 19], + vec![27, 48, 6], + vec![31, 15, 30], + vec![36, 38, 4], + vec![45, 12, 32], + vec![1, 17, 16], + vec![41, 14, 29], + vec![49, 9, 21], + vec![24, 0, 39], + vec![43, 25], + vec![51, 20], + vec![33], + ]; + check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers); + } + + #[test_case(2, 1_347)] + #[test_case(3, 1_359)] + #[test_case(4, 4_296)] + #[test_case(5, 3_925)] + #[test_case(6, 8_778)] + #[test_case(7, 9_879)] + fn test_get_retransmit_nodes_round_trip(fanout: usize, size: usize) { + let mut rng = rand::thread_rng(); + let mut nodes: Vec<_> = (0..size).collect(); + nodes.shuffle(&mut rng); + // Map node identities to their index within the shuffled tree. + let index: HashMap<_, _> = nodes + .iter() + .copied() + .enumerate() + .map(|(k, node)| (node, k)) + .collect(); + // Root node's parent is None. + assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, &nodes), None); + for k in 1..size { + let parent = get_retransmit_parent(fanout, k, &nodes).unwrap(); + let mut peers = get_retransmit_peers(fanout, index[&parent], &nodes); + assert_eq!(peers.find(|&peer| peer == nodes[k]), Some(nodes[k])); } - for k in 13..=index.len() { - let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index); - assert_eq!(retransmit_peers.next(), None); + for k in 0..size { + let parent = Some(nodes[k]); + for peer in get_retransmit_peers(fanout, k, &nodes) { + assert_eq!(get_retransmit_parent(fanout, index[&peer], &nodes), parent); + } } } }