From 5fff269143e19aab04f1b8f6aac85293ec0fb915 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Tue, 27 Nov 2018 14:48:04 -0800 Subject: [PATCH 01/17] Add signatures to crds values --- src/cluster_info.rs | 10 ++--- src/contact_info.rs | 25 ++++++++++- src/crds.rs | 56 +++++------------------- src/crds_gossip_pull.rs | 6 +-- src/crds_value.rs | 94 ++++++++++++++++++++++++++++++++++++++--- 5 files changed, 125 insertions(+), 66 deletions(-) diff --git a/src/cluster_info.rs b/src/cluster_info.rs index cac02c9222bbbd..67d339c3cdb49c 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -66,7 +66,7 @@ pub struct ClusterInfo { #[derive(Serialize, Deserialize, Debug)] #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] enum Protocol { - /// Gosisp protocol messages + /// Gossip protocol messages PullRequest(Bloom, CrdsValue), PullResponse(Pubkey, Vec), PushMessage(Pubkey, Vec), @@ -165,11 +165,7 @@ impl ClusterInfo { let prev = self.leader_id(); let self_id = self.gossip.id; let now = timestamp(); - let leader = LeaderId { - id: self_id, - leader_id: key, - wallclock: now, - }; + let leader = LeaderId::new(self_id, key, now); let entry = CrdsValue::LeaderId(leader); warn!("{}: LEADER_UPDATE TO {} from {}", self_id, key, prev); self.gossip.process_push_message(&[entry], now); @@ -821,7 +817,7 @@ impl ClusterInfo { ledger_window: &mut Option<&mut LedgerWindow>, ) -> Vec { match request { - // TODO sigverify these + // TODO(sagar) sigverify these Protocol::PullRequest(filter, caller) => { Self::handle_pull_request(me, filter, caller, from_addr) } diff --git a/src/contact_info.rs b/src/contact_info.rs index 56512031c924a0..801f543b0697b0 100644 --- a/src/contact_info.rs +++ b/src/contact_info.rs @@ -1,5 +1,6 @@ +use bincode::serialize; use rpc::RPC_PORT; -use signature::{Keypair, KeypairUtil}; +use signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -8,6 +9,8 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct ContactInfo { pub id: Pubkey, + /// signature of this ContactInfo + pub signature: Signature, /// gossip address pub ncp: SocketAddr, /// address to connect to for replication @@ -52,6 +55,7 @@ impl Default for ContactInfo { rpc: socketaddr_any!(), rpc_pubsub: socketaddr_any!(), wallclock: 0, + signature: Signature::default(), } } } @@ -69,6 +73,7 @@ impl ContactInfo { ) -> Self { ContactInfo { id, + signature: Signature::default(), ncp, tvu, tpu, @@ -159,6 +164,24 @@ impl ContactInfo { pub fn is_valid_address(addr: &SocketAddr) -> bool { (addr.port() != 0) && Self::is_valid_ip(addr.ip()) } + pub fn get_sign_data(&self) -> Vec { + let mut data = serialize(&self.id).expect("serialize id"); + let ncp = serialize(&self.ncp).expect("serialize ncp"); + data.extend_from_slice(&ncp); + let tvu = serialize(&self.tvu).expect("serialize tvu"); + data.extend_from_slice(&tvu); + let tpu = serialize(&self.tpu).expect("serialize tpu"); + data.extend_from_slice(&tpu); + let storage_addr = serialize(&self.storage_addr).expect("serialize storage_addr"); + data.extend_from_slice(&storage_addr); + let rpc = serialize(&self.rpc).expect("serialize rpc"); + data.extend_from_slice(&rpc); + let rpc_pubsub = serialize(&self.rpc_pubsub).expect("serialize rpc_pubsub"); + data.extend_from_slice(&rpc_pubsub); + let wallclock = serialize(&self.wallclock).expect("serialize wallclock"); + data.extend_from_slice(&wallclock); + data + } } #[cfg(test)] diff --git a/src/crds.rs b/src/crds.rs index 82e40320e012fd..3af190126c2aeb 100644 --- a/src/crds.rs +++ b/src/crds.rs @@ -41,7 +41,7 @@ pub enum CrdsError { InsertFailed, } -/// This structure stores some local metadata assosciated with the CrdsValue +/// This structure stores some local metadata associated with the CrdsValue /// The implementation of PartialOrd ensures that the "highest" version is always picked to be /// stored in the Crds #[derive(PartialEq, Debug)] @@ -188,11 +188,7 @@ mod test { let mut crds = Crds::default(); let original = CrdsValue::LeaderId(LeaderId::default()); assert_matches!(crds.insert(original.clone(), 0), Ok(_)); - let val = CrdsValue::LeaderId(LeaderId { - id: Pubkey::default(), - leader_id: Pubkey::default(), - wallclock: 1, - }); + let val = CrdsValue::LeaderId(LeaderId::new(Pubkey::default(), Pubkey::default(), 1)); assert_eq!( crds.insert(val.clone(), 1).unwrap().unwrap().value, original @@ -255,19 +251,11 @@ mod test { let key = Keypair::new(); let v1 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId { - id: key.pubkey(), - leader_id: Pubkey::default(), - wallclock: 0, - }), + CrdsValue::LeaderId(LeaderId::new(key.pubkey(), Pubkey::default(), 0)), ); let v2 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId { - id: key.pubkey(), - leader_id: Pubkey::default(), - wallclock: 0, - }), + CrdsValue::LeaderId(LeaderId::new(key.pubkey(), Pubkey::default(), 0)), ); assert!(!(v1 != v2)); assert!(v1 == v2); @@ -277,19 +265,11 @@ mod test { let key = Keypair::new(); let v1 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId { - id: key.pubkey(), - leader_id: Pubkey::default(), - wallclock: 0, - }), + CrdsValue::LeaderId(LeaderId::new(key.pubkey(), Pubkey::default(), 0)), ); let v2 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId { - id: key.pubkey(), - leader_id: key.pubkey(), - wallclock: 0, - }), + CrdsValue::LeaderId(LeaderId::new(key.pubkey(), key.pubkey(), 0)), ); assert!(v1 != v2); assert!(!(v1 == v2)); @@ -304,19 +284,11 @@ mod test { let key = Keypair::new(); let v1 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId { - id: key.pubkey(), - leader_id: Pubkey::default(), - wallclock: 1, - }), + CrdsValue::LeaderId(LeaderId::new(key.pubkey(), Pubkey::default(), 1)), ); let v2 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId { - id: key.pubkey(), - leader_id: Pubkey::default(), - wallclock: 0, - }), + CrdsValue::LeaderId(LeaderId::new(key.pubkey(),Pubkey::default(),0)), ); assert!(v1 > v2); assert!(!(v1 < v2)); @@ -327,19 +299,11 @@ mod test { fn test_label_order() { let v1 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId { - id: Keypair::new().pubkey(), - leader_id: Pubkey::default(), - wallclock: 0, - }), + CrdsValue::LeaderId(LeaderId::new(Keypair::new().pubkey(), Pubkey::default(), 0)), ); let v2 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId { - id: Keypair::new().pubkey(), - leader_id: Pubkey::default(), - wallclock: 0, - }), + CrdsValue::LeaderId(LeaderId::new(Keypair::new().pubkey(), Pubkey::default(),0)), ); assert!(v1 != v2); assert!(!(v1 == v2)); diff --git a/src/crds_gossip_pull.rs b/src/crds_gossip_pull.rs index 2b77fbfa1bb1f9..fc48bd40317351 100644 --- a/src/crds_gossip_pull.rs +++ b/src/crds_gossip_pull.rs @@ -292,11 +292,7 @@ mod test { // node contains a key from the dest node, but at an older local timestamp let dest_id = new.label().pubkey(); - let same_key = CrdsValue::LeaderId(LeaderId { - id: dest_id, - leader_id: dest_id, - wallclock: 1, - }); + let same_key = CrdsValue::LeaderId(LeaderId::new(dest_id, dest_id, 1)); node_crds.insert(same_key.clone(), 0).unwrap(); assert_eq!( node_crds diff --git a/src/crds_value.rs b/src/crds_value.rs index 9ad502cb0ec024..b15f1172d08de3 100644 --- a/src/crds_value.rs +++ b/src/crds_value.rs @@ -1,4 +1,6 @@ +use bincode::serialize; use contact_info::ContactInfo; +use signature::{Keypair, Signature}; use solana_sdk::pubkey::Pubkey; use solana_sdk::transaction::Transaction; use std::fmt; @@ -18,6 +20,7 @@ pub enum CrdsValue { #[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)] pub struct LeaderId { pub id: Pubkey, + pub signature: Signature, pub leader_id: Pubkey, pub wallclock: u64, } @@ -25,12 +28,13 @@ pub struct LeaderId { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct Vote { pub transaction: Transaction, + pub signature: Signature, pub height: u64, pub wallclock: u64, } /// Type of the replicated value -/// These are labels for values in a record that is assosciated with `Pubkey` +/// These are labels for values in a record that is associated with `Pubkey` #[derive(PartialEq, Hash, Eq, Clone, Debug)] pub enum CrdsValueLabel { ContactInfo(Pubkey), @@ -58,8 +62,30 @@ impl CrdsValueLabel { } } +impl LeaderId { + pub fn new(id: Pubkey, leader_id: Pubkey, wallclock: u64) -> Self { + LeaderId { + id, + signature: Signature::default(), + leader_id, + wallclock, + } + } +} + +impl Vote { + pub fn new(transaction: Transaction, height: u64, wallclock: u64) -> Self { + Vote { + transaction, + signature: Signature::default(), + height, + wallclock, + } + } +} + impl CrdsValue { - /// Totally unsecure unverfiable wallclock of the node that generatd this message + /// Totally unsecure unverfiable wallclock of the node that generated this message /// Latest wallclock is always picked. /// This is used to time out push messages. pub fn wallclock(&self) -> u64 { @@ -102,11 +128,58 @@ impl CrdsValue { CrdsValueLabel::LeaderId(key), ] } + + pub fn sign(&mut self, keypair: &Keypair) { + let sign_data = self.get_sign_data(); + let signature = Signature::new(&keypair.sign(&sign_data).as_ref()); + match self { + CrdsValue::ContactInfo(contact_info) => contact_info.signature = signature, + CrdsValue::Vote(vote) => vote.signature = signature, + CrdsValue::LeaderId(leader_id) => leader_id.signature = signature, + } + } + + pub fn verify_signature(&self) -> bool { + let sig = match self { + CrdsValue::ContactInfo(contact_info) => contact_info.signature, + CrdsValue::Vote(vote) => vote.signature, + CrdsValue::LeaderId(leader_id) => leader_id.signature, + }; + sig.verify( + &self.label().pubkey().as_ref(), + &self.get_sign_data(), + ) + } + + /// get all the data relevant to signing contained in this value (excludes signature fields) + fn get_sign_data(&self) -> Vec { + let mut data = serialize(&self.wallclock()).expect("serialize wallclock"); + + match self { + CrdsValue::ContactInfo(contact_info) => contact_info.get_sign_data(), + CrdsValue::Vote(vote) => { + let transaction = serialize(&vote.transaction).expect("serialize transaction"); + data.extend_from_slice(&transaction); + let height = serialize(&vote.transaction).expect("serialize height"); + data.extend_from_slice(&height); + data + } + CrdsValue::LeaderId(leader_id) => { + let id = serialize(&leader_id.id).expect("serialize id"); + data.extend_from_slice(&id); + let leader_id = serialize(&leader_id.leader_id).expect("serialize leader_id"); + data.extend_from_slice(&leader_id); + data + } + } + } } #[cfg(test)] mod test { use super::*; use contact_info::ContactInfo; + use solana_sdk::signature::{Keypair, KeypairUtil}; + use solana_sdk::timing::timestamp; use system_transaction::test_tx; #[test] @@ -134,14 +207,21 @@ mod test { let key = v.clone().contact_info().unwrap().id; assert_eq!(v.label(), CrdsValueLabel::ContactInfo(key)); - let v = CrdsValue::Vote(Vote { - transaction: test_tx(), - height: 1, - wallclock: 0, - }); + let v = CrdsValue::Vote(Vote::new(test_tx(), 1, 0)); assert_eq!(v.wallclock(), 0); let key = v.clone().vote().unwrap().transaction.account_keys[0]; assert_eq!(v.label(), CrdsValueLabel::Vote(key)); } + #[test] + fn test_signature() { + let keypair = Keypair::new(); + let fake_keypair = Keypair::new(); + let leader = LeaderId::new(keypair.pubkey(), Pubkey::default(), timestamp()); + let mut v = CrdsValue::LeaderId(leader); + v.sign(&keypair); + assert!(v.verify_signature()); + v.sign(&fake_keypair); + assert!(!v.verify_signature()); + } } From abc37436267a48e3a7dfc3cd026106e42e4ff6c7 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Tue, 27 Nov 2018 17:20:31 -0800 Subject: [PATCH 02/17] Add Signature verification to gossip messages --- src/cluster_info.rs | 39 ++++++++++++++++++++++++++++++++------- src/fullnode.rs | 5 ++++- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 67d339c3cdb49c..0cc5dcf015981f 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -60,6 +60,8 @@ pub enum ClusterInfoError { pub struct ClusterInfo { /// The network pub gossip: CrdsGossip, + /// set the keypair that will be used to sign crds values generated. It is unset only in tests. + keypair: Option>, } // TODO These messages should be signed, and go through the gpu pipeline for spam filtering @@ -79,8 +81,13 @@ enum Protocol { impl ClusterInfo { pub fn new(node_info: NodeInfo) -> Self { + //Without a keypair, gossip will not function. Only useful for tests. + ClusterInfo::new_with_keypair(node_info, None) + } + pub fn new_with_keypair(node_info: NodeInfo, keypair: Option>) -> Self { let mut me = ClusterInfo { gossip: CrdsGossip::default(), + keypair, }; let id = node_info.id; me.gossip.set_self(id); @@ -92,12 +99,18 @@ impl ClusterInfo { let mut my_data = self.my_data(); let now = timestamp(); my_data.wallclock = now; - let entry = CrdsValue::ContactInfo(my_data); + let mut entry = CrdsValue::ContactInfo(my_data); + if let Some(keypair) = self.keypair.clone() { + entry.sign(keypair.as_ref()); + } self.gossip.refresh_push_active_set(); self.gossip.process_push_message(&[entry], now); } pub fn insert_info(&mut self, node_info: NodeInfo) { - let value = CrdsValue::ContactInfo(node_info); + let mut value = CrdsValue::ContactInfo(node_info); + if let Some(keypair) = self.keypair.clone() { + value.sign(keypair.as_ref()); + } let _ = self.gossip.crds.insert(value, timestamp()); } pub fn id(&self) -> Pubkey { @@ -166,8 +179,11 @@ impl ClusterInfo { let self_id = self.gossip.id; let now = timestamp(); let leader = LeaderId::new(self_id, key, now); - let entry = CrdsValue::LeaderId(leader); + let mut entry = CrdsValue::LeaderId(leader); warn!("{}: LEADER_UPDATE TO {} from {}", self_id, key, prev); + if let Some(keypair) = self.keypair.clone() { + entry.sign(keypair.as_ref()); + } self.gossip.process_push_message(&[entry], now); } @@ -817,16 +833,25 @@ impl ClusterInfo { ledger_window: &mut Option<&mut LedgerWindow>, ) -> Vec { match request { - // TODO(sagar) sigverify these + // TODO verify messages faster Protocol::PullRequest(filter, caller) => { - Self::handle_pull_request(me, filter, caller, from_addr) + if caller.verify_signature() { + Self::handle_pull_request(me, filter, caller, from_addr) + } else { + vec![] + } } - Protocol::PullResponse(from, data) => { + Protocol::PullResponse(from, mut data) => { + data.retain(|v| v.verify_signature()); Self::handle_pull_response(me, from, data); vec![] } - Protocol::PushMessage(from, data) => Self::handle_push_message(me, from, &data), + Protocol::PushMessage(from, mut data) => { + data.retain(|v| v.verify_signature()); + Self::handle_push_message(me, from, &data) + } Protocol::PruneMessage(from, data) => { + //TODO does prune need sig verify? inc_new_counter_info!("cluster_info-prune_message", 1); inc_new_counter_info!("cluster_info-prune_message-size", data.len()); me.write().unwrap().gossip.process_prune_msg(from, &data); diff --git a/src/fullnode.rs b/src/fullnode.rs index 9f61681d3ca95d..9a56dfb7f01031 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -232,7 +232,10 @@ impl Fullnode { let window = new_window(32 * 1024); let shared_window = Arc::new(RwLock::new(window)); node.info.wallclock = timestamp(); - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node.info))); + let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_keypair( + node.info, + Some(keypair.clone()), + ))); let (rpc_service, rpc_pubsub_service) = Self::startup_rpc_services(rpc_addr, rpc_pubsub_addr, &bank, &cluster_info); From 4e47f4292c2471fe10d7cfef71be6de96fa7f386 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Tue, 27 Nov 2018 17:31:08 -0800 Subject: [PATCH 03/17] Fix fmt --- src/crds.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/crds.rs b/src/crds.rs index 3af190126c2aeb..4257f7b5df6982 100644 --- a/src/crds.rs +++ b/src/crds.rs @@ -288,7 +288,7 @@ mod test { ); let v2 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId::new(key.pubkey(),Pubkey::default(),0)), + CrdsValue::LeaderId(LeaderId::new(key.pubkey(), Pubkey::default(), 0)), ); assert!(v1 > v2); assert!(!(v1 < v2)); @@ -303,7 +303,7 @@ mod test { ); let v2 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId::new(Keypair::new().pubkey(), Pubkey::default(),0)), + CrdsValue::LeaderId(LeaderId::new(Keypair::new().pubkey(), Pubkey::default(), 0)), ); assert!(v1 != v2); assert!(!(v1 == v2)); From da9f334c4911bfa55c2bfe9d1165f429f1c76d50 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Wed, 28 Nov 2018 14:47:39 -0800 Subject: [PATCH 04/17] Add test for gossip msg verification - Assign default to keypair --- src/cluster_info.rs | 59 ++++++++++++++++++++++++++++++++++++--------- src/fullnode.rs | 2 +- 2 files changed, 48 insertions(+), 13 deletions(-) diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 0cc5dcf015981f..d5c7e9dc317e96 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -61,7 +61,7 @@ pub struct ClusterInfo { /// The network pub gossip: CrdsGossip, /// set the keypair that will be used to sign crds values generated. It is unset only in tests. - keypair: Option>, + keypair: Arc, } // TODO These messages should be signed, and go through the gpu pipeline for spam filtering @@ -82,9 +82,9 @@ enum Protocol { impl ClusterInfo { pub fn new(node_info: NodeInfo) -> Self { //Without a keypair, gossip will not function. Only useful for tests. - ClusterInfo::new_with_keypair(node_info, None) + ClusterInfo::new_with_keypair(node_info, Arc::new(Keypair::new())) } - pub fn new_with_keypair(node_info: NodeInfo, keypair: Option>) -> Self { + pub fn new_with_keypair(node_info: NodeInfo, keypair: Arc) -> Self { let mut me = ClusterInfo { gossip: CrdsGossip::default(), keypair, @@ -100,17 +100,13 @@ impl ClusterInfo { let now = timestamp(); my_data.wallclock = now; let mut entry = CrdsValue::ContactInfo(my_data); - if let Some(keypair) = self.keypair.clone() { - entry.sign(keypair.as_ref()); - } + entry.sign(&self.keypair); self.gossip.refresh_push_active_set(); self.gossip.process_push_message(&[entry], now); } pub fn insert_info(&mut self, node_info: NodeInfo) { let mut value = CrdsValue::ContactInfo(node_info); - if let Some(keypair) = self.keypair.clone() { - value.sign(keypair.as_ref()); - } + value.sign(&self.keypair); let _ = self.gossip.crds.insert(value, timestamp()); } pub fn id(&self) -> Pubkey { @@ -181,9 +177,7 @@ impl ClusterInfo { let leader = LeaderId::new(self_id, key, now); let mut entry = CrdsValue::LeaderId(leader); warn!("{}: LEADER_UPDATE TO {} from {}", self_id, key, prev); - if let Some(keypair) = self.keypair.clone() { - entry.sign(keypair.as_ref()); - } + entry.sign(&self.keypair); self.gossip.process_push_message(&[entry], now); } @@ -1364,4 +1358,45 @@ mod tests { assert!(node.sockets.repair.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.repair.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); } + + //test that all cluster_info objects only generate signed messages + //when constructed with keypairs + #[test] + fn test_gossip_signature_verification() { + //create new cluster info, leader, and peer + let keypair = Keypair::new(); + let peer_keypair = Keypair::new(); + let leader_keypair = Keypair::new(); + let node_info = NodeInfo::new_localhost(keypair.pubkey(), 0); + let leader = NodeInfo::new_localhost(leader_keypair.pubkey(), 0); + let peer = NodeInfo::new_localhost(peer_keypair.pubkey(), 0); + let mut cluster_info = ClusterInfo::new_with_keypair(node_info, Arc::new(keypair)); + let peer_cluster_info = &Arc::new(RwLock::new(ClusterInfo::new_with_keypair( + peer.clone(), + Arc::new(peer_keypair), + ))); + cluster_info.set_leader(leader.id); + cluster_info.insert_info(peer.clone()); + //check that all types of gossip messages are signed correctly + let (_, _, vals) = cluster_info.gossip.new_push_messages(timestamp()); + vals.par_iter().for_each(|v| assert!(v.verify_signature())); + let (_, _, val) = cluster_info + .gossip + .new_pull_request(timestamp()) + .ok() + .unwrap(); + assert!(val.verify_signature()); + // there should be some pushes ready + assert!(vals.len() > 0); + //using PushMessage since it has the fewest dependencies, but any protocol should work here + let resp = ClusterInfo::handle_protocol( + &peer_cluster_info, + &peer.ncp, + Protocol::PushMessage(cluster_info.id(), vals), + &SharedWindow::default(), + &mut None, + ); + // there should be no prunes but check anyway + assert_eq!(resp.len(), 0); + } } diff --git a/src/fullnode.rs b/src/fullnode.rs index 9a56dfb7f01031..328552da2c3f3d 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -234,7 +234,7 @@ impl Fullnode { node.info.wallclock = timestamp(); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_keypair( node.info, - Some(keypair.clone()), + keypair.clone(), ))); let (rpc_service, rpc_pubsub_service) = From ba53e7477a1d527f1164b654ad08bb6e3a4318be Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Wed, 28 Nov 2018 14:53:03 -0800 Subject: [PATCH 05/17] fmt fix --- src/crds_value.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/crds_value.rs b/src/crds_value.rs index b15f1172d08de3..ff36ee457d5133 100644 --- a/src/crds_value.rs +++ b/src/crds_value.rs @@ -145,10 +145,7 @@ impl CrdsValue { CrdsValue::Vote(vote) => vote.signature, CrdsValue::LeaderId(leader_id) => leader_id.signature, }; - sig.verify( - &self.label().pubkey().as_ref(), - &self.get_sign_data(), - ) + sig.verify(&self.label().pubkey().as_ref(), &self.get_sign_data()) } /// get all the data relevant to signing contained in this value (excludes signature fields) From 51ed099ea55198c8ea177b76951be13e1a94e268 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Wed, 28 Nov 2018 17:15:47 -0800 Subject: [PATCH 06/17] Add verification to prune messages --- src/cluster_info.rs | 53 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 45 insertions(+), 8 deletions(-) diff --git a/src/cluster_info.rs b/src/cluster_info.rs index d5c7e9dc317e96..4617b24eb7b28b 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -27,7 +27,7 @@ use rand::{thread_rng, Rng}; use rayon::prelude::*; use result::Result; use rpc::RPC_PORT; -use signature::{Keypair, KeypairUtil}; +use signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::{duration_as_ms, timestamp}; @@ -64,7 +64,25 @@ pub struct ClusterInfo { keypair: Arc, } -// TODO These messages should be signed, and go through the gpu pipeline for spam filtering +#[derive(Debug, Deserialize, Serialize)] +pub struct PruneData { + /// Pubkeys of nodes that should be pruned + pub prunes: Vec, + /// Signature of this Prune Message + pub signature: Signature, + /// The Pubkey of the intended node/destination for this message + pub destination: Pubkey, +} + +impl PruneData { + pub fn get_sign_data(&self) -> Vec { + let mut data = serialize(&self.prunes).expect("serialize prunes"); + data.extend_from_slice(&serialize(&self.destination).expect("serialize destination")); + data + } +} + +// TODO These messages should go through the gpu pipeline for spam filtering #[derive(Serialize, Deserialize, Debug)] #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] enum Protocol { @@ -72,7 +90,7 @@ enum Protocol { PullRequest(Bloom, CrdsValue), PullResponse(Pubkey, Vec), PushMessage(Pubkey, Vec), - PruneMessage(Pubkey, Vec), + PruneMessage(Pubkey, PruneData), /// Window protocol messages /// TODO: move this message to a different module @@ -751,7 +769,19 @@ impl ClusterInfo { if !prunes.is_empty() { let mut wme = me.write().unwrap(); inc_new_counter_info!("cluster_info-push_message-prunes", prunes.len()); - let rsp = Protocol::PruneMessage(self_id, prunes); + let mut prune_msg = PruneData { + prunes, + signature: Signature::default(), + destination: from, + }; + prune_msg.signature = Signature::new( + me.read() + .unwrap() + .keypair + .sign(prune_msg.get_sign_data().as_ref()) + .as_ref(), + ); + let rsp = Protocol::PruneMessage(self_id, prune_msg); let ci = wme.lookup(from).cloned(); let pushes: Vec<_> = wme.new_push_requests(); inc_new_counter_info!("cluster_info-push_message-pushes", pushes.len()); @@ -845,10 +875,17 @@ impl ClusterInfo { Self::handle_push_message(me, from, &data) } Protocol::PruneMessage(from, data) => { - //TODO does prune need sig verify? - inc_new_counter_info!("cluster_info-prune_message", 1); - inc_new_counter_info!("cluster_info-prune_message-size", data.len()); - me.write().unwrap().gossip.process_prune_msg(from, &data); + if data.destination == me.read().unwrap().id() && data + .signature + .verify(from.as_ref(), data.get_sign_data().as_ref()) + { + inc_new_counter_info!("cluster_info-prune_message", 1); + inc_new_counter_info!("cluster_info-prune_message-size", data.prunes.len()); + me.write() + .unwrap() + .gossip + .process_prune_msg(from, &data.prunes); + } vec![] } Protocol::RequestWindowIndex(from, ix) => { From 9b88cd260321504b35a25ae36f81f3843e7a8b56 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Wed, 28 Nov 2018 17:45:12 -0800 Subject: [PATCH 07/17] Add source addr verification - Needs tests --- src/cluster_info.rs | 40 ++++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 4617b24eb7b28b..27f8e5b16577fc 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -72,12 +72,15 @@ pub struct PruneData { pub signature: Signature, /// The Pubkey of the intended node/destination for this message pub destination: Pubkey, + /// The source Addr this message should have been received from + pub source: SocketAddr, } impl PruneData { pub fn get_sign_data(&self) -> Vec { let mut data = serialize(&self.prunes).expect("serialize prunes"); data.extend_from_slice(&serialize(&self.destination).expect("serialize destination")); + data.extend_from_slice(&serialize(&self.source).expect("serialize source")); data } } @@ -769,25 +772,28 @@ impl ClusterInfo { if !prunes.is_empty() { let mut wme = me.write().unwrap(); inc_new_counter_info!("cluster_info-push_message-prunes", prunes.len()); - let mut prune_msg = PruneData { - prunes, - signature: Signature::default(), - destination: from, - }; - prune_msg.signature = Signature::new( - me.read() - .unwrap() - .keypair - .sign(prune_msg.get_sign_data().as_ref()) - .as_ref(), - ); - let rsp = Protocol::PruneMessage(self_id, prune_msg); let ci = wme.lookup(from).cloned(); let pushes: Vec<_> = wme.new_push_requests(); inc_new_counter_info!("cluster_info-push_message-pushes", pushes.len()); let mut rsp: Vec<_> = ci - .and_then(|ci| to_blob(rsp, ci.ncp).ok()) - .into_iter() + .and_then(|ci| { + let mut prune_msg = PruneData { + prunes, + signature: Signature::default(), + destination: from, + source: ci.ncp.clone(), + }; + prune_msg.signature = Signature::new( + me.read() + .unwrap() + .keypair + .sign(prune_msg.get_sign_data().as_ref()) + .as_ref(), + ); + let rsp = Protocol::PruneMessage(self_id, prune_msg); + + to_blob(rsp, ci.ncp).ok() + }).into_iter() .collect(); let mut blobs: Vec<_> = pushes .into_iter() @@ -875,7 +881,7 @@ impl ClusterInfo { Self::handle_push_message(me, from, &data) } Protocol::PruneMessage(from, data) => { - if data.destination == me.read().unwrap().id() && data + if data.destination == me.read().unwrap().id() && data.source == *from_addr && data .signature .verify(from.as_ref(), data.get_sign_data().as_ref()) { @@ -1436,4 +1442,6 @@ mod tests { // there should be no prunes but check anyway assert_eq!(resp.len(), 0); } + + //TODO test prunes and make sure they can't be forwarded. } From 8b8f3f9fa21b1ebca04d884c921aa9300fa46561 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Thu, 29 Nov 2018 12:57:14 -0800 Subject: [PATCH 08/17] Add Signable trait - Added a Signable trait that all signable values can implement. - CrdsValue is a Signable that contains Signables so it doesn't need to provide implementations for all the methods and can just override some defaults --- sdk/src/signature.rs | 18 ++++++ src/cluster_info.rs | 57 ++++++++++++------- src/contact_info.rs | 59 +++++++++++++------ src/crds_value.rs | 132 ++++++++++++++++++++++++++++++------------- 4 files changed, 191 insertions(+), 75 deletions(-) diff --git a/sdk/src/signature.rs b/sdk/src/signature.rs index 0cfcca4260a825..355b877030f27b 100644 --- a/sdk/src/signature.rs +++ b/sdk/src/signature.rs @@ -30,6 +30,24 @@ impl Signature { } } +pub trait Signable { + fn sign(&mut self, keypair: &Keypair) { + let data = self.get_sign_data(); + self.set_signature(Signature::new( + &keypair.sign(&data).as_ref(), + )); + } + fn verify(&self) -> bool { + self.get_signature() + .verify(&self.pubkey().as_ref(), &self.get_sign_data()) + } + + fn pubkey(&self) -> Pubkey; + fn get_sign_data(&self) -> Vec; + fn get_signature(&self) -> Signature; + fn set_signature(&mut self, signature: Signature); +} + impl AsRef<[u8]> for Signature { fn as_ref(&self) -> &[u8] { &self.0[..] diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 27f8e5b16577fc..7b13698737f43c 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -27,7 +27,7 @@ use rand::{thread_rng, Rng}; use rayon::prelude::*; use result::Result; use rpc::RPC_PORT; -use signature::{Keypair, KeypairUtil, Signature}; +use signature::{Keypair, KeypairUtil, Signable, Signature}; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::{duration_as_ms, timestamp}; @@ -66,6 +66,8 @@ pub struct ClusterInfo { #[derive(Debug, Deserialize, Serialize)] pub struct PruneData { + /// Pubkey of the node that sent this prune data + pub pubkey: Pubkey, /// Pubkeys of nodes that should be pruned pub prunes: Vec, /// Signature of this Prune Message @@ -76,12 +78,34 @@ pub struct PruneData { pub source: SocketAddr, } -impl PruneData { - pub fn get_sign_data(&self) -> Vec { - let mut data = serialize(&self.prunes).expect("serialize prunes"); - data.extend_from_slice(&serialize(&self.destination).expect("serialize destination")); - data.extend_from_slice(&serialize(&self.source).expect("serialize source")); - data +impl Signable for PruneData { + fn pubkey(&self) -> Pubkey { + self.pubkey + } + + fn get_sign_data(&self) -> Vec { + #[derive(Serialize)] + struct SignData { + pubkey: Pubkey, + prunes: Vec, + destination: Pubkey, + source: SocketAddr, + } + let data = SignData { + pubkey: self.pubkey, + prunes: self.prunes.clone(), + destination: self.destination, + source: self.source, + }; + serialize(&data).expect("serialize PruneData") + } + + fn get_signature(&self) -> Signature { + self.signature + } + + fn set_signature(&mut self, signature: Signature) { + self.signature = signature } } @@ -778,18 +802,13 @@ impl ClusterInfo { let mut rsp: Vec<_> = ci .and_then(|ci| { let mut prune_msg = PruneData { + pubkey: self_id, prunes, signature: Signature::default(), destination: from, source: ci.ncp.clone(), }; - prune_msg.signature = Signature::new( - me.read() - .unwrap() - .keypair - .sign(prune_msg.get_sign_data().as_ref()) - .as_ref(), - ); + prune_msg.sign(&me.read().unwrap().keypair); let rsp = Protocol::PruneMessage(self_id, prune_msg); to_blob(rsp, ci.ncp).ok() @@ -865,19 +884,19 @@ impl ClusterInfo { match request { // TODO verify messages faster Protocol::PullRequest(filter, caller) => { - if caller.verify_signature() { + if caller.verify() { Self::handle_pull_request(me, filter, caller, from_addr) } else { vec![] } } Protocol::PullResponse(from, mut data) => { - data.retain(|v| v.verify_signature()); + data.retain(|v| v.verify()); Self::handle_pull_response(me, from, data); vec![] } Protocol::PushMessage(from, mut data) => { - data.retain(|v| v.verify_signature()); + data.retain(|v| v.verify()); Self::handle_push_message(me, from, &data) } Protocol::PruneMessage(from, data) => { @@ -1422,13 +1441,13 @@ mod tests { cluster_info.insert_info(peer.clone()); //check that all types of gossip messages are signed correctly let (_, _, vals) = cluster_info.gossip.new_push_messages(timestamp()); - vals.par_iter().for_each(|v| assert!(v.verify_signature())); + vals.par_iter().for_each(|v| assert!(v.verify())); let (_, _, val) = cluster_info .gossip .new_pull_request(timestamp()) .ok() .unwrap(); - assert!(val.verify_signature()); + assert!(val.verify()); // there should be some pushes ready assert!(vals.len() > 0); //using PushMessage since it has the fewest dependencies, but any protocol should work here diff --git a/src/contact_info.rs b/src/contact_info.rs index 801f543b0697b0..f569849f70d9d9 100644 --- a/src/contact_info.rs +++ b/src/contact_info.rs @@ -1,6 +1,6 @@ use bincode::serialize; use rpc::RPC_PORT; -use signature::{Keypair, KeypairUtil, Signature}; +use signature::{Keypair, KeypairUtil, Signable, Signature}; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -164,23 +164,46 @@ impl ContactInfo { pub fn is_valid_address(addr: &SocketAddr) -> bool { (addr.port() != 0) && Self::is_valid_ip(addr.ip()) } - pub fn get_sign_data(&self) -> Vec { - let mut data = serialize(&self.id).expect("serialize id"); - let ncp = serialize(&self.ncp).expect("serialize ncp"); - data.extend_from_slice(&ncp); - let tvu = serialize(&self.tvu).expect("serialize tvu"); - data.extend_from_slice(&tvu); - let tpu = serialize(&self.tpu).expect("serialize tpu"); - data.extend_from_slice(&tpu); - let storage_addr = serialize(&self.storage_addr).expect("serialize storage_addr"); - data.extend_from_slice(&storage_addr); - let rpc = serialize(&self.rpc).expect("serialize rpc"); - data.extend_from_slice(&rpc); - let rpc_pubsub = serialize(&self.rpc_pubsub).expect("serialize rpc_pubsub"); - data.extend_from_slice(&rpc_pubsub); - let wallclock = serialize(&self.wallclock).expect("serialize wallclock"); - data.extend_from_slice(&wallclock); - data +} + +impl Signable for ContactInfo { + fn pubkey(&self) -> Pubkey { + self.id + } + + fn get_sign_data(&self) -> Vec { + #[derive(Serialize)] + struct SignData { + id: Pubkey, + ncp: SocketAddr, + tvu: SocketAddr, + tpu: SocketAddr, + storage_addr: SocketAddr, + rpc: SocketAddr, + rpc_pubsub: SocketAddr, + wallclock: u64, + } + + let me = self; + let data = SignData { + id: me.id, + ncp: me.ncp, + tvu: me.tvu, + tpu: me.tpu, + storage_addr: me.storage_addr, + rpc: me.rpc, + rpc_pubsub: me.rpc_pubsub, + wallclock: me.wallclock, + }; + serialize(&data).expect("failed to serialize ContactInfo") + } + + fn get_signature(&self) -> Signature { + self.signature + } + + fn set_signature(&mut self, signature: Signature) { + self.signature = signature } } diff --git a/src/crds_value.rs b/src/crds_value.rs index ff36ee457d5133..96d559f552785d 100644 --- a/src/crds_value.rs +++ b/src/crds_value.rs @@ -1,6 +1,6 @@ use bincode::serialize; use contact_info::ContactInfo; -use signature::{Keypair, Signature}; +use signature::{Keypair, Signable, Signature}; use solana_sdk::pubkey::Pubkey; use solana_sdk::transaction::Transaction; use std::fmt; @@ -33,6 +33,64 @@ pub struct Vote { pub wallclock: u64, } +impl Signable for LeaderId { + fn pubkey(&self) -> Pubkey { + self.id + } + + fn get_sign_data(&self) -> Vec { + #[derive(Serialize)] + struct SignData { + id: Pubkey, + leader_id: Pubkey, + wallclock: u64, + } + let data = SignData { + id: self.id, + leader_id: self.leader_id, + wallclock: self.wallclock, + }; + serialize(&data).expect("unable to serialize LeaderId") + } + + fn get_signature(&self) -> Signature { + self.signature + } + + fn set_signature(&mut self, signature: Signature) { + self.signature = signature + } +} + +impl Signable for Vote { + fn pubkey(&self) -> Pubkey { + self.transaction.account_keys[0] + } + + fn get_sign_data(&self) -> Vec { + #[derive(Serialize)] + struct SignData { + transaction: Transaction, + height: u64, + wallclock: u64, + } + let data = SignData { + transaction: self.transaction.clone(), + height: self.height, + wallclock: self.wallclock, + }; + serialize(&data).expect("unable to serialize Vote") + } + + fn get_signature(&self) -> Signature { + self.signature + } + + fn set_signature(&mut self, signature: Signature) { + self.signature = signature + } +} + /// Type of the replicated value /// These are labels for values in a record that is associated with `Pubkey` #[derive(PartialEq, Hash, Eq, Clone, Debug)] @@ -97,9 +155,11 @@ impl CrdsValue { } pub fn label(&self) -> CrdsValueLabel { match self { - CrdsValue::ContactInfo(contact_info) => CrdsValueLabel::ContactInfo(contact_info.id), - CrdsValue::Vote(vote) => CrdsValueLabel::Vote(vote.transaction.account_keys[0]), - CrdsValue::LeaderId(leader_id) => CrdsValueLabel::LeaderId(leader_id.id), + CrdsValue::ContactInfo(contact_info) => { + CrdsValueLabel::ContactInfo(contact_info.pubkey()) + } + CrdsValue::Vote(vote) => CrdsValueLabel::Vote(vote.pubkey()), + CrdsValue::LeaderId(leader_id) => CrdsValueLabel::LeaderId(leader_id.pubkey()), } } pub fn contact_info(&self) -> Option<&ContactInfo> { @@ -128,49 +188,45 @@ impl CrdsValue { CrdsValueLabel::LeaderId(key), ] } +} - pub fn sign(&mut self, keypair: &Keypair) { - let sign_data = self.get_sign_data(); - let signature = Signature::new(&keypair.sign(&sign_data).as_ref()); +impl Signable for CrdsValue { + fn sign(&mut self, keypair: &Keypair) { match self { - CrdsValue::ContactInfo(contact_info) => contact_info.signature = signature, - CrdsValue::Vote(vote) => vote.signature = signature, - CrdsValue::LeaderId(leader_id) => leader_id.signature = signature, + CrdsValue::ContactInfo(contact_info) => contact_info.sign(keypair), + CrdsValue::Vote(vote) => vote.sign(keypair), + CrdsValue::LeaderId(leader_id) => leader_id.sign(keypair), + }; + } + fn verify(&self) -> bool { + match self { + CrdsValue::ContactInfo(contact_info) => contact_info.verify(), + CrdsValue::Vote(vote) => vote.verify(), + CrdsValue::LeaderId(leader_id) => leader_id.verify(), } } - pub fn verify_signature(&self) -> bool { - let sig = match self { - CrdsValue::ContactInfo(contact_info) => contact_info.signature, - CrdsValue::Vote(vote) => vote.signature, - CrdsValue::LeaderId(leader_id) => leader_id.signature, - }; - sig.verify(&self.label().pubkey().as_ref(), &self.get_sign_data()) + fn pubkey(&self) -> Pubkey { + match self { + CrdsValue::ContactInfo(contact_info) => contact_info.pubkey(), + CrdsValue::Vote(vote) => vote.pubkey(), + CrdsValue::LeaderId(leader_id) => leader_id.pubkey(), + } } - /// get all the data relevant to signing contained in this value (excludes signature fields) fn get_sign_data(&self) -> Vec { - let mut data = serialize(&self.wallclock()).expect("serialize wallclock"); + unimplemented!() + } - match self { - CrdsValue::ContactInfo(contact_info) => contact_info.get_sign_data(), - CrdsValue::Vote(vote) => { - let transaction = serialize(&vote.transaction).expect("serialize transaction"); - data.extend_from_slice(&transaction); - let height = serialize(&vote.transaction).expect("serialize height"); - data.extend_from_slice(&height); - data - } - CrdsValue::LeaderId(leader_id) => { - let id = serialize(&leader_id.id).expect("serialize id"); - data.extend_from_slice(&id); - let leader_id = serialize(&leader_id.leader_id).expect("serialize leader_id"); - data.extend_from_slice(&leader_id); - data - } - } + fn get_signature(&self) -> Signature { + unimplemented!() + } + + fn set_signature(&mut self, _: Signature) { + unimplemented!() } } + #[cfg(test)] mod test { use super::*; @@ -216,9 +272,9 @@ mod test { let leader = LeaderId::new(keypair.pubkey(), Pubkey::default(), timestamp()); let mut v = CrdsValue::LeaderId(leader); v.sign(&keypair); - assert!(v.verify_signature()); + assert!(v.verify()); v.sign(&fake_keypair); - assert!(!v.verify_signature()); + assert!(!v.verify()); } } From 4622fa4d090779c4dbf49e15eeb414abd659f328 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Thu, 29 Nov 2018 15:40:08 -0800 Subject: [PATCH 09/17] Add a test to check the prune messages --- src/cluster_info.rs | 83 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 77 insertions(+), 6 deletions(-) diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 7b13698737f43c..fbbb429b106302 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -794,10 +794,9 @@ impl ClusterInfo { .gossip .process_push_message(&data, timestamp()); if !prunes.is_empty() { - let mut wme = me.write().unwrap(); inc_new_counter_info!("cluster_info-push_message-prunes", prunes.len()); - let ci = wme.lookup(from).cloned(); - let pushes: Vec<_> = wme.new_push_requests(); + let ci = me.read().unwrap().lookup(from).cloned(); + let pushes: Vec<_> = me.write().unwrap().new_push_requests(); inc_new_counter_info!("cluster_info-push_message-pushes", pushes.len()); let mut rsp: Vec<_> = ci .and_then(|ci| { @@ -1432,7 +1431,7 @@ mod tests { let node_info = NodeInfo::new_localhost(keypair.pubkey(), 0); let leader = NodeInfo::new_localhost(leader_keypair.pubkey(), 0); let peer = NodeInfo::new_localhost(peer_keypair.pubkey(), 0); - let mut cluster_info = ClusterInfo::new_with_keypair(node_info, Arc::new(keypair)); + let mut cluster_info = ClusterInfo::new_with_keypair(node_info.clone(), Arc::new(keypair)); let peer_cluster_info = &Arc::new(RwLock::new(ClusterInfo::new_with_keypair( peer.clone(), Arc::new(peer_keypair), @@ -1453,14 +1452,86 @@ mod tests { //using PushMessage since it has the fewest dependencies, but any protocol should work here let resp = ClusterInfo::handle_protocol( &peer_cluster_info, - &peer.ncp, + &node_info.ncp, Protocol::PushMessage(cluster_info.id(), vals), &SharedWindow::default(), &mut None, ); - // there should be no prunes but check anyway + // there should be no prunes assert_eq!(resp.len(), 0); } //TODO test prunes and make sure they can't be forwarded. + #[test] + fn test_prune_signatures() { + //create new cluster info, leader, and peer + let keypair = Keypair::new(); + let peer_keypair = Keypair::new(); + let leader_keypair = Keypair::new(); + let node_info = NodeInfo::new_localhost(keypair.pubkey(), 0); + let leader = NodeInfo::new_localhost(leader_keypair.pubkey(), 0); + let peer = NodeInfo::new_localhost(peer_keypair.pubkey(), 0); + let mut cluster_info = ClusterInfo::new_with_keypair(node_info.clone(), Arc::new(keypair)); + let peer_cluster_info = &Arc::new(RwLock::new(ClusterInfo::new_with_keypair( + peer.clone(), + Arc::new(peer_keypair), + ))); + let mut leader_cluster_info = + ClusterInfo::new_with_keypair(leader.clone(), Arc::new(leader_keypair)); + peer_cluster_info.write().unwrap().set_leader(leader.id); + peer_cluster_info + .write() + .unwrap() + .insert_info(leader.clone()); + cluster_info.set_leader(leader.id); + cluster_info.insert_info(peer.clone()); + cluster_info.insert_info(leader.clone()); + leader_cluster_info.set_leader(leader.id); + leader_cluster_info.insert_info(peer.clone()); + let time = timestamp(); + let (_, _, me_vals) = cluster_info.gossip.new_push_messages(time); + let (_, _, leader_vals) = leader_cluster_info.gossip.new_push_messages(time); + + assert_eq!(me_vals.len(), 2); + assert_eq!(leader_vals.len(), 2); + let mut resp = ClusterInfo::handle_protocol( + &peer_cluster_info, + &node_info.ncp, + Protocol::PushMessage(cluster_info.id(), me_vals.clone()), + &SharedWindow::default(), + &mut None, + ); + // there should be no prunes but check anyway + assert_eq!(resp.len(), 0); + assert!( + peer_cluster_info + .read() + .unwrap() + .lookup(node_info.id) + .is_some() + ); + resp = ClusterInfo::handle_protocol( + &peer_cluster_info, + &leader.ncp, + Protocol::PushMessage(leader_cluster_info.id(), me_vals.clone()), + &SharedWindow::default(), + &mut None, + ); + // there should be a prune + assert_eq!(resp.len(), 1); + //check that the prune is valid + let blob = resp[0].read().unwrap(); + deserialize(&blob.data[..blob.meta.size]) + .iter() + .for_each(|request| match request { + //check that the request was sent to the leader to prune the peer node + Protocol::PruneMessage(from, data) => { + assert!(data.verify()); + assert_eq!(data.source, blob.meta.addr()); + assert_eq!(data.destination, leader_cluster_info.id()); + assert_eq!(data.pubkey, *from); + } + _ => assert!(false), + }); + } } From 1989a30d4c023cc044f5d65bc6bfcdb6aea9cd07 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Thu, 29 Nov 2018 16:14:00 -0800 Subject: [PATCH 10/17] Remove verification from pull reqs - Spy nodes should be able to make pull requests --- src/cluster_info.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/cluster_info.rs b/src/cluster_info.rs index fbbb429b106302..fcc639dbe0fc98 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -883,11 +883,8 @@ impl ClusterInfo { match request { // TODO verify messages faster Protocol::PullRequest(filter, caller) => { - if caller.verify() { - Self::handle_pull_request(me, filter, caller, from_addr) - } else { - vec![] - } + //Pulls don't need to be verified + Self::handle_pull_request(me, filter, caller, from_addr) } Protocol::PullResponse(from, mut data) => { data.retain(|v| v.verify()); @@ -899,9 +896,9 @@ impl ClusterInfo { Self::handle_push_message(me, from, &data) } Protocol::PruneMessage(from, data) => { - if data.destination == me.read().unwrap().id() && data.source == *from_addr && data - .signature - .verify(from.as_ref(), data.get_sign_data().as_ref()) + if data.destination == me.read().unwrap().id() + && data.source == *from_addr + && data.verify() { inc_new_counter_info!("cluster_info-prune_message", 1); inc_new_counter_info!("cluster_info-prune_message-size", data.prunes.len()); From 75c3a0f37090d4be7065f9057c12d456093c1448 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Fri, 30 Nov 2018 12:53:27 -0800 Subject: [PATCH 11/17] Update min bloom filter size and update tests --- src/cluster_info.rs | 98 +++------------------------------------- src/crds_gossip.rs | 49 ++++++++++++++++++-- src/crds_gossip_error.rs | 1 + src/crds_gossip_pull.rs | 6 ++- src/crds_gossip_push.rs | 4 +- 5 files changed, 60 insertions(+), 98 deletions(-) diff --git a/src/cluster_info.rs b/src/cluster_info.rs index fcc639dbe0fc98..cc2243d371003f 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -809,7 +809,6 @@ impl ClusterInfo { }; prune_msg.sign(&me.read().unwrap().keypair); let rsp = Protocol::PruneMessage(self_id, prune_msg); - to_blob(rsp, ci.ncp).ok() }).into_iter() .collect(); @@ -896,16 +895,14 @@ impl ClusterInfo { Self::handle_push_message(me, from, &data) } Protocol::PruneMessage(from, data) => { - if data.destination == me.read().unwrap().id() - && data.source == *from_addr - && data.verify() - { + if data.source == *from_addr && data.verify() { inc_new_counter_info!("cluster_info-prune_message", 1); inc_new_counter_info!("cluster_info-prune_message-size", data.prunes.len()); me.write() .unwrap() .gossip - .process_prune_msg(from, &data.prunes); + .process_prune_msg(from, data.destination, &data.prunes) + .ok(); } vec![] } @@ -1437,98 +1434,15 @@ mod tests { cluster_info.insert_info(peer.clone()); //check that all types of gossip messages are signed correctly let (_, _, vals) = cluster_info.gossip.new_push_messages(timestamp()); + // there should be some pushes ready + assert!(vals.len() > 0); vals.par_iter().for_each(|v| assert!(v.verify())); + let (_, _, val) = cluster_info .gossip .new_pull_request(timestamp()) .ok() .unwrap(); assert!(val.verify()); - // there should be some pushes ready - assert!(vals.len() > 0); - //using PushMessage since it has the fewest dependencies, but any protocol should work here - let resp = ClusterInfo::handle_protocol( - &peer_cluster_info, - &node_info.ncp, - Protocol::PushMessage(cluster_info.id(), vals), - &SharedWindow::default(), - &mut None, - ); - // there should be no prunes - assert_eq!(resp.len(), 0); - } - - //TODO test prunes and make sure they can't be forwarded. - #[test] - fn test_prune_signatures() { - //create new cluster info, leader, and peer - let keypair = Keypair::new(); - let peer_keypair = Keypair::new(); - let leader_keypair = Keypair::new(); - let node_info = NodeInfo::new_localhost(keypair.pubkey(), 0); - let leader = NodeInfo::new_localhost(leader_keypair.pubkey(), 0); - let peer = NodeInfo::new_localhost(peer_keypair.pubkey(), 0); - let mut cluster_info = ClusterInfo::new_with_keypair(node_info.clone(), Arc::new(keypair)); - let peer_cluster_info = &Arc::new(RwLock::new(ClusterInfo::new_with_keypair( - peer.clone(), - Arc::new(peer_keypair), - ))); - let mut leader_cluster_info = - ClusterInfo::new_with_keypair(leader.clone(), Arc::new(leader_keypair)); - peer_cluster_info.write().unwrap().set_leader(leader.id); - peer_cluster_info - .write() - .unwrap() - .insert_info(leader.clone()); - cluster_info.set_leader(leader.id); - cluster_info.insert_info(peer.clone()); - cluster_info.insert_info(leader.clone()); - leader_cluster_info.set_leader(leader.id); - leader_cluster_info.insert_info(peer.clone()); - let time = timestamp(); - let (_, _, me_vals) = cluster_info.gossip.new_push_messages(time); - let (_, _, leader_vals) = leader_cluster_info.gossip.new_push_messages(time); - - assert_eq!(me_vals.len(), 2); - assert_eq!(leader_vals.len(), 2); - let mut resp = ClusterInfo::handle_protocol( - &peer_cluster_info, - &node_info.ncp, - Protocol::PushMessage(cluster_info.id(), me_vals.clone()), - &SharedWindow::default(), - &mut None, - ); - // there should be no prunes but check anyway - assert_eq!(resp.len(), 0); - assert!( - peer_cluster_info - .read() - .unwrap() - .lookup(node_info.id) - .is_some() - ); - resp = ClusterInfo::handle_protocol( - &peer_cluster_info, - &leader.ncp, - Protocol::PushMessage(leader_cluster_info.id(), me_vals.clone()), - &SharedWindow::default(), - &mut None, - ); - // there should be a prune - assert_eq!(resp.len(), 1); - //check that the prune is valid - let blob = resp[0].read().unwrap(); - deserialize(&blob.data[..blob.meta.size]) - .iter() - .for_each(|request| match request { - //check that the request was sent to the leader to prune the peer node - Protocol::PruneMessage(from, data) => { - assert!(data.verify()); - assert_eq!(data.source, blob.meta.addr()); - assert_eq!(data.destination, leader_cluster_info.id()); - assert_eq!(data.pubkey, *from); - } - _ => assert!(false), - }); } } diff --git a/src/crds_gossip.rs b/src/crds_gossip.rs index 57fb21bbe17c71..bb872e36dad950 100644 --- a/src/crds_gossip.rs +++ b/src/crds_gossip.rs @@ -12,6 +12,9 @@ use crds_value::CrdsValue; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; +///The min size for bloom filters +pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000; + pub struct CrdsGossip { pub crds: Crds, pub id: Pubkey, @@ -64,8 +67,18 @@ impl CrdsGossip { } /// add the `from` to the peer's filter of nodes - pub fn process_prune_msg(&mut self, peer: Pubkey, origin: &[Pubkey]) { - self.push.process_prune_msg(peer, origin) + pub fn process_prune_msg( + &mut self, + peer: Pubkey, + destination: Pubkey, + origin: &[Pubkey], + ) -> Result<(), CrdsGossipError> { + if self.id == destination { + self.push.process_prune_msg(peer, origin); + Ok(()) + } else { + Err(CrdsGossipError::BadPruneDestination) + } } /// refresh the push active set @@ -138,11 +151,13 @@ impl CrdsGossip { mod test { use super::*; use bincode::serialized_size; + use cluster_info::NodeInfo; use contact_info::ContactInfo; use crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS; use crds_value::CrdsValueLabel; use rayon::prelude::*; use signature::{Keypair, KeypairUtil}; + use solana_sdk::hash::hash; use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -317,8 +332,11 @@ mod test { prunes += rsps.len(); network .get(&from) - .map(|node| node.lock().unwrap().process_prune_msg(*to, &rsps)) - .unwrap(); + .map(|node| { + let mut node = node.lock().unwrap(); + let destination = node.id; + node.process_prune_msg(*to, destination, &rsps).unwrap() + }).unwrap(); delivered += rsps.is_empty() as usize; } (bytes, delivered, num_msgs, prunes) @@ -483,4 +501,27 @@ mod test { let mut network = star_network_create(4002); network_simulator(&mut network); } + #[test] + fn test_prune_destination() { + let mut crds_gossip = CrdsGossip::default(); + crds_gossip.id = Pubkey::new(&[0; 32]); + let id = crds_gossip.id; + let ci = NodeInfo::new_localhost(Pubkey::new(&[1; 32]), 0); + let prune_pubkey = Pubkey::new(&[2; 32]); + crds_gossip + .crds + .insert(CrdsValue::ContactInfo(ci.clone()), 0) + .unwrap(); + crds_gossip.refresh_push_active_set(); + //incorrect dest + let mut res = crds_gossip.process_prune_msg( + ci.id, + Pubkey::new(hash(&[1; 32]).as_ref()), + &[prune_pubkey], + ); + assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination)); + //correct dest + res = crds_gossip.process_prune_msg(ci.id, id, &[prune_pubkey]); + assert!(res.is_ok()); + } } diff --git a/src/crds_gossip_error.rs b/src/crds_gossip_error.rs index d9d00ce77c043f..23a5d555e19d21 100644 --- a/src/crds_gossip_error.rs +++ b/src/crds_gossip_error.rs @@ -4,4 +4,5 @@ pub enum CrdsGossipError { PushMessageTimeout, PushMessagePrune, PushMessageOldVersion, + BadPruneDestination, } diff --git a/src/crds_gossip_pull.rs b/src/crds_gossip_pull.rs index fc48bd40317351..5f1f1a1e495d89 100644 --- a/src/crds_gossip_pull.rs +++ b/src/crds_gossip_pull.rs @@ -12,6 +12,7 @@ use bincode::serialized_size; use bloom::Bloom; use crds::Crds; +use crds_gossip::CRDS_GOSSIP_BLOOM_SIZE; use crds_gossip_error::CrdsGossipError; use crds_value::{CrdsValue, CrdsValueLabel}; use packet::BLOB_DATA_SIZE; @@ -135,7 +136,10 @@ impl CrdsGossipPull { } /// build a filter of the current crds table fn build_crds_filter(&self, crds: &Crds) -> Bloom { - let num = crds.table.values().count() + self.purged_values.len(); + let num = cmp::max( + CRDS_GOSSIP_BLOOM_SIZE, + crds.table.values().count() + self.purged_values.len(), + ); let mut bloom = Bloom::random(num, 0.1, 4 * 1024 * 8 - 1); for v in crds.table.values() { bloom.add(&v.value_hash); diff --git a/src/crds_gossip_push.rs b/src/crds_gossip_push.rs index 00f50ba630c3b6..65088aeb0c8e65 100644 --- a/src/crds_gossip_push.rs +++ b/src/crds_gossip_push.rs @@ -12,6 +12,7 @@ use bincode::serialized_size; use bloom::Bloom; use contact_info::ContactInfo; use crds::{Crds, VersionedCrdsValue}; +use crds_gossip::CRDS_GOSSIP_BLOOM_SIZE; use crds_gossip_error::CrdsGossipError; use crds_value::{CrdsValue, CrdsValueLabel}; use indexmap::map::IndexMap; @@ -183,7 +184,8 @@ impl CrdsGossipPush { continue; } } - let bloom = Bloom::random(network_size, 0.1, 1024 * 8 * 4); + let size = cmp::max(CRDS_GOSSIP_BLOOM_SIZE, network_size); + let mut bloom = Bloom::random(size, 0.1, 1024 * 8 * 4); new_items.insert(val.0.pubkey(), bloom); if new_items.len() == need { break; From f0ea5bb307d3aef412308d2a8552f950fc6cca0b Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Fri, 30 Nov 2018 14:15:37 -0800 Subject: [PATCH 12/17] Remove unused --- src/cluster_info.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/cluster_info.rs b/src/cluster_info.rs index cc2243d371003f..0464085e4ca778 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -1426,10 +1426,6 @@ mod tests { let leader = NodeInfo::new_localhost(leader_keypair.pubkey(), 0); let peer = NodeInfo::new_localhost(peer_keypair.pubkey(), 0); let mut cluster_info = ClusterInfo::new_with_keypair(node_info.clone(), Arc::new(keypair)); - let peer_cluster_info = &Arc::new(RwLock::new(ClusterInfo::new_with_keypair( - peer.clone(), - Arc::new(peer_keypair), - ))); cluster_info.set_leader(leader.id); cluster_info.insert_info(peer.clone()); //check that all types of gossip messages are signed correctly From 309bd882e434a3be50ded7b2e976a5036ee13807 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Fri, 30 Nov 2018 14:35:49 -0800 Subject: [PATCH 13/17] Fix test to use consistent keypair --- tests/data_replicator.rs | 6 ++++-- tests/multinode.rs | 10 ++++++---- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index 4a6dd0d809469b..4bd7977ec0e148 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -11,6 +11,7 @@ use solana::ncp::Ncp; use solana::packet::{Blob, SharedBlob}; use solana::result; use solana::service::Service; +use solana::signature::{Keypair, KeypairUtil}; use solana_sdk::timing::timestamp; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; @@ -19,8 +20,9 @@ use std::thread::sleep; use std::time::Duration; fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { - let mut tn = Node::new_localhost(); - let cluster_info = ClusterInfo::new(tn.info.clone()); + let keypair = Keypair::new(); + let mut tn = Node::new_localhost_with_pubkey(keypair.pubkey()); + let cluster_info = ClusterInfo::new_with_keypair(tn.info.clone(), Arc::new(keypair)); let c = Arc::new(RwLock::new(cluster_info)); let w = Arc::new(RwLock::new(vec![])); let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit); diff --git a/tests/multinode.rs b/tests/multinode.rs index 0b3a51e2dd4cfd..e8217c63614c8f 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -42,12 +42,13 @@ use std::thread::{sleep, Builder, JoinHandle}; use std::time::{Duration, Instant}; fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc>, Pubkey) { + let keypair = Keypair::new(); let exit = Arc::new(AtomicBool::new(false)); - let mut spy = Node::new_localhost(); + let mut spy = Node::new_localhost_with_pubkey(keypair.pubkey()); let me = spy.info.id.clone(); let daddr = "0.0.0.0:0".parse().unwrap(); spy.info.tvu = daddr; - let mut spy_cluster_info = ClusterInfo::new(spy.info); + let mut spy_cluster_info = ClusterInfo::new_with_keypair(spy.info, Arc::new(keypair)); spy_cluster_info.insert_info(leader.clone()); spy_cluster_info.set_leader(leader.id); let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info)); @@ -64,11 +65,12 @@ fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc>, Pubkey) { } fn make_listening_node(leader: &NodeInfo) -> (Ncp, Arc>, Node, Pubkey) { + let keypair = Keypair::new(); let exit = Arc::new(AtomicBool::new(false)); - let new_node = Node::new_localhost(); + let new_node = Node::new_localhost_with_pubkey(keypair.pubkey()); let new_node_info = new_node.info.clone(); let me = new_node.info.id.clone(); - let mut new_node_cluster_info = ClusterInfo::new(new_node_info); + let mut new_node_cluster_info = ClusterInfo::new_with_keypair(new_node_info, Arc::new(keypair)); new_node_cluster_info.insert_info(leader.clone()); new_node_cluster_info.set_leader(leader.id); let new_node_cluster_info_ref = Arc::new(RwLock::new(new_node_cluster_info)); From 4b85c40304b68f8e52631257967c590515ed349e Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Fri, 30 Nov 2018 15:16:23 -0800 Subject: [PATCH 14/17] Fix fmt --- sdk/src/signature.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdk/src/signature.rs b/sdk/src/signature.rs index 355b877030f27b..bee895f3114e1c 100644 --- a/sdk/src/signature.rs +++ b/sdk/src/signature.rs @@ -33,9 +33,7 @@ impl Signature { pub trait Signable { fn sign(&mut self, keypair: &Keypair) { let data = self.get_sign_data(); - self.set_signature(Signature::new( - &keypair.sign(&data).as_ref(), - )); + self.set_signature(Signature::new(&keypair.sign(&data).as_ref())); } fn verify(&self) -> bool { self.get_signature() From bab48f4e97e380d3524ee929630655a169747206 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Fri, 30 Nov 2018 16:00:16 -0800 Subject: [PATCH 15/17] Rename get_sign_data to signable_data --- sdk/src/signature.rs | 6 +++--- src/cluster_info.rs | 4 ++-- src/contact_info.rs | 2 +- src/crds_value.rs | 6 +++--- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sdk/src/signature.rs b/sdk/src/signature.rs index bee895f3114e1c..d00cc05f62c82a 100644 --- a/sdk/src/signature.rs +++ b/sdk/src/signature.rs @@ -32,16 +32,16 @@ impl Signature { pub trait Signable { fn sign(&mut self, keypair: &Keypair) { - let data = self.get_sign_data(); + let data = self.signable_data(); self.set_signature(Signature::new(&keypair.sign(&data).as_ref())); } fn verify(&self) -> bool { self.get_signature() - .verify(&self.pubkey().as_ref(), &self.get_sign_data()) + .verify(&self.pubkey().as_ref(), &self.signable_data()) } fn pubkey(&self) -> Pubkey; - fn get_sign_data(&self) -> Vec; + fn signable_data(&self) -> Vec; fn get_signature(&self) -> Signature; fn set_signature(&mut self, signature: Signature); } diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 0464085e4ca778..b99d24e7ea94b6 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -83,7 +83,7 @@ impl Signable for PruneData { self.pubkey } - fn get_sign_data(&self) -> Vec { + fn signable_data(&self) -> Vec { #[derive(Serialize)] struct SignData { pubkey: Pubkey, @@ -805,7 +805,7 @@ impl ClusterInfo { prunes, signature: Signature::default(), destination: from, - source: ci.ncp.clone(), + source: ci.ncp, }; prune_msg.sign(&me.read().unwrap().keypair); let rsp = Protocol::PruneMessage(self_id, prune_msg); diff --git a/src/contact_info.rs b/src/contact_info.rs index f569849f70d9d9..40aa333f183ec8 100644 --- a/src/contact_info.rs +++ b/src/contact_info.rs @@ -171,7 +171,7 @@ impl Signable for ContactInfo { self.id } - fn get_sign_data(&self) -> Vec { + fn signable_data(&self) -> Vec { #[derive(Serialize)] struct SignData { id: Pubkey, diff --git a/src/crds_value.rs b/src/crds_value.rs index 96d559f552785d..600933a33cf18e 100644 --- a/src/crds_value.rs +++ b/src/crds_value.rs @@ -38,7 +38,7 @@ impl Signable for LeaderId { self.id } - fn get_sign_data(&self) -> Vec { + fn signable_data(&self) -> Vec { #[derive(Serialize)] struct SignData { id: Pubkey, @@ -67,7 +67,7 @@ impl Signable for Vote { self.transaction.account_keys[0] } - fn get_sign_data(&self) -> Vec { + fn signable_data(&self) -> Vec { #[derive(Serialize)] struct SignData { transaction: Transaction, @@ -214,7 +214,7 @@ impl Signable for CrdsValue { } } - fn get_sign_data(&self) -> Vec { + fn signable_data(&self) -> Vec { unimplemented!() } From 92d6ea9e9e8e884999b4757b7de0ac50e70a4d14 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Sat, 1 Dec 2018 10:49:11 -0800 Subject: [PATCH 16/17] Add wallclock to prune data --- src/cluster_info.rs | 21 +++++++++++++-------- src/crds_gossip.rs | 22 +++++++++++++++++++--- src/crds_gossip_error.rs | 1 + src/crds_gossip_push.rs | 4 ++++ 4 files changed, 37 insertions(+), 11 deletions(-) diff --git a/src/cluster_info.rs b/src/cluster_info.rs index b99d24e7ea94b6..f13e402fa2fe31 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -74,8 +74,8 @@ pub struct PruneData { pub signature: Signature, /// The Pubkey of the intended node/destination for this message pub destination: Pubkey, - /// The source Addr this message should have been received from - pub source: SocketAddr, + /// Wallclock of the node that generated this message + pub wallclock: u64, } impl Signable for PruneData { @@ -89,13 +89,13 @@ impl Signable for PruneData { pubkey: Pubkey, prunes: Vec, destination: Pubkey, - source: SocketAddr, + wallclock: u64, } let data = SignData { pubkey: self.pubkey, prunes: self.prunes.clone(), destination: self.destination, - source: self.source, + wallclock: self.wallclock, }; serialize(&data).expect("serialize PruneData") } @@ -805,7 +805,7 @@ impl ClusterInfo { prunes, signature: Signature::default(), destination: from, - source: ci.ncp, + wallclock: timestamp(), }; prune_msg.sign(&me.read().unwrap().keypair); let rsp = Protocol::PruneMessage(self_id, prune_msg); @@ -895,14 +895,19 @@ impl ClusterInfo { Self::handle_push_message(me, from, &data) } Protocol::PruneMessage(from, data) => { - if data.source == *from_addr && data.verify() { + if data.verify() { inc_new_counter_info!("cluster_info-prune_message", 1); inc_new_counter_info!("cluster_info-prune_message-size", data.prunes.len()); me.write() .unwrap() .gossip - .process_prune_msg(from, data.destination, &data.prunes) - .ok(); + .process_prune_msg( + from, + data.destination, + &data.prunes, + data.wallclock, + timestamp(), + ).ok(); } vec![] } diff --git a/src/crds_gossip.rs b/src/crds_gossip.rs index bb872e36dad950..dc331150539fd0 100644 --- a/src/crds_gossip.rs +++ b/src/crds_gossip.rs @@ -72,7 +72,13 @@ impl CrdsGossip { peer: Pubkey, destination: Pubkey, origin: &[Pubkey], + wallclock: u64, + now: u64, ) -> Result<(), CrdsGossipError> { + let expired = now > wallclock + self.push.prune_timeout; + if expired { + return Err(CrdsGossipError::PruneMessageTimeout); + } if self.id == destination { self.push.process_prune_msg(peer, origin); Ok(()) @@ -158,6 +164,7 @@ mod test { use rayon::prelude::*; use signature::{Keypair, KeypairUtil}; use solana_sdk::hash::hash; + use solana_sdk::timing::timestamp; use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -335,7 +342,9 @@ mod test { .map(|node| { let mut node = node.lock().unwrap(); let destination = node.id; - node.process_prune_msg(*to, destination, &rsps).unwrap() + let now = timestamp(); + node.process_prune_msg(*to, destination, &rsps, now, now) + .unwrap() }).unwrap(); delivered += rsps.is_empty() as usize; } @@ -502,7 +511,7 @@ mod test { network_simulator(&mut network); } #[test] - fn test_prune_destination() { + fn test_prune_errors() { let mut crds_gossip = CrdsGossip::default(); crds_gossip.id = Pubkey::new(&[0; 32]); let id = crds_gossip.id; @@ -513,15 +522,22 @@ mod test { .insert(CrdsValue::ContactInfo(ci.clone()), 0) .unwrap(); crds_gossip.refresh_push_active_set(); + let now = timestamp(); //incorrect dest let mut res = crds_gossip.process_prune_msg( ci.id, Pubkey::new(hash(&[1; 32]).as_ref()), &[prune_pubkey], + now, + now ); assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination)); //correct dest - res = crds_gossip.process_prune_msg(ci.id, id, &[prune_pubkey]); + res = crds_gossip.process_prune_msg(ci.id, id, &[prune_pubkey], now ,now); assert!(res.is_ok()); + //test timeout + let timeout = now + crds_gossip.push.prune_timeout*2; + res = crds_gossip.process_prune_msg(ci.id, id, &[prune_pubkey], now , timeout); + assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout)); } } diff --git a/src/crds_gossip_error.rs b/src/crds_gossip_error.rs index 23a5d555e19d21..2c3cb517628625 100644 --- a/src/crds_gossip_error.rs +++ b/src/crds_gossip_error.rs @@ -5,4 +5,5 @@ pub enum CrdsGossipError { PushMessagePrune, PushMessageOldVersion, BadPruneDestination, + PruneMessageTimeout, } diff --git a/src/crds_gossip_push.rs b/src/crds_gossip_push.rs index 65088aeb0c8e65..3050fe2d4fa3e3 100644 --- a/src/crds_gossip_push.rs +++ b/src/crds_gossip_push.rs @@ -26,6 +26,7 @@ use std::collections::HashMap; pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30; pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6; pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 5000; +pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500; pub struct CrdsGossipPush { /// max bytes per message @@ -38,6 +39,7 @@ pub struct CrdsGossipPush { pub num_active: usize, pub push_fanout: usize, pub msg_timeout: u64, + pub prune_timeout: u64, } impl Default for CrdsGossipPush { @@ -50,6 +52,8 @@ impl Default for CrdsGossipPush { num_active: CRDS_GOSSIP_NUM_ACTIVE, push_fanout: CRDS_GOSSIP_PUSH_FANOUT, msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, + prune_timeout: CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS, + } } } From cc601d76100ee4334697ca34a6abd778795ba88c Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Sat, 1 Dec 2018 11:24:31 -0800 Subject: [PATCH 17/17] Fix fmt --- src/crds_gossip.rs | 8 ++++---- src/crds_gossip_push.rs | 1 - 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/crds_gossip.rs b/src/crds_gossip.rs index dc331150539fd0..400cf3c61d3b68 100644 --- a/src/crds_gossip.rs +++ b/src/crds_gossip.rs @@ -529,15 +529,15 @@ mod test { Pubkey::new(hash(&[1; 32]).as_ref()), &[prune_pubkey], now, - now + now, ); assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination)); //correct dest - res = crds_gossip.process_prune_msg(ci.id, id, &[prune_pubkey], now ,now); + res = crds_gossip.process_prune_msg(ci.id, id, &[prune_pubkey], now, now); assert!(res.is_ok()); //test timeout - let timeout = now + crds_gossip.push.prune_timeout*2; - res = crds_gossip.process_prune_msg(ci.id, id, &[prune_pubkey], now , timeout); + let timeout = now + crds_gossip.push.prune_timeout * 2; + res = crds_gossip.process_prune_msg(ci.id, id, &[prune_pubkey], now, timeout); assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout)); } } diff --git a/src/crds_gossip_push.rs b/src/crds_gossip_push.rs index 3050fe2d4fa3e3..078001dfd43c28 100644 --- a/src/crds_gossip_push.rs +++ b/src/crds_gossip_push.rs @@ -53,7 +53,6 @@ impl Default for CrdsGossipPush { push_fanout: CRDS_GOSSIP_PUSH_FANOUT, msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, prune_timeout: CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS, - } } }